/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2018 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "accel_dpdk_compressdev.h"
#include "spdk/accel_module.h"

#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_accel_comp",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
static int g_mbuf_offset;
static enum compress_pmd g_opts;
static bool g_compressdev_enable = false;
static bool g_compressdev_initialized = false;

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
#define MBUF_SPLIT (1UL << DEFAULT_WINDOW_SIZE)
#define QAT_PMD "compress_qat"
#define MLX5_PMD "mlx5_pci"
#define NUM_MBUFS 65536
#define POOL_CACHE_SIZE 256

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	bool sgl_in;
	bool sgl_out;
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;	/* ptr to compression device */
	uint8_t qp;			/* queue pair for this node */
	struct compress_io_channel *chan;
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

struct compress_io_channel {
	char *drv_name;			/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_poller *poller;
	struct rte_mbuf **src_mbufs;
	struct rte_mbuf **dst_mbufs;
	TAILQ_HEAD(, spdk_accel_task) queued_tasks;
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_mlx5_pci_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
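/* The compress xform above and the decompress xform below are created once per device and
 * shared across all operations on that PMD (see rte_compressdev_private_xform_create() in
 * create_compress_dev()). Note that MBUF_SPLIT is (1UL << DEFAULT_WINDOW_SIZE), i.e. 32 KiB,
 * so a single mbuf segment never exceeds the DEFLATE window size configured in these xforms.
 */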
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

/* Dummy callback used by DPDK to free externally attached mbuf buffers;
 * we free those buffers ourselves, but the callback must still be provided.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by accel_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap at MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

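	/* Each queue pair is created with a fixed depth of NUM_MAX_INFLIGHT_OPS descriptors;
	 * if setup fails partway through, the loop below simply caps q_pairs at however many
	 * queue pairs were successfully created.
	 */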
	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("Failed to setup a queue pair on "
					       "compressdev %u with error %d, "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
							  &device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
							  &device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		rc = -EINVAL;
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->chan = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}

	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
		g_mlx5_pci_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

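/* The mempools created in accel_init_compress_drivers() below are shared by every compressdev
 * and every channel on the system: NUM_MBUFS (65536) mbufs with a per-lcore cache of
 * POOL_CACHE_SIZE, plus a matching pool of rte_comp_op structures.
 */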
/* Called from driver init entry point, accel_compress_init() */
static int
accel_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

int
accel_compressdev_enable_probe(enum compress_pmd *opts)
{
	g_opts = *opts;
	g_compressdev_enable = true;

	return 0;
}

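/* Usage note (a sketch based on accel_compress_write_config_json() further below, not an
 * exhaustive RPC reference): the module is enabled through the "compressdev_scan_accel_module"
 * RPC, whose params carry the PMD selection that ends up in accel_compressdev_enable_probe(),
 * roughly:
 *
 *   { "method": "compressdev_scan_accel_module", "params": { "pmd": <enum compress_pmd value> } }
 *
 * The value is cached in g_opts and consumed later by _set_pmd() when a channel is created.
 */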
static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, struct spdk_accel_task *task)
{
	uint64_t iovec_length, updated_length, phys_addr;
	uint64_t processed, mbuf_length, remainder;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		processed = 0;
		iovec_length = iovs[iov_index].iov_len;

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovec_length;
		}

		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)task;

		do {
			/* new length is the min of what remains in this iovec and the max mbuf size, MBUF_SPLIT */
			mbuf_length = updated_length = spdk_min(MBUF_SPLIT, iovec_length - processed);

			phys_addr = spdk_vtophys((void *)current_base, &updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  updated_length,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
			remainder = mbuf_length - updated_length;

			/* although the mbufs were preallocated, we still need to chain them */
			if (mbuf_index > 0) {
				rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
			}

			/* keep track of the total we've put into the mbuf chain */
			processed += updated_length;
			/* bump the base by what was previously added */
			current_base += updated_length;

			/* If we crossed a 2 MB hugepage boundary we need another mbuf for the remainder */
			if (remainder > 0) {

				assert(remainder <= MBUF_SPLIT);

				/* allocate an mbuf at the end of the array */
				rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
							    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
				if (rc) {
					SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
					return -1;
				}
				(*mbuf_total)++;
				mbuf_index++;
				*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)task;

				/* bump the base by what was previously added */
				current_base += updated_length;

				updated_length = remainder;
				phys_addr = spdk_vtophys((void *)current_base, &updated_length);

				/* assert we don't cross another boundary */
				assert(remainder == updated_length);

				rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
							  current_base,
							  phys_addr,
							  remainder,
							  &g_shinfo);
				rte_pktmbuf_append(mbufs[mbuf_index], remainder);
				rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);

				/* keep track of the total we've put into the mbuf chain */
				processed += remainder;
			}

			mbuf_index++;

		} while (processed < iovec_length);

		assert(processed == iovec_length);
		iov_index++;
	}

	return 0;
}

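/* Sizing note for _setup_compress_mbuf() above (hypothetical numbers): a 40 KiB iovec needs at
 * least two mbufs (32 KiB + 8 KiB given MBUF_SPLIT); if a chunk additionally straddles a
 * physical page boundary, an extra mbuf is pulled from g_mbuf_mp on the spot, so the final chain
 * can be longer than the per-iovec estimate computed in _compress_operation() below.
 */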
static int
_compress_operation(struct compress_io_channel *chan, struct spdk_accel_task *task)
{
	int dst_iovcnt = task->d.iovcnt;
	struct iovec *dst_iovs = task->d.iovs;
	int src_iovcnt = task->s.iovcnt;
	struct iovec *src_iovs = task->s.iovs;
	struct rte_comp_op *comp_op;
	uint8_t cdev_id;
	uint64_t total_length = 0;
	int rc = 0, i;
	int src_mbuf_total = 0;
	int dst_mbuf_total = 0;
	bool device_error = false;
	bool compress = (task->op_code == SPDK_ACCEL_OPC_COMPRESS);

	assert(chan->device_qp->device != NULL);
	cdev_id = chan->device_qp->device->cdev_id;

	/* calc our mbuf totals based on the max mbuf size allowed so we can pre-alloc mbufs in bulk */
	for (i = 0 ; i < src_iovcnt; i++) {
		src_mbuf_total += spdk_divide_round_up(src_iovs[i].iov_len, MBUF_SPLIT);
	}
	for (i = 0 ; i < dst_iovcnt; i++) {
		dst_mbuf_total += spdk_divide_round_up(dst_iovs[i].iov_len, MBUF_SPLIT);
	}

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("ERROR trying to get a comp op!\n");
		rc = -ENOMEM;
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, chan->src_mbufs, src_mbuf_total);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_src;
	}
	assert(chan->src_mbufs[0]);

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, chan->dst_mbufs, dst_mbuf_total);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_dst;
	}
	assert(chan->dst_mbufs[0]);

	rc = _setup_compress_mbuf(chan->src_mbufs, &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, task);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!chan->device_qp->device->sgl_in && src_mbuf_total > 1) {
		SPDK_ERRLOG("Src buffer uses chained mbufs but driver %s doesn't support SGL input\n",
			    chan->drv_name);
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_src = chan->src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	rc = _setup_compress_mbuf(chan->dst_mbufs, &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, task);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!chan->device_qp->device->sgl_out && dst_mbuf_total > 1) {
		SPDK_ERRLOG("Dst buffer uses chained mbufs but driver %s doesn't support SGL output\n",
			    chan->drv_name);
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_dst = chan->dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = chan->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = chan->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, chan->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 op to get enqueued; if 0 then we need to queue it up for retry. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		rc = -EAGAIN;
	} else {
		device_error = true;
	}

	/* Error cleanup paths. */
error_src_dst:
	rte_pktmbuf_free_bulk(chan->dst_mbufs, dst_iovcnt);
error_get_dst:
	rte_pktmbuf_free_bulk(chan->src_mbufs, src_iovcnt);
error_get_src:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		return -EINVAL;
	}
	if (rc != -ENOMEM && rc != -EAGAIN) {
		return rc;
	}

	TAILQ_INSERT_TAIL(&chan->queued_tasks, task, link);
	return 0;
}

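/* Completion path sketch: the poller below dequeues finished ops, recovers the originating
 * spdk_accel_task from the mbuf dynfield written in _setup_compress_mbuf(), reports
 * comp_op->produced through task->output_size on success, then returns the mbuf chains and the
 * comp op to their pools and retries one queued task, if any.
 */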
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct compress_io_channel *chan = args;
	uint8_t cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_accel_task *task, *task_to_resubmit;
	int rc, i, status;

	assert(chan->device_qp->device != NULL);
	cdev_id = chan->device_qp->device->cdev_id;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, chan->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {

		/* We store this off regardless of success/error so we know how to construct the
		 * next task.
		 */
		task = (struct spdk_accel_task *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		status = deq_ops[i]->status;

		if (spdk_likely(status == RTE_COMP_OP_STATUS_SUCCESS)) {
			if (task->output_size != NULL) {
				*task->output_size = deq_ops[i]->produced;
			}
		} else {
			SPDK_NOTICELOG("Dequeue status %u\n", status);
		}

		spdk_accel_task_complete(task, status);

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here; however, it would be rare that we'd ever have more than 1 at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&chan->queued_tasks)) {
			task_to_resubmit = TAILQ_FIRST(&chan->queued_tasks);
			rc = _compress_operation(chan, task_to_resubmit);
			if (rc == 0) {
				TAILQ_REMOVE(&chan->queued_tasks, task_to_resubmit, link);
			}
		}
	}

	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

static int
_process_single_task(struct spdk_io_channel *ch, struct spdk_accel_task *task)
{
	struct compress_io_channel *chan = spdk_io_channel_get_ctx(ch);
	int rc;

	rc = _compress_operation(chan, task);
	if (rc) {
		SPDK_ERRLOG("Error (%d) in compress operation\n", rc);
		assert(false);
	}

	return rc;
}

static int
compress_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task)
{
	struct compress_io_channel *chan = spdk_io_channel_get_ctx(ch);
	struct spdk_accel_task *task, *tmp;
	int rc = 0;

	task = first_task;

	if (!TAILQ_EMPTY(&chan->queued_tasks)) {
		goto queue_tasks;
	}

	/* The caller will either submit a single task or a group of tasks that are
	 * linked together but not on a list. For example, see the poller, where a list
	 * of queued tasks is resubmitted: the list they are on is re-initialized after
	 * saving off the first task, and that first task is what gets passed in here.
	 * The accel framework does the same thing.
	 */
	while (task) {
		tmp = TAILQ_NEXT(task, link);
		rc = _process_single_task(ch, task);

		if (rc == -EBUSY) {
			goto queue_tasks;
		} else if (rc) {
			spdk_accel_task_complete(task, rc);
		}
		task = tmp;
	}

	return 0;

queue_tasks:
	while (task != NULL) {
		tmp = TAILQ_NEXT(task, link);
		TAILQ_INSERT_TAIL(&chan->queued_tasks, task, link);
		task = tmp;
	}
	return 0;
}

static bool
_set_pmd(struct compress_io_channel *chan)
{
	/* Note: the compress_isal PMD is not supported here because the accel framework
	 * already provides native ISA-L through the accel_sw module.
	 */
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			chan->drv_name = QAT_PMD;
		} else if (g_mlx5_pci_available) {
			chan->drv_name = MLX5_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		chan->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
		chan->drv_name = MLX5_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("Channel %p PMD being used: %s\n", chan, chan->drv_name);
	return true;
}

static int compress_create_cb(void *io_device, void *ctx_buf);
static void compress_destroy_cb(void *io_device, void *ctx_buf);
static struct spdk_accel_module_if g_compress_module;

static int
accel_compress_init(void)
{
	int rc;

	if (!g_compressdev_enable) {
		return -EINVAL;
	}

	rc = accel_init_compress_drivers();
	if (rc) {
		assert(TAILQ_EMPTY(&g_compress_devs));
		SPDK_NOTICELOG("no available compression devices\n");
		return -EINVAL;
	}

	g_compressdev_initialized = true;
	spdk_io_device_register(&g_compress_module, compress_create_cb, compress_destroy_cb,
				sizeof(struct compress_io_channel), "compressdev_accel_module");
	return 0;
}

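/* Channel setup below claims one unused device/qp entry from g_comp_device_qp (under
 * g_comp_device_qp_lock) for the chosen PMD, and caches the PMD's SGL capability flags on the
 * device so submission can reject chained mbufs the driver cannot handle.
 */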
static int
compress_create_cb(void *io_device, void *ctx_buf)
{
	struct compress_io_channel *chan = ctx_buf;
	const struct rte_compressdev_capabilities *capab;
	struct comp_device_qp *device_qp;
	size_t length;

	if (_set_pmd(chan) == false) {
		assert(false);
		return -ENODEV;
	}

	/* The following variable length arrays of mbuf pointers are required to submit to compressdev */
	length = NUM_MBUFS * sizeof(void *);
	chan->src_mbufs = spdk_zmalloc(length, 0x40, NULL,
				       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->src_mbufs == NULL) {
		return -ENOMEM;
	}
	chan->dst_mbufs = spdk_zmalloc(length, 0x40, NULL,
				       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->dst_mbufs == NULL) {
		spdk_free(chan->src_mbufs);
		return -ENOMEM;
	}

	chan->poller = SPDK_POLLER_REGISTER(comp_dev_poller, chan, 0);
	TAILQ_INIT(&chan->queued_tasks);

	pthread_mutex_lock(&g_comp_device_qp_lock);
	TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
		if (strcmp(device_qp->device->cdev_info.driver_name, chan->drv_name) == 0) {
			if (device_qp->chan == NULL) {
				chan->device_qp = device_qp;
				device_qp->chan = chan;
				break;
			}
		}
	}
	pthread_mutex_unlock(&g_comp_device_qp_lock);

	if (chan->device_qp == NULL) {
		SPDK_ERRLOG("out of qpairs, cannot assign one\n");
		assert(false);
		return -ENOMEM;
	} else {
		capab = rte_compressdev_capability_get(0, RTE_COMP_ALGO_DEFLATE);

		if (capab->comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
			chan->device_qp->device->sgl_in = true;
		}

		if (capab->comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
			chan->device_qp->device->sgl_out = true;
		}
	}

	return 0;
}

static void
accel_compress_write_config_json(struct spdk_json_write_ctx *w)
{
	if (g_compressdev_enable) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "compressdev_scan_accel_module");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_uint32(w, "pmd", g_opts);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
}

static void
compress_destroy_cb(void *io_device, void *ctx_buf)
{
	struct compress_io_channel *chan = ctx_buf;
	struct comp_device_qp *device_qp = chan->device_qp;

	spdk_free(chan->src_mbufs);
	spdk_free(chan->dst_mbufs);

	spdk_poller_unregister(&chan->poller);

	pthread_mutex_lock(&g_comp_device_qp_lock);
	chan->device_qp = NULL;
	device_qp->chan = NULL;
	pthread_mutex_unlock(&g_comp_device_qp_lock);
}

static size_t
accel_compress_get_ctx_size(void)
{
	return 0;
}

static bool
compress_supports_opcode(enum spdk_accel_opcode opc)
{
	if (g_mlx5_pci_available || g_qat_available) {
		switch (opc) {
		case SPDK_ACCEL_OPC_COMPRESS:
		case SPDK_ACCEL_OPC_DECOMPRESS:
			return true;
		default:
			break;
		}
	}

	return false;
}

static struct spdk_io_channel *
compress_get_io_channel(void)
{
	return spdk_get_io_channel(&g_compress_module);
}

static void accel_compress_exit(void *ctx);

static struct spdk_accel_module_if g_compress_module = {
	.module_init = accel_compress_init,
	.module_fini = accel_compress_exit,
	.write_config_json = accel_compress_write_config_json,
	.get_ctx_size = accel_compress_get_ctx_size,
	.name = "dpdk_compressdev",
	.supports_opcode = compress_supports_opcode,
	.get_io_channel = compress_get_io_channel,
	.submit_tasks = compress_submit_tasks
};

void
accel_dpdk_compressdev_enable(void)
{
	spdk_accel_module_list_add(&g_compress_module);
}

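/* Teardown order: accel_compress_exit() below unregisters the io_device; once every channel has
 * been destroyed, _device_unregister_cb() frees the global device and device/qp lists along with
 * the DPDK mempools, and finally notifies the accel framework via spdk_accel_module_finish().
 */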
/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct comp_device_qp *dev_qp;
	struct compress_dev *device;

	while ((device = TAILQ_FIRST(&g_compress_devs))) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}

	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);

	spdk_accel_module_finish();
}

static void
accel_compress_exit(void *ctx)
{
	if (g_compressdev_initialized) {
		spdk_io_device_unregister(&g_compress_module, _device_unregister_cb);
		g_compressdev_initialized = false;
	} else {
		spdk_accel_module_finish();
	}
}