1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/thread.h" 36 #include "spdk/env.h" 37 #include "spdk/event.h" 38 #include "spdk/log.h" 39 #include "spdk/string.h" 40 #include "spdk/accel_engine.h" 41 #include "spdk/crc32.h" 42 #include "spdk/util.h" 43 44 #define DATA_PATTERN 0x5a 45 #define ALIGN_4K 0x1000 46 47 static uint64_t g_tsc_rate; 48 static uint64_t g_tsc_end; 49 static int g_rc; 50 static int g_xfer_size_bytes = 4096; 51 static int g_queue_depth = 32; 52 /* g_allocate_depth indicates how many tasks we allocate per worker. It will 53 * be at least as much as the queue depth. 54 */ 55 static int g_allocate_depth = 0; 56 static int g_ops_per_batch = 0; 57 static int g_threads_per_core = 1; 58 static int g_time_in_sec = 5; 59 static uint32_t g_crc32c_seed = 0; 60 static uint32_t g_crc32c_chained_count = 1; 61 static int g_fail_percent_goal = 0; 62 static uint8_t g_fill_pattern = 255; 63 static bool g_verify = false; 64 static const char *g_workload_type = NULL; 65 static enum accel_capability g_workload_selection; 66 static struct worker_thread *g_workers = NULL; 67 static int g_num_workers = 0; 68 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; 69 70 struct worker_thread; 71 static void accel_done(void *ref, int status); 72 73 struct display_info { 74 int core; 75 int thread; 76 }; 77 78 struct ap_task { 79 void *src; 80 struct iovec *iovs; 81 uint32_t iov_cnt; 82 void *dst; 83 void *dst2; 84 uint32_t crc_dst; 85 struct worker_thread *worker; 86 int expected_status; /* used for the compare operation */ 87 TAILQ_ENTRY(ap_task) link; 88 }; 89 90 struct accel_batch { 91 int cmd_count; 92 struct spdk_accel_batch *batch; 93 struct worker_thread *worker; 94 TAILQ_ENTRY(accel_batch) link; 95 }; 96 97 struct worker_thread { 98 struct spdk_io_channel *ch; 99 uint64_t xfer_completed; 100 uint64_t xfer_failed; 101 uint64_t injected_miscompares; 102 uint64_t current_queue_depth; 103 TAILQ_HEAD(, ap_task) tasks_pool; 104 struct worker_thread *next; 105 unsigned core; 106 struct spdk_thread *thread; 107 bool is_draining; 108 struct spdk_poller *is_draining_poller; 109 struct spdk_poller *stop_poller; 110 void *task_base; 111 struct accel_batch *batch_base; 112 struct display_info display; 113 TAILQ_HEAD(, accel_batch) in_prep_batches; 114 TAILQ_HEAD(, accel_batch) in_use_batches; 115 TAILQ_HEAD(, accel_batch) to_submit_batches; 116 }; 117 118 static void 119 dump_user_config(struct spdk_app_opts *opts) 120 { 121 printf("SPDK Configuration:\n"); 122 printf("Core mask: %s\n\n", opts->reactor_mask); 123 printf("Accel Perf Configuration:\n"); 124 printf("Workload Type: %s\n", g_workload_type); 125 if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) { 126 printf("CRC-32C seed: %u\n", g_crc32c_seed); 127 printf("vector count %u\n", g_crc32c_chained_count); 128 } else if (g_workload_selection == ACCEL_FILL) { 129 printf("Fill pattern: 0x%x\n", g_fill_pattern); 130 } else if ((g_workload_selection == ACCEL_COMPARE) && g_fail_percent_goal > 0) { 131 printf("Failure inject: %u percent\n", g_fail_percent_goal); 132 } 133 if (g_workload_selection == ACCEL_COPY_CRC32C) { 134 printf("Vector size: %u bytes\n", g_xfer_size_bytes); 135 printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_crc32c_chained_count); 136 } else { 137 printf("Transfer size: %u bytes\n", g_xfer_size_bytes); 138 } 139 printf("Queue depth: %u\n", g_queue_depth); 140 printf("Allocate depth: %u\n", g_allocate_depth); 141 printf("# threads/core: %u\n", g_threads_per_core); 142 printf("Run time: %u seconds\n", g_time_in_sec); 143 if (g_ops_per_batch > 0) { 144 printf("Batching: %u operations\n", g_ops_per_batch); 145 } else { 146 printf("Batching: Disabled\n"); 147 } 148 printf("Verify: %s\n\n", g_verify ? "Yes" : "No"); 149 } 150 151 static void 152 usage(void) 153 { 154 printf("accel_perf options:\n"); 155 printf("\t[-h help message]\n"); 156 printf("\t[-q queue depth per core]\n"); 157 printf("\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)\n"); 158 printf("\t[-T number of threads per core\n"); 159 printf("\t[-n number of channels]\n"); 160 printf("\t[-o transfer size in bytes]\n"); 161 printf("\t[-t time in seconds]\n"); 162 printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, dualcast\n"); 163 printf("\t[-s for crc32c workload, use this seed value (default 0)\n"); 164 printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n"); 165 printf("\t[-f for fill workload, use this BYTE value (default 255)\n"); 166 printf("\t[-y verify result if this switch is on]\n"); 167 printf("\t[-b batch this number of operations at a time (default 0 = disabled)]\n"); 168 printf("\t[-a tasks to allocate per core (default: same value as -q)]\n"); 169 printf("\t\tCan be used to spread operations across a wider range of memory.\n"); 170 } 171 172 static int 173 parse_args(int argc, char *argv) 174 { 175 int argval = 0; 176 177 switch (argc) { 178 case 'a': 179 case 'b': 180 case 'C': 181 case 'f': 182 case 'T': 183 case 'o': 184 case 'P': 185 case 'q': 186 case 's': 187 case 't': 188 argval = spdk_strtol(optarg, 10); 189 if (argval < 0) { 190 fprintf(stderr, "-%c option must be non-negative.\n", argc); 191 usage(); 192 return 1; 193 } 194 break; 195 default: 196 break; 197 }; 198 199 switch (argc) { 200 case 'a': 201 g_allocate_depth = argval; 202 break; 203 case 'b': 204 g_ops_per_batch = argval; 205 break; 206 case 'C': 207 g_crc32c_chained_count = argval; 208 break; 209 case 'f': 210 g_fill_pattern = (uint8_t)argval; 211 break; 212 case 'T': 213 g_threads_per_core = argval; 214 break; 215 case 'o': 216 g_xfer_size_bytes = argval; 217 break; 218 case 'P': 219 g_fail_percent_goal = argval; 220 break; 221 case 'q': 222 g_queue_depth = argval; 223 break; 224 case 's': 225 g_crc32c_seed = argval; 226 break; 227 case 't': 228 g_time_in_sec = argval; 229 break; 230 case 'y': 231 g_verify = true; 232 break; 233 case 'w': 234 g_workload_type = optarg; 235 if (!strcmp(g_workload_type, "copy")) { 236 g_workload_selection = ACCEL_COPY; 237 } else if (!strcmp(g_workload_type, "fill")) { 238 g_workload_selection = ACCEL_FILL; 239 } else if (!strcmp(g_workload_type, "crc32c")) { 240 g_workload_selection = ACCEL_CRC32C; 241 } else if (!strcmp(g_workload_type, "copy_crc32c")) { 242 g_workload_selection = ACCEL_COPY_CRC32C; 243 } else if (!strcmp(g_workload_type, "compare")) { 244 g_workload_selection = ACCEL_COMPARE; 245 } else if (!strcmp(g_workload_type, "dualcast")) { 246 g_workload_selection = ACCEL_DUALCAST; 247 } 248 break; 249 default: 250 usage(); 251 return 1; 252 } 253 254 return 0; 255 } 256 257 static int dump_result(void); 258 static void 259 unregister_worker(void *arg1) 260 { 261 struct worker_thread *worker = arg1; 262 263 free(worker->task_base); 264 free(worker->batch_base); 265 spdk_put_io_channel(worker->ch); 266 pthread_mutex_lock(&g_workers_lock); 267 assert(g_num_workers >= 1); 268 if (--g_num_workers == 0) { 269 pthread_mutex_unlock(&g_workers_lock); 270 g_rc = dump_result(); 271 spdk_app_stop(0); 272 } 273 pthread_mutex_unlock(&g_workers_lock); 274 } 275 276 static int 277 _get_task_data_bufs(struct ap_task *task) 278 { 279 uint32_t align = 0; 280 uint32_t i = 0; 281 int dst_buff_len = g_xfer_size_bytes; 282 283 /* For dualcast, the DSA HW requires 4K alignment on destination addresses but 284 * we do this for all engines to keep it simple. 285 */ 286 if (g_workload_selection == ACCEL_DUALCAST) { 287 align = ALIGN_4K; 288 } 289 290 if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) { 291 assert(g_crc32c_chained_count > 0); 292 task->iov_cnt = g_crc32c_chained_count; 293 task->iovs = calloc(task->iov_cnt, sizeof(struct iovec)); 294 if (!task->iovs) { 295 fprintf(stderr, "cannot allocated task->iovs fot task=%p\n", task); 296 return -ENOMEM; 297 } 298 299 if (g_workload_selection == ACCEL_COPY_CRC32C) { 300 dst_buff_len = g_xfer_size_bytes * g_crc32c_chained_count; 301 } 302 303 for (i = 0; i < task->iov_cnt; i++) { 304 task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 305 if (task->iovs[i].iov_base == NULL) { 306 return -ENOMEM; 307 } 308 memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes); 309 task->iovs[i].iov_len = g_xfer_size_bytes; 310 } 311 312 } else { 313 task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 314 if (task->src == NULL) { 315 fprintf(stderr, "Unable to alloc src buffer\n"); 316 return -ENOMEM; 317 } 318 319 /* For fill, set the entire src buffer so we can check if verify is enabled. */ 320 if (g_workload_selection == ACCEL_FILL) { 321 memset(task->src, g_fill_pattern, g_xfer_size_bytes); 322 } else { 323 memset(task->src, DATA_PATTERN, g_xfer_size_bytes); 324 } 325 } 326 327 if (g_workload_selection != ACCEL_CRC32C) { 328 task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL); 329 if (task->dst == NULL) { 330 fprintf(stderr, "Unable to alloc dst buffer\n"); 331 return -ENOMEM; 332 } 333 334 /* For compare we want the buffers to match, otherwise not. */ 335 if (g_workload_selection == ACCEL_COMPARE) { 336 memset(task->dst, DATA_PATTERN, dst_buff_len); 337 } else { 338 memset(task->dst, ~DATA_PATTERN, dst_buff_len); 339 } 340 } 341 342 if (g_workload_selection == ACCEL_DUALCAST) { 343 task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); 344 if (task->dst2 == NULL) { 345 fprintf(stderr, "Unable to alloc dst buffer\n"); 346 return -ENOMEM; 347 } 348 memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes); 349 } 350 351 return 0; 352 } 353 354 inline static struct ap_task * 355 _get_task(struct worker_thread *worker) 356 { 357 struct ap_task *task; 358 359 if (!TAILQ_EMPTY(&worker->tasks_pool)) { 360 task = TAILQ_FIRST(&worker->tasks_pool); 361 TAILQ_REMOVE(&worker->tasks_pool, task, link); 362 } else { 363 fprintf(stderr, "Unable to get ap_task\n"); 364 return NULL; 365 } 366 367 return task; 368 } 369 370 /* Submit one operation using the same ap task that just completed. */ 371 static void 372 _submit_single(struct worker_thread *worker, struct ap_task *task) 373 { 374 int random_num; 375 int rc = 0; 376 377 assert(worker); 378 379 switch (g_workload_selection) { 380 case ACCEL_COPY: 381 rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src, 382 g_xfer_size_bytes, accel_done, task); 383 break; 384 case ACCEL_FILL: 385 /* For fill use the first byte of the task->dst buffer */ 386 rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src, 387 g_xfer_size_bytes, accel_done, task); 388 break; 389 case ACCEL_CRC32C: 390 rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst, 391 task->iovs, task->iov_cnt, g_crc32c_seed, 392 accel_done, task); 393 break; 394 case ACCEL_COPY_CRC32C: 395 rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->iovs, task->iov_cnt, 396 &task->crc_dst, g_crc32c_seed, accel_done, task); 397 break; 398 case ACCEL_COMPARE: 399 random_num = rand() % 100; 400 if (random_num < g_fail_percent_goal) { 401 task->expected_status = -EILSEQ; 402 *(uint8_t *)task->dst = ~DATA_PATTERN; 403 } else { 404 task->expected_status = 0; 405 *(uint8_t *)task->dst = DATA_PATTERN; 406 } 407 rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src, 408 g_xfer_size_bytes, accel_done, task); 409 break; 410 case ACCEL_DUALCAST: 411 rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2, 412 task->src, g_xfer_size_bytes, accel_done, task); 413 break; 414 default: 415 assert(false); 416 break; 417 418 } 419 420 if (rc) { 421 accel_done(task, rc); 422 } 423 } 424 425 static int 426 _batch_prep_cmd(struct worker_thread *worker, struct ap_task *task, 427 struct accel_batch *worker_batch) 428 { 429 struct spdk_accel_batch *batch = worker_batch->batch; 430 int rc = 0; 431 432 worker_batch->cmd_count++; 433 assert(worker_batch->cmd_count <= g_ops_per_batch); 434 435 switch (g_workload_selection) { 436 case ACCEL_COPY: 437 rc = spdk_accel_batch_prep_copy(worker->ch, batch, task->dst, 438 task->src, g_xfer_size_bytes, accel_done, task); 439 break; 440 case ACCEL_DUALCAST: 441 rc = spdk_accel_batch_prep_dualcast(worker->ch, batch, task->dst, task->dst2, 442 task->src, g_xfer_size_bytes, accel_done, task); 443 break; 444 case ACCEL_COMPARE: 445 rc = spdk_accel_batch_prep_compare(worker->ch, batch, task->dst, task->src, 446 g_xfer_size_bytes, accel_done, task); 447 break; 448 case ACCEL_FILL: 449 rc = spdk_accel_batch_prep_fill(worker->ch, batch, task->dst, 450 *(uint8_t *)task->src, 451 g_xfer_size_bytes, accel_done, task); 452 break; 453 case ACCEL_COPY_CRC32C: 454 rc = spdk_accel_batch_prep_copy_crc32c(worker->ch, batch, task->dst, task->src, &task->crc_dst, 455 g_crc32c_seed, g_xfer_size_bytes, accel_done, task); 456 break; 457 case ACCEL_CRC32C: 458 rc = spdk_accel_batch_prep_crc32cv(worker->ch, batch, &task->crc_dst, 459 task->iovs, task->iov_cnt, g_crc32c_seed, accel_done, task); 460 break; 461 default: 462 assert(false); 463 break; 464 } 465 466 return rc; 467 } 468 469 static void 470 _free_task_buffers(struct ap_task *task) 471 { 472 uint32_t i; 473 474 if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) { 475 if (task->iovs) { 476 for (i = 0; i < task->iov_cnt; i++) { 477 if (task->iovs[i].iov_base) { 478 spdk_dma_free(task->iovs[i].iov_base); 479 } 480 } 481 free(task->iovs); 482 } 483 } else { 484 spdk_dma_free(task->src); 485 } 486 487 spdk_dma_free(task->dst); 488 if (g_workload_selection == ACCEL_DUALCAST) { 489 spdk_dma_free(task->dst2); 490 } 491 } 492 493 static void 494 _build_batch(struct worker_thread *worker, struct ap_task *task) 495 { 496 struct accel_batch *worker_batch = NULL; 497 int rc; 498 499 assert(!TAILQ_EMPTY(&worker->in_prep_batches)); 500 501 worker_batch = TAILQ_FIRST(&worker->in_prep_batches); 502 503 /* If an accel batch hasn't been created yet do so now. */ 504 if (worker_batch->batch == NULL) { 505 worker_batch->batch = spdk_accel_batch_create(worker->ch); 506 if (worker_batch->batch == NULL) { 507 fprintf(stderr, "error unable to create new batch\n"); 508 return; 509 } 510 } 511 512 /* Prep the command re-using the last completed command's task */ 513 rc = _batch_prep_cmd(worker, task, worker_batch); 514 if (rc) { 515 fprintf(stderr, "error preping command for batch\n"); 516 goto error; 517 } 518 519 /* If this batch is full move it to the to_submit list so it gets 520 * submitted as batches complete. 521 */ 522 if (worker_batch->cmd_count == g_ops_per_batch) { 523 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 524 TAILQ_INSERT_TAIL(&worker->to_submit_batches, worker_batch, link); 525 } 526 527 return; 528 error: 529 spdk_accel_batch_cancel(worker->ch, worker_batch->batch); 530 531 } 532 533 static void batch_done(void *cb_arg, int status); 534 static void 535 _drain_batch(struct worker_thread *worker) 536 { 537 struct accel_batch *worker_batch, *tmp; 538 int rc; 539 540 /* submit any batches that were being built up. */ 541 TAILQ_FOREACH_SAFE(worker_batch, &worker->in_prep_batches, link, tmp) { 542 if (worker_batch->cmd_count == 0) { 543 continue; 544 } 545 worker->current_queue_depth += worker_batch->cmd_count + 1; 546 547 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 548 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 549 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 550 if (rc == 0) { 551 worker_batch->cmd_count = 0; 552 } else { 553 fprintf(stderr, "error sending final batch\n"); 554 worker->current_queue_depth -= worker_batch->cmd_count + 1; 555 break; 556 } 557 } 558 } 559 560 static void 561 batch_done(void *arg1, int status) 562 { 563 struct accel_batch *worker_batch = (struct accel_batch *)arg1; 564 struct worker_thread *worker = worker_batch->worker; 565 int rc; 566 567 assert(worker); 568 assert(TAILQ_EMPTY(&worker->in_use_batches) == 0); 569 570 if (status) { 571 SPDK_ERRLOG("error %d\n", status); 572 } 573 574 worker->current_queue_depth--; 575 TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link); 576 TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link); 577 worker_batch->batch = NULL; 578 worker_batch->cmd_count = 0; 579 580 if (!worker->is_draining) { 581 worker_batch = TAILQ_FIRST(&worker->to_submit_batches); 582 if (worker_batch != NULL) { 583 584 assert(worker_batch->cmd_count == g_ops_per_batch); 585 586 /* Add one for the batch command itself. */ 587 worker->current_queue_depth += g_ops_per_batch + 1; 588 TAILQ_REMOVE(&worker->to_submit_batches, worker_batch, link); 589 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 590 591 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 592 if (rc) { 593 fprintf(stderr, "error ending batch\n"); 594 worker->current_queue_depth -= g_ops_per_batch + 1; 595 return; 596 } 597 } 598 } else { 599 _drain_batch(worker); 600 } 601 } 602 603 static int 604 _vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt) 605 { 606 uint32_t i; 607 uint32_t ttl_len = 0; 608 uint8_t *dst = (uint8_t *)_dst; 609 610 for (i = 0; i < iovcnt; i++) { 611 if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) { 612 return -1; 613 } 614 dst += src_iovs[i].iov_len; 615 ttl_len += src_iovs[i].iov_len; 616 } 617 618 if (ttl_len != iovcnt * g_xfer_size_bytes) { 619 return -1; 620 } 621 622 return 0; 623 } 624 625 static void 626 accel_done(void *arg1, int status) 627 { 628 struct ap_task *task = arg1; 629 struct worker_thread *worker = task->worker; 630 uint32_t sw_crc32c; 631 632 assert(worker); 633 assert(worker->current_queue_depth > 0); 634 635 if (g_verify && status == 0) { 636 switch (g_workload_selection) { 637 case ACCEL_COPY_CRC32C: 638 sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed); 639 if (task->crc_dst != sw_crc32c) { 640 SPDK_NOTICELOG("CRC-32C miscompare\n"); 641 worker->xfer_failed++; 642 } 643 if (_vector_memcmp(task->dst, task->iovs, task->iov_cnt)) { 644 SPDK_NOTICELOG("Data miscompare\n"); 645 worker->xfer_failed++; 646 } 647 break; 648 case ACCEL_CRC32C: 649 sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed); 650 if (task->crc_dst != sw_crc32c) { 651 SPDK_NOTICELOG("CRC-32C miscompare\n"); 652 worker->xfer_failed++; 653 } 654 break; 655 case ACCEL_COPY: 656 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 657 SPDK_NOTICELOG("Data miscompare\n"); 658 worker->xfer_failed++; 659 } 660 break; 661 case ACCEL_DUALCAST: 662 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 663 SPDK_NOTICELOG("Data miscompare, first destination\n"); 664 worker->xfer_failed++; 665 } 666 if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) { 667 SPDK_NOTICELOG("Data miscompare, second destination\n"); 668 worker->xfer_failed++; 669 } 670 break; 671 case ACCEL_FILL: 672 if (memcmp(task->dst, task->src, g_xfer_size_bytes)) { 673 SPDK_NOTICELOG("Data miscompare\n"); 674 worker->xfer_failed++; 675 } 676 break; 677 case ACCEL_COMPARE: 678 break; 679 default: 680 assert(false); 681 break; 682 } 683 } 684 685 if (task->expected_status == -EILSEQ) { 686 assert(status != 0); 687 worker->injected_miscompares++; 688 } else if (status) { 689 /* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */ 690 worker->xfer_failed++; 691 } 692 693 worker->xfer_completed++; 694 worker->current_queue_depth--; 695 696 if (!worker->is_draining) { 697 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 698 task = _get_task(worker); 699 if (g_ops_per_batch == 0) { 700 _submit_single(worker, task); 701 worker->current_queue_depth++; 702 } else { 703 _build_batch(worker, task); 704 } 705 } else if (g_ops_per_batch > 0) { 706 _drain_batch(worker); 707 } else { 708 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 709 } 710 } 711 712 static int 713 dump_result(void) 714 { 715 uint64_t total_completed = 0; 716 uint64_t total_failed = 0; 717 uint64_t total_miscompared = 0; 718 uint64_t total_xfer_per_sec, total_bw_in_MiBps; 719 struct worker_thread *worker = g_workers; 720 721 printf("\nCore,Thread Transfers Bandwidth Failed Miscompares\n"); 722 printf("------------------------------------------------------------------------\n"); 723 while (worker != NULL) { 724 725 uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec; 726 uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) / 727 (g_time_in_sec * 1024 * 1024); 728 729 total_completed += worker->xfer_completed; 730 total_failed += worker->xfer_failed; 731 total_miscompared += worker->injected_miscompares; 732 733 if (xfer_per_sec) { 734 printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n", 735 worker->display.core, worker->display.thread, xfer_per_sec, 736 bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares); 737 } 738 739 worker = worker->next; 740 } 741 742 total_xfer_per_sec = total_completed / g_time_in_sec; 743 total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) / 744 (g_time_in_sec * 1024 * 1024); 745 746 printf("=========================================================================\n"); 747 printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n", 748 total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared); 749 750 return total_failed ? 1 : 0; 751 } 752 753 static inline void 754 _free_task_buffers_in_pool(struct worker_thread *worker) 755 { 756 struct ap_task *task; 757 758 assert(worker); 759 while ((task = TAILQ_FIRST(&worker->tasks_pool))) { 760 TAILQ_REMOVE(&worker->tasks_pool, task, link); 761 _free_task_buffers(task); 762 } 763 } 764 765 static int 766 _check_draining(void *arg) 767 { 768 struct worker_thread *worker = arg; 769 770 assert(worker); 771 772 if (worker->current_queue_depth == 0) { 773 _free_task_buffers_in_pool(worker); 774 spdk_poller_unregister(&worker->is_draining_poller); 775 unregister_worker(worker); 776 } 777 778 return -1; 779 } 780 781 static int 782 _worker_stop(void *arg) 783 { 784 struct worker_thread *worker = arg; 785 786 assert(worker); 787 788 spdk_poller_unregister(&worker->stop_poller); 789 790 /* now let the worker drain and check it's outstanding IO with a poller */ 791 worker->is_draining = true; 792 worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0); 793 794 return 0; 795 } 796 797 static void 798 _init_thread(void *arg1) 799 { 800 struct worker_thread *worker; 801 struct ap_task *task; 802 int i, rc, num_batches; 803 int max_per_batch; 804 int remaining = g_queue_depth; 805 int num_tasks = g_allocate_depth; 806 struct accel_batch *tmp; 807 struct accel_batch *worker_batch = NULL; 808 struct display_info *display = arg1; 809 810 worker = calloc(1, sizeof(*worker)); 811 if (worker == NULL) { 812 fprintf(stderr, "Unable to allocate worker\n"); 813 free(display); 814 return; 815 } 816 817 worker->display.core = display->core; 818 worker->display.thread = display->thread; 819 free(display); 820 worker->core = spdk_env_get_current_core(); 821 worker->thread = spdk_get_thread(); 822 pthread_mutex_lock(&g_workers_lock); 823 g_num_workers++; 824 worker->next = g_workers; 825 g_workers = worker; 826 pthread_mutex_unlock(&g_workers_lock); 827 worker->ch = spdk_accel_engine_get_io_channel(); 828 829 TAILQ_INIT(&worker->tasks_pool); 830 831 if (g_ops_per_batch > 0) { 832 833 max_per_batch = spdk_accel_batch_get_max(worker->ch); 834 assert(max_per_batch > 0); 835 836 if (g_ops_per_batch > max_per_batch) { 837 fprintf(stderr, "Reducing requested batch amount to max supported of %d\n", max_per_batch); 838 g_ops_per_batch = max_per_batch; 839 } 840 841 if (g_ops_per_batch > g_queue_depth) { 842 fprintf(stderr, "Batch amount > queue depth, resetting to %d\n", g_queue_depth); 843 g_ops_per_batch = g_queue_depth; 844 } 845 846 TAILQ_INIT(&worker->in_prep_batches); 847 TAILQ_INIT(&worker->to_submit_batches); 848 TAILQ_INIT(&worker->in_use_batches); 849 850 /* A worker_batch will live on one of 3 lists: 851 * IN_PREP: as individual IOs complete new ones are built on on a 852 * worker_batch on this list until it reaches g_ops_per_batch. 853 * TO_SUBMIT: as batches are built up on IO completion they are moved 854 * to this list once they are full. This list is used in 855 * batch completion to start new batches. 856 * IN_USE: the worker_batch is outstanding and will be moved to in prep 857 * list when the batch is completed. 858 * 859 * So we need enough to cover Q depth loading and then one to replace 860 * each one of those and for when everything is outstanding there needs 861 * to be one extra batch to build up while the last batch is completing 862 * IO but before it's completed the batch command. 863 */ 864 num_batches = (g_queue_depth / g_ops_per_batch * 2) + 1; 865 worker->batch_base = calloc(num_batches, sizeof(struct accel_batch)); 866 worker_batch = worker->batch_base; 867 for (i = 0; i < num_batches; i++) { 868 worker_batch->worker = worker; 869 TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link); 870 worker_batch++; 871 } 872 } 873 874 worker->task_base = calloc(num_tasks, sizeof(struct ap_task)); 875 if (worker->task_base == NULL) { 876 fprintf(stderr, "Could not allocate task base.\n"); 877 goto error; 878 } 879 880 task = worker->task_base; 881 for (i = 0; i < num_tasks; i++) { 882 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 883 task->worker = worker; 884 if (_get_task_data_bufs(task)) { 885 fprintf(stderr, "Unable to get data bufs\n"); 886 goto error; 887 } 888 task++; 889 } 890 891 /* Register a poller that will stop the worker at time elapsed */ 892 worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker, 893 g_time_in_sec * 1000000ULL); 894 895 /* If batching is enabled load up to the full Q depth before 896 * processing any completions, then ping pong between two batches, 897 * one processing and one being built up for when the other completes. 898 */ 899 if (g_ops_per_batch > 0) { 900 do { 901 worker_batch = TAILQ_FIRST(&worker->in_prep_batches); 902 if (worker_batch == NULL) { 903 goto error; 904 } 905 906 worker_batch->batch = spdk_accel_batch_create(worker->ch); 907 if (worker_batch->batch == NULL) { 908 raise(SIGINT); 909 break; 910 } 911 912 for (i = 0; i < g_ops_per_batch; i++) { 913 task = _get_task(worker); 914 worker->current_queue_depth++; 915 if (task == NULL) { 916 goto error; 917 } 918 919 rc = _batch_prep_cmd(worker, task, worker_batch); 920 if (rc) { 921 fprintf(stderr, "error preping command\n"); 922 goto error; 923 } 924 } 925 926 /* for the batch operation itself. */ 927 task->worker->current_queue_depth++; 928 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 929 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 930 931 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 932 if (rc) { 933 fprintf(stderr, "error ending batch\n"); 934 goto error; 935 } 936 assert(remaining >= g_ops_per_batch); 937 remaining -= g_ops_per_batch; 938 } while (remaining > 0); 939 } 940 941 /* Submit as singles when no batching is enabled or we ran out of batches. */ 942 for (i = 0; i < remaining; i++) { 943 task = _get_task(worker); 944 worker->current_queue_depth++; 945 if (task == NULL) { 946 goto error; 947 } 948 949 _submit_single(worker, task); 950 } 951 return; 952 error: 953 if (worker_batch && worker_batch->batch) { 954 TAILQ_FOREACH_SAFE(worker_batch, &worker->in_use_batches, link, tmp) { 955 spdk_accel_batch_cancel(worker->ch, worker_batch->batch); 956 TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link); 957 } 958 } 959 960 _free_task_buffers_in_pool(worker); 961 free(worker->batch_base); 962 free(worker->task_base); 963 free(worker); 964 spdk_app_stop(-1); 965 } 966 967 static inline void 968 identify_accel_engine_usage(void) 969 { 970 struct spdk_io_channel *ch; 971 uint64_t capabilities; 972 973 ch = spdk_accel_engine_get_io_channel(); 974 assert(ch != NULL); 975 976 capabilities = spdk_accel_get_capabilities(ch); 977 if ((capabilities & g_workload_selection) != g_workload_selection) { 978 SPDK_WARNLOG("The selected workload is not natively supported by the current engine\n"); 979 SPDK_WARNLOG("The software engine will be used instead.\n\n"); 980 } 981 982 spdk_put_io_channel(ch); 983 } 984 985 static void 986 accel_perf_start(void *arg1) 987 { 988 struct spdk_cpuset tmp_cpumask = {}; 989 char thread_name[32]; 990 uint32_t i; 991 int j; 992 struct spdk_thread *thread; 993 struct display_info *display; 994 995 identify_accel_engine_usage(); 996 997 g_tsc_rate = spdk_get_ticks_hz(); 998 g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate; 999 1000 printf("Running for %d seconds...\n", g_time_in_sec); 1001 fflush(stdout); 1002 1003 /* Create worker threads for each core that was specified. */ 1004 SPDK_ENV_FOREACH_CORE(i) { 1005 for (j = 0; j < g_threads_per_core; j++) { 1006 snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j); 1007 spdk_cpuset_zero(&tmp_cpumask); 1008 spdk_cpuset_set_cpu(&tmp_cpumask, i, true); 1009 thread = spdk_thread_create(thread_name, &tmp_cpumask); 1010 display = calloc(1, sizeof(*display)); 1011 if (display == NULL) { 1012 fprintf(stderr, "Unable to allocate memory\n"); 1013 spdk_app_stop(-1); 1014 return; 1015 } 1016 display->core = i; 1017 display->thread = j; 1018 spdk_thread_send_msg(thread, _init_thread, display); 1019 } 1020 } 1021 } 1022 1023 int 1024 main(int argc, char **argv) 1025 { 1026 struct spdk_app_opts opts = {}; 1027 struct worker_thread *worker, *tmp; 1028 1029 pthread_mutex_init(&g_workers_lock, NULL); 1030 spdk_app_opts_init(&opts, sizeof(opts)); 1031 opts.reactor_mask = "0x1"; 1032 if (spdk_app_parse_args(argc, argv, &opts, "a:C:o:q:t:yw:P:f:b:T:", NULL, parse_args, 1033 usage) != SPDK_APP_PARSE_ARGS_SUCCESS) { 1034 g_rc = -1; 1035 goto cleanup; 1036 } 1037 1038 if ((g_workload_selection != ACCEL_COPY) && 1039 (g_workload_selection != ACCEL_FILL) && 1040 (g_workload_selection != ACCEL_CRC32C) && 1041 (g_workload_selection != ACCEL_COPY_CRC32C) && 1042 (g_workload_selection != ACCEL_COMPARE) && 1043 (g_workload_selection != ACCEL_DUALCAST)) { 1044 usage(); 1045 g_rc = -1; 1046 goto cleanup; 1047 } 1048 1049 if (g_ops_per_batch > 0 && (g_queue_depth % g_ops_per_batch > 0)) { 1050 fprintf(stdout, "batch size must be a multiple of queue depth\n"); 1051 usage(); 1052 g_rc = -1; 1053 goto cleanup; 1054 } 1055 1056 if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) { 1057 fprintf(stdout, "allocate depth must be at least as big as queue depth\n"); 1058 usage(); 1059 g_rc = -1; 1060 goto cleanup; 1061 } 1062 1063 if (g_allocate_depth == 0) { 1064 g_allocate_depth = g_queue_depth; 1065 } 1066 1067 if ((g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) && 1068 g_crc32c_chained_count == 0) { 1069 usage(); 1070 g_rc = -1; 1071 goto cleanup; 1072 } 1073 1074 dump_user_config(&opts); 1075 g_rc = spdk_app_start(&opts, accel_perf_start, NULL); 1076 if (g_rc) { 1077 SPDK_ERRLOG("ERROR starting application\n"); 1078 } 1079 1080 pthread_mutex_destroy(&g_workers_lock); 1081 1082 worker = g_workers; 1083 while (worker) { 1084 tmp = worker->next; 1085 free(worker); 1086 worker = tmp; 1087 } 1088 cleanup: 1089 spdk_app_fini(); 1090 return g_rc; 1091 } 1092