/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2018 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"
#include "spdk/likely.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_reduce",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
static int g_mbuf_offset;

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a physical page boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define MLX5_PMD "mlx5_pci"
#define NUM_MBUFS 8192
#define POOL_CACHE_SIZE 256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, it has an unadvertised limit of 99:
 * the internal ring name it creates exceeds a length limit in the generic ring code,
 * which causes qpair initialization to fail.
 * FIXME: Reduce number of qpairs to 48, due to issue #2338
 */
#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;	/* ptr to compression device */
	uint8_t qp;			/* queue pair for this node */
	struct spdk_thread *thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete cb_fn;
	void *cb_arg;
	int cb_rc;
	struct spdk_thread *orig_thread;
};
/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;		/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;		/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;				/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;			/* completion poller */
	struct spdk_reduce_vol_params params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;			/* the reduce volume */
	struct vbdev_comp_delete_ctx *delete_ctx;
	bool orphaned;			/* base bdev claimed but comp_bdev not registered */
	int reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
	struct spdk_thread *thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;		/* used in completion handling */
	struct vbdev_compress *comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;			/* the original IO */
	struct spdk_io_channel *ch;			/* for resubmission */
	int status;					/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;
static bool g_mlx5_pci_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
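/* Note on the deflate parameters above: DEFAULT_WINDOW_SIZE of 15 is the base-2 log
 * of the sliding window, i.e. a 2^15 = 32 KB history window (the zlib default), and
 * RTE_COMP_LEVEL_MAX asks the PMD for its best compression ratio at the cost of CPU
 * cycles. Checksum and hash generation are disabled because nothing in this module
 * consumes them.
 */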
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap the qpair count at MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
							  &device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
							  &device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
		g_mlx5_pci_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}
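/* Lifecycle of the device/qp pairs built above: every qpair on every discovered
 * device gets a comp_device_qp node on g_comp_device_qp with thread == NULL. A node
 * is bound to an SPDK thread lazily in comp_bdev_ch_create_cb() when the first
 * channel is created on that thread, and stays bound until the nodes are freed in
 * vbdev_compress_finish(); comp bdevs sharing a thread and driver share a qpair.
 */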
/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	/* TODO: make these global pools per thread but do in a follow-up patch to make
	 * it easier to review against the old compressdev code */
	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* For completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (spdk_likely(io_ctx->status == 0)) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else if (io_ctx->status == -ENOMEM) {
		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
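/* reduce may invoke its completion callbacks on whichever thread owns the volume,
 * but the bdev layer requires that an I/O be completed on the thread that submitted
 * it, so reduce_rw_blocks_cb() below bounces each completion back to the channel's
 * original thread via spdk_thread_exec_msg().
 */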
/* Completion callback for r/w that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);

	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}

static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a physical page boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}
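/* Worked example for the page-boundary path above, assuming 4 KiB pages: an iov
 * whose base sits 2 KiB before a page boundary with iov_len = 4096 is physically
 * contiguous only for its first 2 KiB, so spdk_vtophys() shrinks updated_length to
 * 2048, remainder becomes 2048, and a second mbuf is allocated and chained for the
 * rest. This is why MAX_MBUFS_PER_OP is twice REDUCE_MAX_IOVECS: in the worst case
 * every iov splits once.
 */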
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("failed to allocate a comp op!\n");
		rc = -ENOMEM;
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("failed to get src_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_src;
	}
	assert(src_mbufs[0]);

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("failed to get dst_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_dst;
	}
	assert(dst_mbufs[0]);

	/* There is a 1:1 mapping between a bdev_io and a compression operation.
	 * Some PMDs that SPDK uses don't support chaining, but the reduce library
	 * should provide suitable buffers in that case.
	 * Build our mbuf chain and associate it with our single comp_op.
	 */
	rc = _setup_compress_mbuf(src_mbufs, &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!comp_bdev->backing_dev.sgl_in && src_mbufs[0]->next != NULL) {
		if (src_iovcnt == 1) {
			SPDK_ERRLOG("Src buffer crosses physical page boundary but driver %s doesn't support SGL input\n",
				    comp_bdev->drv_name);
		} else {
			SPDK_ERRLOG("Driver %s doesn't support SGL input\n", comp_bdev->drv_name);
		}
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	rc = _setup_compress_mbuf(dst_mbufs, &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!comp_bdev->backing_dev.sgl_out && dst_mbufs[0]->next != NULL) {
		if (dst_iovcnt == 1) {
			SPDK_ERRLOG("Dst buffer crosses physical page boundary but driver %s doesn't support SGL output\n",
				    comp_bdev->drv_name);
		} else {
			SPDK_ERRLOG("Driver %s doesn't support SGL output\n", comp_bdev->drv_name);
		}
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 got queued, if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		rc = -EAGAIN;
	} else {
		device_error = true;
	}

	/* Error cleanup paths. */
error_src_dst:
	rte_pktmbuf_free_bulk(dst_mbufs, dst_iovcnt);
error_get_dst:
	rte_pktmbuf_free_bulk(src_mbufs, src_iovcnt);
error_get_src:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		return -EINVAL;
	}
	if (rc != -ENOMEM && rc != -EAGAIN) {
		return rc;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here however it would be rare that we'd ever have more than 1 at a time
		 * anyways.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}
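/* Returning SPDK_POLLER_BUSY when ops were dequeued (and IDLE otherwise) feeds the
 * SPDK thread library's idle-time accounting; it does not change how often this
 * poller runs, since it is registered in comp_bdev_ch_create_cb() with a period of 0
 * and therefore runs on every thread iteration.
 */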
/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}

static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}


/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	if (spdk_unlikely(!success)) {
		SPDK_ERRLOG("Failed to get data buffer\n");
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}

	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
}
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
		return;
	/* TODO support RESET in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}
static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}

}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}
/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate with this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}
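/* The init callback below may arrive on the thread that owns the reduce volume
 * rather than the thread that opened the base bdev descriptor, so it re-dispatches
 * to meta_ctx->thread before touching base_desc: SPDK requires that a descriptor be
 * closed on the same thread that opened it. The same pattern recurs in the load and
 * destruct paths.
 */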
/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %u\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}
/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which parms we want user configurable, HC for now
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */
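/* With the hardcoded defaults above, each reduce chunk is CHUNK_SIZE /
 * BACKING_IO_SZ = 16384 / 4096 = 4 backing I/O units, and a chunk holds either
 * 4 or 32 logical blocks depending on whether a 4 KiB or 512 B logical block
 * size is chosen at create time.
 */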
/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else if (g_mlx5_pci_available) {
			comp_dev->drv_name = MLX5_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
		comp_dev->drv_name = MLX5_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}
/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. This is also
 * where we register this vbdev's completion poller.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* TODO look into associating the device_qp with the channel vs the thread,
	 * doing in next patch to make this one easier to review against code taken
	 * from the vbdev module */

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		uint64_t comp_feature_flags =
			comp_bdev->device_qp->device->cdev_info.capabilities[RTE_COMP_ALGO_DEFLATE].comp_feature_flags;

		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
			comp_bdev->backing_dev.sgl_in = true;
		}
		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
			comp_bdev->backing_dev.sgl_out = true;
		}
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}
/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created, including unregistering the completion poller, which
 * happens in _channel_cleanup().
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct vbdev_compress *comp_bdev = NULL;

	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
			SPDK_ERRLOG("Base bdev %s is already being used for a compress bdev\n", bdev_name);
			return -EBUSY;
		}
	}
	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}
/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}
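/* vbdev_compress_claim() below sizes the exposed bdev so that optimal_io_boundary
 * equals one reduce chunk: with the defaults that is 16384 / 4096 = 4 blocks (or
 * 32 blocks for 512 B logical blocks). split_on_optimal_io_boundary then keeps any
 * single bdev_io within one chunk, so reduce never sees an I/O spanning chunks.
 */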
static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}
void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %u\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)