/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2018 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2021-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "accel_dpdk_compressdev.h"
#include "spdk/accel_module.h"

#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
        .name = "context_accel_comp",
        .size = sizeof(uint64_t),
        .align = __alignof__(uint64_t),
        .flags = 0,
};
static int g_mbuf_offset;
static enum compress_pmd g_opts;
static bool g_compressdev_enable = false;
static bool g_compressdev_initialized = false;

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
#define MBUF_SPLIT (1UL << DEFAULT_WINDOW_SIZE)
#define QAT_PMD "compress_qat"
#define MLX5_PMD "mlx5_pci"
#define UADK_PMD "compress_uadk"
#define NUM_MBUFS 65536
#define POOL_CACHE_SIZE 256
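
/* With the default window size of 15, MBUF_SPLIT is 32 KiB: the largest contiguous
 * region attached to a single mbuf. Buffers larger than this, or split by a physical
 * memory boundary, are spread across a chain of mbufs in _setup_compress_mbuf().
 */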

/* Global list of available compression devices. */
struct compress_dev {
        struct rte_compressdev_info cdev_info;  /* includes device friendly name */
        uint8_t cdev_id;                        /* identifier for the device */
        void *comp_xform;                       /* shared private xform for comp on this PMD */
        void *decomp_xform;                     /* shared private xform for decomp on this PMD */
        bool sgl_in;
        bool sgl_out;
        TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
        struct compress_dev *device;    /* ptr to compression device */
        uint8_t qp;                     /* queue pair for this node */
        struct compress_io_channel *chan;
        TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

struct compress_io_channel {
        char *drv_name;                 /* name of the compression device driver */
        struct comp_device_qp *device_qp;
        struct spdk_poller *poller;
        struct rte_mbuf **src_mbufs;
        struct rte_mbuf **dst_mbufs;
        STAILQ_HEAD(, spdk_accel_task) queued_tasks;
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;            /* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;         /* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};   /* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_mlx5_pci_available = false;
static bool g_uadk_available = false;

/* Shared (between all ops per PMD) compress xform. */
static struct rte_comp_xform g_comp_xform = {
        .type = RTE_COMP_COMPRESS,
        .compress = {
                .algo = RTE_COMP_ALGO_DEFLATE,
                .deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
                .level = RTE_COMP_LEVEL_MAX,
                .window_size = DEFAULT_WINDOW_SIZE,
                .chksum = RTE_COMP_CHECKSUM_NONE,
                .hash_algo = RTE_COMP_HASH_ALGO_NONE
        }
};
/* Shared (between all ops per PMD) decompress xform. */
static struct rte_comp_xform g_decomp_xform = {
        .type = RTE_COMP_DECOMPRESS,
        .decompress = {
                .algo = RTE_COMP_ALGO_DEFLATE,
                .chksum = RTE_COMP_CHECKSUM_NONE,
                .window_size = DEFAULT_WINDOW_SIZE,
                .hash_algo = RTE_COMP_HASH_ALGO_NONE
        }
};

/* Dummy function used by DPDK to free externally attached buffers from mbufs.
 * We free those buffers ourselves, but this callback has to be provided.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by accel_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
        struct compress_dev *device;
        uint16_t q_pairs;
        uint8_t cdev_id;
        int rc, i;
        struct comp_device_qp *dev_qp;
        struct comp_device_qp *tmp_qp;

        device = calloc(1, sizeof(struct compress_dev));
        if (!device) {
                return -ENOMEM;
        }

        /* Get details about this device. */
        rte_compressdev_info_get(index, &device->cdev_info);

        cdev_id = device->cdev_id = index;

        /* Zero means no limit, so cap the number of queue pairs at MAX_NUM_QP. */
        if (device->cdev_info.max_nb_queue_pairs == 0) {
                q_pairs = MAX_NUM_QP;
        } else {
                q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
        }

        /* Configure the compression device. */
        struct rte_compressdev_config config = {
                .socket_id = rte_socket_id(),
                .nb_queue_pairs = q_pairs,
                .max_nb_priv_xforms = NUM_MAX_XFORMS,
                .max_nb_streams = 0
        };
        rc = rte_compressdev_configure(cdev_id, &config);
        if (rc < 0) {
                SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
                goto err_close;
        }

        /* Set up all potential qpairs now; they are assigned to channels in the
         * channel creation callback.
         */
        for (i = 0; i < q_pairs; i++) {
                rc = rte_compressdev_queue_pair_setup(cdev_id, i,
                                                      NUM_MAX_INFLIGHT_OPS,
                                                      rte_socket_id());
                if (rc) {
                        if (i > 0) {
                                q_pairs = i;
                                SPDK_NOTICELOG("FYI failed to set up a queue pair on "
                                               "compressdev %u with error %d "
                                               "so limiting to %u qpairs\n",
                                               cdev_id, rc, q_pairs);
                                break;
                        } else {
                                SPDK_ERRLOG("Failed to set up queue pair on "
                                            "compressdev %u with error %d\n", cdev_id, rc);
                                rc = -EINVAL;
                                goto err_close;
                        }
                }
        }

        rc = rte_compressdev_start(cdev_id);
        if (rc < 0) {
                SPDK_ERRLOG("Failed to start device %u: error %d\n",
                            cdev_id, rc);
                goto err_close;
        }

        if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
                rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
                                &device->comp_xform);
                if (rc < 0) {
                        SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
                                    cdev_id, rc);
                        goto err_stop;
                }

                rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
                                &device->decomp_xform);
                if (rc) {
                        SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
                                    cdev_id, rc);
                        goto err_stop;
                }
        } else {
                SPDK_ERRLOG("PMD does not support shared transforms\n");
                rc = -ENOTSUP;
                goto err_stop;
        }

        /* Build up list of device/qp combinations */
        for (i = 0; i < q_pairs; i++) {
                dev_qp = calloc(1, sizeof(struct comp_device_qp));
                if (!dev_qp) {
                        rc = -ENOMEM;
                        goto err_qp;
                }
                dev_qp->device = device;
                dev_qp->qp = i;
                dev_qp->chan = NULL;
                TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
        }

        TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

        if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
                g_qat_available = true;
        }

        if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
                g_mlx5_pci_available = true;
        }

        if (strcmp(device->cdev_info.driver_name, UADK_PMD) == 0) {
                g_uadk_available = true;
        }

        return 0;

err_qp:
        TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
                TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
                free(dev_qp);
        }
err_stop:
        rte_compressdev_stop(cdev_id);
err_close:
        rte_compressdev_close(cdev_id);
        free(device);
        return rc;
}

/* Called from driver init entry point, accel_compress_init() */
static int
accel_init_compress_drivers(void)
{
        uint8_t cdev_count, i;
        struct compress_dev *tmp_dev;
        struct compress_dev *device;
        int rc;

        /* If we have no compression devices, report an error so the accel framework
         * can fall back to other modules.
         */
        cdev_count = rte_compressdev_count();
        if (cdev_count == 0) {
                return -ENODEV;
        }
        if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
                SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
                return -EINVAL;
        }

        g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
        if (g_mbuf_offset < 0) {
                SPDK_ERRLOG("error registering dynamic field with DPDK\n");
                return -EINVAL;
        }

        g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
                                            sizeof(struct rte_mbuf), 0, rte_socket_id());
        if (g_mbuf_mp == NULL) {
                SPDK_ERRLOG("Cannot create mbuf pool\n");
                rc = -ENOMEM;
                goto error_create_mbuf;
        }

        g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
                                               0, rte_socket_id());
        if (g_comp_op_mp == NULL) {
                SPDK_ERRLOG("Cannot create comp op pool\n");
                rc = -ENOMEM;
                goto error_create_op;
        }

        /* Init all devices */
        for (i = 0; i < cdev_count; i++) {
                rc = create_compress_dev(i);
                if (rc != 0) {
                        goto error_create_compress_devs;
                }
        }

        if (g_qat_available == true) {
                SPDK_NOTICELOG("initialized QAT PMD\n");
        }

        g_shinfo.free_cb = shinfo_free_cb;

        return 0;

        /* Error cleanup paths. */
error_create_compress_devs:
        TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
                TAILQ_REMOVE(&g_compress_devs, device, link);
                free(device);
        }
        rte_mempool_free(g_comp_op_mp);
error_create_op:
error_create_mbuf:
        rte_mempool_free(g_mbuf_mp);

        return rc;
}

int
accel_compressdev_enable_probe(enum compress_pmd *opts)
{
        g_opts = *opts;
        g_compressdev_enable = true;

        return 0;
}
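
/* Attach the iovecs for one side of an operation (src or dst) to the pre-allocated
 * mbufs as external buffers. An iovec larger than MBUF_SPLIT, or one that is not
 * physically contiguous, is spread across additional mbufs chained to the first one.
 * The owning accel task is stored in the mbuf dynamic field so the completion poller
 * can map a dequeued op back to its task.
 */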

static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
                     struct iovec *iovs, int iovcnt, struct spdk_accel_task *task)
{
        uint64_t iovec_length, updated_length, phys_addr;
        uint64_t processed, mbuf_length, remainder;
        uint8_t *current_base = NULL;
        int iov_index, mbuf_index;
        int rc = 0;

        /* Setup mbufs */
        iov_index = mbuf_index = 0;
        while (iov_index < iovcnt) {

                processed = 0;
                iovec_length = iovs[iov_index].iov_len;

                current_base = iovs[iov_index].iov_base;
                if (total_length) {
                        *total_length += iovec_length;
                }

                assert(mbufs[mbuf_index] != NULL);
                *RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)task;

                do {
                        /* new length is min of remaining left or max mbuf size of MBUF_SPLIT */
                        mbuf_length = updated_length = spdk_min(MBUF_SPLIT, iovec_length - processed);

                        phys_addr = spdk_vtophys((void *)current_base, &updated_length);

                        rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
                                                  current_base,
                                                  phys_addr,
                                                  updated_length,
                                                  &g_shinfo);
                        rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
                        remainder = mbuf_length - updated_length;

                        /* although the mbufs were preallocated, we still need to chain them */
                        if (mbuf_index > 0) {
                                rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
                        }

                        /* keep track of the total we've put into the mbuf chain */
                        processed += updated_length;
                        /* bump the base past what was just attached */
                        current_base += updated_length;

                        /* If we crossed a 2MB boundary we need another mbuf for the remainder */
                        if (remainder > 0) {

                                assert(remainder <= MBUF_SPLIT);

                                /* allocate an mbuf at the end of the array */
                                rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
                                                            (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
                                if (rc) {
                                        SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
                                        return -1;
                                }
                                (*mbuf_total)++;
                                mbuf_index++;
                                *RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)task;

                                /* current_base already points at the start of the remainder */
                                updated_length = remainder;
                                phys_addr = spdk_vtophys((void *)current_base, &updated_length);

                                /* assert we don't cross another */
                                assert(remainder == updated_length);

                                rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
                                                          current_base,
                                                          phys_addr,
                                                          remainder,
                                                          &g_shinfo);
                                rte_pktmbuf_append(mbufs[mbuf_index], remainder);
                                rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);

                                /* keep track of the total we've put into the mbuf chain */
                                processed += remainder;
                                /* advance the base past the remainder for the next split */
                                current_base += remainder;
                        }

                        mbuf_index++;

                } while (processed < iovec_length);

                assert(processed == iovec_length);
                iov_index++;
        }

        return 0;
}
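
/* Translate one accel task into a single stateless DEFLATE operation: build the
 * source and destination mbuf chains, pick the comp or decomp private xform, and
 * enqueue the op on this channel's queue pair. If resources are temporarily
 * exhausted (mempools or the device queue), the task is placed on the channel's
 * queued_tasks list and retried from the poller.
 */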

static int
_compress_operation(struct compress_io_channel *chan, struct spdk_accel_task *task)
{
        int dst_iovcnt = task->d.iovcnt;
        struct iovec *dst_iovs = task->d.iovs;
        int src_iovcnt = task->s.iovcnt;
        struct iovec *src_iovs = task->s.iovs;
        struct rte_comp_op *comp_op;
        uint8_t cdev_id;
        uint64_t total_length = 0;
        int rc = 0, i;
        int src_mbuf_total = 0;
        int dst_mbuf_total = 0;
        bool device_error = false;
        bool compress = (task->op_code == SPDK_ACCEL_OPC_COMPRESS);

        assert(chan->device_qp->device != NULL);
        cdev_id = chan->device_qp->device->cdev_id;

        /* calc our mbuf totals based on max MBUF size allowed so we can pre-alloc mbufs in bulk */
        for (i = 0 ; i < src_iovcnt; i++) {
                src_mbuf_total += spdk_divide_round_up(src_iovs[i].iov_len, MBUF_SPLIT);
        }
        for (i = 0 ; i < dst_iovcnt; i++) {
                dst_mbuf_total += spdk_divide_round_up(dst_iovs[i].iov_len, MBUF_SPLIT);
        }

        comp_op = rte_comp_op_alloc(g_comp_op_mp);
        if (!comp_op) {
                SPDK_ERRLOG("ERROR trying to get a comp op!\n");
                rc = -ENOMEM;
                goto error_get_op;
        }

        /* get an mbuf per iov, src and dst */
        rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, chan->src_mbufs, src_mbuf_total);
        if (rc) {
                SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
                rc = -ENOMEM;
                goto error_get_src;
        }
        assert(chan->src_mbufs[0]);

        rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, chan->dst_mbufs, dst_mbuf_total);
        if (rc) {
                SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
                rc = -ENOMEM;
                goto error_get_dst;
        }
        assert(chan->dst_mbufs[0]);

        rc = _setup_compress_mbuf(chan->src_mbufs, &src_mbuf_total, &total_length,
                                  src_iovs, src_iovcnt, task);
        if (rc < 0) {
                goto error_src_dst;
        }
        if (!chan->device_qp->device->sgl_in && src_mbuf_total > 1) {
                SPDK_ERRLOG("Src buffer uses chained mbufs but driver %s doesn't support SGL input\n",
                            chan->drv_name);
                rc = -EINVAL;
                goto error_src_dst;
        }

        comp_op->m_src = chan->src_mbufs[0];
        comp_op->src.offset = 0;
        comp_op->src.length = total_length;

        rc = _setup_compress_mbuf(chan->dst_mbufs, &dst_mbuf_total, NULL,
                                  dst_iovs, dst_iovcnt, task);
        if (rc < 0) {
                goto error_src_dst;
        }
        if (!chan->device_qp->device->sgl_out && dst_mbuf_total > 1) {
                SPDK_ERRLOG("Dst buffer uses chained mbufs but driver %s doesn't support SGL output\n",
                            chan->drv_name);
                rc = -EINVAL;
                goto error_src_dst;
        }

        comp_op->m_dst = chan->dst_mbufs[0];
        comp_op->dst.offset = 0;

        if (compress == true) {
                comp_op->private_xform = chan->device_qp->device->comp_xform;
        } else {
                comp_op->private_xform = chan->device_qp->device->decomp_xform;
        }

        comp_op->op_type = RTE_COMP_OP_STATELESS;
        comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

        rc = rte_compressdev_enqueue_burst(cdev_id, chan->device_qp->qp, &comp_op, 1);
        assert(rc <= 1);

        /* We always expect 1 to get enqueued; if 0, we need to queue the task up locally. */
        if (rc == 1) {
                return 0;
        } else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
                rc = -EAGAIN;
        } else {
                /* There was an error sending the op to the device, most likely with the
                 * parameters. Log the status before the op is returned to its mempool below.
                 */
                SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
                device_error = true;
        }

        /* Error cleanup paths. */
error_src_dst:
        rte_pktmbuf_free_bulk(chan->dst_mbufs, dst_iovcnt);
error_get_dst:
        rte_pktmbuf_free_bulk(chan->src_mbufs, src_iovcnt);
error_get_src:
        rte_comp_op_free(comp_op);
error_get_op:

        if (device_error == true) {
                return -EINVAL;
        }
        if (rc != -ENOMEM && rc != -EAGAIN) {
                return rc;
        }

        STAILQ_INSERT_TAIL(&chan->queued_tasks, task, link);
        return 0;
}

/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
        struct compress_io_channel *chan = args;
        uint8_t cdev_id;
        struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
        uint16_t num_deq;
        struct spdk_accel_task *task, *task_to_resubmit;
        int rc, i, status;

        assert(chan->device_qp->device != NULL);
        cdev_id = chan->device_qp->device->cdev_id;

        num_deq = rte_compressdev_dequeue_burst(cdev_id, chan->device_qp->qp, deq_ops,
                                                NUM_MAX_INFLIGHT_OPS);
        for (i = 0; i < num_deq; i++) {

                /* We store this off regardless of success/error so we know which task
                 * this op belongs to.
                 */
                task = (struct spdk_accel_task *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
                                uint64_t *);
                status = deq_ops[i]->status;

                if (spdk_likely(status == RTE_COMP_OP_STATUS_SUCCESS)) {
                        if (task->output_size != NULL) {
                                *task->output_size = deq_ops[i]->produced;
                        }
                } else {
                        SPDK_NOTICELOG("Dequeue status %u\n", status);
                }

                spdk_accel_task_complete(task, status);

                /* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
                 * call takes care of freeing all of the mbufs in the chain back to their
                 * original pool.
                 */
                rte_pktmbuf_free(deq_ops[i]->m_src);
                rte_pktmbuf_free(deq_ops[i]->m_dst);

                /* There is no bulk free for comp ops so we have to free them one at a time
                 * here, however it would be rare that we'd ever have more than 1 at a time
                 * anyway.
                 */
                rte_comp_op_free(deq_ops[i]);

                /* Check if there are any pending comp ops to process, only pull one
                 * at a time off as _compress_operation() may re-queue the op.
                 */
                if (!STAILQ_EMPTY(&chan->queued_tasks)) {
                        task_to_resubmit = STAILQ_FIRST(&chan->queued_tasks);
                        rc = _compress_operation(chan, task_to_resubmit);
                        if (rc == 0) {
                                STAILQ_REMOVE_HEAD(&chan->queued_tasks, link);
                        }
                }
        }

        return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

static int
_process_single_task(struct spdk_io_channel *ch, struct spdk_accel_task *task)
{
        struct compress_io_channel *chan = spdk_io_channel_get_ctx(ch);
        int rc;

        rc = _compress_operation(chan, task);
        if (rc) {
                SPDK_ERRLOG("Error (%d) in compress operation\n", rc);
                assert(false);
        }

        return rc;
}
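
/* Accel framework entry point: submit a chain of linked tasks. If the channel already
 * has tasks queued, append the new ones to preserve ordering; otherwise submit them one
 * at a time, queueing the remainder on -EBUSY and completing any task whose submission
 * fails outright with that error.
 */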

static int
compress_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task)
{
        struct compress_io_channel *chan = spdk_io_channel_get_ctx(ch);
        struct spdk_accel_task *task, *tmp;
        int rc = 0;

        task = first_task;

        if (!STAILQ_EMPTY(&chan->queued_tasks)) {
                goto queue_tasks;
        }

        /* The caller will either submit a single task or a group of tasks that are
         * linked together but are not on a list. For example, see the poller, where a
         * list of queued tasks is resubmitted: the list they are on is reinitialized
         * after saving off the first task from the list, which is then passed in here.
         * A similar thing is done in the accel framework.
         */
        while (task) {
                tmp = STAILQ_NEXT(task, link);
                rc = _process_single_task(ch, task);

                if (rc == -EBUSY) {
                        goto queue_tasks;
                } else if (rc) {
                        spdk_accel_task_complete(task, rc);
                }
                task = tmp;
        }

        return 0;

queue_tasks:
        while (task != NULL) {
                tmp = STAILQ_NEXT(task, link);
                STAILQ_INSERT_TAIL(&chan->queued_tasks, task, link);
                task = tmp;
        }
        return 0;
}
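
/* Select the PMD for this channel based on the configured policy. COMPRESS_PMD_AUTO
 * prefers QAT, then mlx5_pci, then UADK, in that order; the *_ONLY policies require
 * that specific PMD to have been discovered at init time.
 */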

static bool
_set_pmd(struct compress_io_channel *chan)
{

        /* Note: the compress_isal PMD is not supported here because the accel framework
         * already provides native ISA-L support via the accel_sw module.
         */
        if (g_opts == COMPRESS_PMD_AUTO) {
                if (g_qat_available) {
                        chan->drv_name = QAT_PMD;
                } else if (g_mlx5_pci_available) {
                        chan->drv_name = MLX5_PMD;
                } else if (g_uadk_available) {
                        chan->drv_name = UADK_PMD;
                } else {
                        SPDK_ERRLOG("No supported compression PMD is available.\n");
                        return false;
                }
        } else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
                chan->drv_name = QAT_PMD;
        } else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
                chan->drv_name = MLX5_PMD;
        } else if (g_opts == COMPRESS_PMD_UADK_ONLY && g_uadk_available) {
                chan->drv_name = UADK_PMD;
        } else {
                SPDK_ERRLOG("Requested PMD is not available.\n");
                return false;
        }
        SPDK_NOTICELOG("Channel %p PMD being used: %s\n", chan, chan->drv_name);
        return true;
}

static int compress_create_cb(void *io_device, void *ctx_buf);
static void compress_destroy_cb(void *io_device, void *ctx_buf);
static struct spdk_accel_module_if g_compress_module;
static int
accel_compress_init(void)
{
        int rc;

        if (!g_compressdev_enable) {
                return -EINVAL;
        }

        if (g_opts == COMPRESS_PMD_UADK_ONLY) {
                char *driver_name = UADK_PMD;

                rc = rte_vdev_init(driver_name, NULL);
                if (rc) {
                        SPDK_NOTICELOG("Failed to create virtual PMD %s: error %d. "
                                       "Possibly %s is not supported by the DPDK library. "
                                       "Keep going...\n", driver_name, rc, driver_name);
                }
        }

        rc = accel_init_compress_drivers();
        if (rc) {
                assert(TAILQ_EMPTY(&g_compress_devs));
                return rc;
        }

        g_compressdev_initialized = true;
        spdk_io_device_register(&g_compress_module, compress_create_cb, compress_destroy_cb,
                                sizeof(struct compress_io_channel), "compressdev_accel_module");
        return 0;
}
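
/* I/O channel create callback: pick a PMD for the channel, allocate the mbuf pointer
 * arrays used for submissions, register the completion poller, and claim an unused
 * device/queue pair of the chosen driver from the global list.
 */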

static int
compress_create_cb(void *io_device, void *ctx_buf)
{
        struct compress_io_channel *chan = ctx_buf;
        const struct rte_compressdev_capabilities *capab;
        struct comp_device_qp *device_qp;
        size_t length;

        if (_set_pmd(chan) == false) {
                assert(false);
                return -ENODEV;
        }

        /* The following variable length arrays of mbuf pointers are required to submit to compressdev */
        length = NUM_MBUFS * sizeof(void *);
        chan->src_mbufs = spdk_zmalloc(length, 0x40, NULL,
                                       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
        if (chan->src_mbufs == NULL) {
                return -ENOMEM;
        }
        chan->dst_mbufs = spdk_zmalloc(length, 0x40, NULL,
                                       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
        if (chan->dst_mbufs == NULL) {
                spdk_free(chan->src_mbufs);
                return -ENOMEM;
        }

        chan->poller = SPDK_POLLER_REGISTER(comp_dev_poller, chan, 0);
        STAILQ_INIT(&chan->queued_tasks);

        pthread_mutex_lock(&g_comp_device_qp_lock);
        TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
                if (strcmp(device_qp->device->cdev_info.driver_name, chan->drv_name) == 0) {
                        if (device_qp->chan == NULL) {
                                chan->device_qp = device_qp;
                                device_qp->chan = chan;
                                break;
                        }
                }
        }
        pthread_mutex_unlock(&g_comp_device_qp_lock);

        if (chan->device_qp == NULL) {
                SPDK_ERRLOG("out of qpairs, cannot assign one\n");
                assert(false);
                return -ENOMEM;
        } else {
                capab = rte_compressdev_capability_get(0, RTE_COMP_ALGO_DEFLATE);

                if (capab->comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
                        chan->device_qp->device->sgl_in = true;
                }

                if (capab->comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
                        chan->device_qp->device->sgl_out = true;
                }
        }

        return 0;
}

static void
accel_compress_write_config_json(struct spdk_json_write_ctx *w)
{
        if (g_compressdev_enable) {
                spdk_json_write_object_begin(w);
                spdk_json_write_named_string(w, "method", "compressdev_scan_accel_module");
                spdk_json_write_named_object_begin(w, "params");
                spdk_json_write_named_uint32(w, "pmd", g_opts);
                spdk_json_write_object_end(w);
                spdk_json_write_object_end(w);
        }
}
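
/* I/O channel destroy callback: release the mbuf pointer arrays and the poller, then
 * return the device/queue pair to the global list so another channel can claim it.
 */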

static void
compress_destroy_cb(void *io_device, void *ctx_buf)
{
        struct compress_io_channel *chan = ctx_buf;
        struct comp_device_qp *device_qp = chan->device_qp;

        spdk_free(chan->src_mbufs);
        spdk_free(chan->dst_mbufs);

        spdk_poller_unregister(&chan->poller);

        pthread_mutex_lock(&g_comp_device_qp_lock);
        chan->device_qp = NULL;
        device_qp->chan = NULL;
        pthread_mutex_unlock(&g_comp_device_qp_lock);
}

static size_t
accel_compress_get_ctx_size(void)
{
        return 0;
}

static bool
compress_supports_opcode(enum spdk_accel_opcode opc)
{
        if (g_mlx5_pci_available || g_qat_available || g_uadk_available) {
                switch (opc) {
                case SPDK_ACCEL_OPC_COMPRESS:
                case SPDK_ACCEL_OPC_DECOMPRESS:
                        return true;
                default:
                        break;
                }
        }

        return false;
}

static bool
compress_supports_algo(enum spdk_accel_comp_algo algo)
{
        if (algo == SPDK_ACCEL_COMP_ALGO_DEFLATE) {
                return true;
        }

        return false;
}

static int
compress_get_level_range(enum spdk_accel_comp_algo algo,
                         uint32_t *min_level, uint32_t *max_level)
{
        switch (algo) {
        case SPDK_ACCEL_COMP_ALGO_DEFLATE:
                /* Hardware compression runs at its highest level by default and is not
                 * affected by the level parameter in actual operation, so report a
                 * single-value range.
                 */
                *min_level = 0;
                *max_level = 0;

                return 0;
        default:
                return -EINVAL;
        }
}

static struct spdk_io_channel *
compress_get_io_channel(void)
{
        return spdk_get_io_channel(&g_compress_module);
}

static void accel_compress_exit(void *ctx);
static struct spdk_accel_module_if g_compress_module = {
        .module_init = accel_compress_init,
        .module_fini = accel_compress_exit,
        .write_config_json = accel_compress_write_config_json,
        .get_ctx_size = accel_compress_get_ctx_size,
        .name = "dpdk_compressdev",
        .supports_opcode = compress_supports_opcode,
        .get_io_channel = compress_get_io_channel,
        .submit_tasks = compress_submit_tasks,
        .compress_supports_algo = compress_supports_algo,
        .get_compress_level_range = compress_get_level_range,
};

void
accel_dpdk_compressdev_enable(void)
{
        spdk_accel_module_list_add(&g_compress_module);
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
        struct comp_device_qp *dev_qp;
        struct compress_dev *device;

        while ((device = TAILQ_FIRST(&g_compress_devs))) {
                TAILQ_REMOVE(&g_compress_devs, device, link);
                rte_compressdev_stop(device->cdev_id);
                rte_compressdev_close(device->cdev_id);
                free(device);
        }

        while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
                TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
                free(dev_qp);
        }

        if (g_opts == COMPRESS_PMD_UADK_ONLY) {
                rte_vdev_uninit(UADK_PMD);
        }

        pthread_mutex_destroy(&g_comp_device_qp_lock);

        rte_mempool_free(g_comp_op_mp);
        rte_mempool_free(g_mbuf_mp);

        spdk_accel_module_finish();
}

static void
accel_compress_exit(void *ctx)
{
        if (g_compressdev_initialized) {
                spdk_io_device_unregister(&g_compress_module, _device_unregister_cb);
                g_compressdev_initialized = false;
        } else {
                spdk_accel_module_finish();
        }
}