/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2018 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"
#include "spdk/likely.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_reduce",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
static int g_mbuf_offset;

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a physical page boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define MLX5_PMD "mlx5_pci"
#define NUM_MBUFS 8192
#define POOL_CACHE_SIZE 256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99 due to
 * the length of the internal ring name that it creates; this breaks a limit in the generic
 * ring code and fails the qp initialization.
 * FIXME: Reduce number of qpairs to 48, due to issue #2338
 */
#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;	/* ptr to compression device */
	uint8_t qp;			/* queue pair for this node */
	struct spdk_thread *thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete cb_fn;
	void *cb_arg;
	int cb_rc;
	struct spdk_thread *orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;		/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;		/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;				/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;		/* completion poller */
	struct spdk_reduce_vol_params params;	/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx *delete_ctx;
	bool orphaned;				/* base bdev claimed but comp_bdev not registered */
	int reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
	struct spdk_thread *thread;		/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;	/* used in completion handling */
	struct vbdev_compress *comp_bdev;	/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;		/* the original IO */
	struct spdk_io_channel *ch;		/* for resubmission */
	int status;				/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;
static bool g_mlx5_pci_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
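/* Note: for DEFLATE the window_size field is the base-2 log of the history window,
 * so DEFAULT_WINDOW_SIZE (15) selects a 32 KB window. The decompress xform below
 * uses the same value so data compressed with these settings can always be inflated.
 */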
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit so choose number of lcores. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

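	/* NUM_MAX_INFLIGHT_OPS is used both as the depth of each queue pair here and
	 * as the dequeue burst size in comp_dev_poller(), so a full qpair can be
	 * drained in a single poll.
	 */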
	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %u "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %u\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
		g_mlx5_pci_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (spdk_likely(io_ctx->status == 0)) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else if (io_ctx->status == -ENOMEM) {
		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
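	/* A bdev_io has to be completed on the thread that owns the channel it was
	 * submitted on; the reduce completion may run on a different (reduce) thread,
	 * so hop back before calling spdk_bdev_io_complete().
	 */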
	orig_thread = spdk_io_channel_get_thread(ch);

	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}

static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If the buffer crossed a physical page boundary, we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("trying to get a comp op!\n");
		rc = -ENOMEM;
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_src;
	}
	assert(src_mbufs[0]);

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_dst;
	}
	assert(dst_mbufs[0]);

	/* There is a 1:1 mapping between a bdev_io and a compression operation.
	 * Some PMDs that SPDK uses don't support chaining, but the reduce library
	 * should provide correct buffers.
	 * Build our mbuf chain and associate it with our single comp_op.
	 */
	rc = _setup_compress_mbuf(src_mbufs, &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!comp_bdev->backing_dev.sgl_in && src_mbufs[0]->next != NULL) {
		if (src_iovcnt == 1) {
			SPDK_ERRLOG("Src buffer crosses physical page boundary but driver %s doesn't support SGL input\n",
				    comp_bdev->drv_name);
		} else {
			SPDK_ERRLOG("Driver %s doesn't support SGL input\n", comp_bdev->drv_name);
		}
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	rc = _setup_compress_mbuf(dst_mbufs, &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!comp_bdev->backing_dev.sgl_out && dst_mbufs[0]->next != NULL) {
		if (dst_iovcnt == 1) {
			SPDK_ERRLOG("Dst buffer crosses physical page boundary but driver %s doesn't support SGL output\n",
				    comp_bdev->drv_name);
		} else {
			SPDK_ERRLOG("Driver %s doesn't support SGL output\n", comp_bdev->drv_name);
		}
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 got queued, if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		rc = -EAGAIN;
	} else {
		device_error = true;
	}

	/* Error cleanup paths. */
error_src_dst:
	rte_pktmbuf_free_bulk(dst_mbufs, dst_iovcnt);
error_get_dst:
	rte_pktmbuf_free_bulk(src_mbufs, src_iovcnt);
error_get_src:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		return -EINVAL;
	}
	if (rc != -ENOMEM && rc != -EAGAIN) {
		return rc;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}

/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here however it would be rare that we'd ever have more than 1 at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

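/* The compress/decompress entry points below are asynchronous from reduce's point
 * of view: a successful enqueue is reported later from comp_dev_poller() via the
 * spdk_reduce_vol_cb_args callback, while an immediate failure invokes that same
 * callback here with a negative errno.
 */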
/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}

static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}


/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	if (spdk_unlikely(!success)) {
		SPDK_ERRLOG("Failed to get data buffer\n");
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}

	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
}

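/* Reads first get a data buffer from the bdev layer (comp_read_get_buf_cb) and are
 * then handed to the reduce thread; writes go to the reduce thread directly. All
 * other I/O types are currently failed (see the RESET TODO below).
 */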
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
		return;
	/* TODO support RESET in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
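	/* spdk_bdev_close() has to run on the same thread that opened the descriptor
	 * (comp_bdev->thread), which is why vbdev_compress_destruct_cb() funnels the
	 * destruct path through that thread before we get here.
	 */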
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}

}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %u\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which parms we want user configurable, HC for now
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else if (g_mlx5_pci_available) {
			comp_dev->drv_name = MLX5_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
		comp_dev->drv_name = MLX5_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		uint64_t comp_feature_flags =
			comp_bdev->device_qp->device->cdev_info.capabilities[RTE_COMP_ALGO_DEFLATE].comp_feature_flags;

		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
			comp_bdev->backing_dev.sgl_in = true;
		}
		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
			comp_bdev->backing_dev.sgl_out = true;
		}
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
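		/* The poller and the base channel were created on the reduce thread in
		 * comp_bdev_ch_create_cb(), so they have to be torn down on that same
		 * thread as well.
		 */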
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct vbdev_compress *comp_bdev = NULL;

	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
			SPDK_ERRLOG("Base bdev %s already being used for a compress bdev\n", bdev_name);
			return -EBUSY;
		}
	}
	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

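/* Note: .write_config_json is left NULL in the fn_table below; creation RPCs are
 * replayed through the module-level config_json callback (vbdev_compress_config_json)
 * instead.
 */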
/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("trying to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %u\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}

}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)