/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"
#include "spdk/likely.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_reduce",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
static int g_mbuf_offset;

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a physical page boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define MLX5_PMD "mlx5_pci"
#define NUM_MBUFS 8192
#define POOL_CACHE_SIZE 256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, it hits an undocumented limit of 99 because
 * of the length of the internal ring name it creates; beyond that it trips a limit in the
 * generic ring code and the qpair initialization fails.
 * FIXME: Reduce number of qpairs to 48, due to issue #2338
 */
#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;	/* ptr to compression device */
	uint8_t qp;			/* queue pair for this node */
	struct spdk_thread *thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete cb_fn;
	void *cb_arg;
	int cb_rc;
	struct spdk_thread *orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;		/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;		/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;				/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;			/* completion poller */
	struct spdk_reduce_vol_params params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;			/* the reduce volume */
	struct vbdev_comp_delete_ctx *delete_ctx;
	bool orphaned;			/* base bdev claimed but comp_bdev not registered */
	int reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
	struct spdk_thread *thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;		/* used in completion handling */
	struct vbdev_compress *comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;			/* the original IO */
	struct spdk_io_channel *ch;			/* for resubmission */
	int status;					/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;
static bool g_mlx5_pci_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit so choose number of lcores. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %u "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %u\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
							  &device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
							  &device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
		g_mlx5_pci_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (spdk_likely(io_ctx->status == 0)) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else if (io_ctx->status == -ENOMEM) {
		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);

	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}

static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a physical page boundary, we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("trying to get a comp op!\n");
		rc = -ENOMEM;
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_src;
	}
	assert(src_mbufs[0]);

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_dst;
	}
	assert(dst_mbufs[0]);

	/* There is a 1:1 mapping between a bdev_io and a compression operation.
	 * Some PMDs that SPDK uses don't support chaining, but the reduce library
	 * should provide correct buffers.
	 * Build our mbuf chain and associate it with our single comp_op.
	 */
	rc = _setup_compress_mbuf(src_mbufs, &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!comp_bdev->backing_dev.sgl_in && src_mbufs[0]->next != NULL) {
		if (src_iovcnt == 1) {
			SPDK_ERRLOG("Src buffer crosses physical page boundary but driver %s doesn't support SGL input\n",
				    comp_bdev->drv_name);
		} else {
			SPDK_ERRLOG("Driver %s doesn't support SGL input\n", comp_bdev->drv_name);
		}
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	rc = _setup_compress_mbuf(dst_mbufs, &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!comp_bdev->backing_dev.sgl_out && dst_mbufs[0]->next != NULL) {
		if (dst_iovcnt == 1) {
			SPDK_ERRLOG("Dst buffer crosses physical page boundary but driver %s doesn't support SGL output\n",
				    comp_bdev->drv_name);
		} else {
			SPDK_ERRLOG("Driver %s doesn't support SGL output\n", comp_bdev->drv_name);
		}
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 got queued, if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		rc = -EAGAIN;
	} else {
		device_error = true;
	}

	/* Error cleanup paths. */
error_src_dst:
	rte_pktmbuf_free_bulk(dst_mbufs, dst_iovcnt);
error_get_dst:
	rte_pktmbuf_free_bulk(src_mbufs, src_iovcnt);
error_get_src:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		return -EINVAL;
	}
	if (rc != -ENOMEM && rc != -EAGAIN) {
		return rc;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}

/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here however it would be rare that we'd ever have more than 1 at a time
		 * anyways.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

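/* Added note: the compress/decompress entry points below are what reduceLib invokes through
 * the backing_dev.compress and backing_dev.decompress pointers set in _prepare_for_load_init().
 * On a submission error they complete the reduce callback immediately with a negative errno;
 * as with a failed dequeue above, reduce can then fall back to storing the chunk uncompressed.
 */
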
/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	if (spdk_unlikely(!success)) {
		SPDK_ERRLOG("Failed to get data buffer\n");
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO support RESET in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		io_ctx->status = -ENOTSUP;

		/* Complete this on the orig IO thread. */
		orig_thread = spdk_io_channel_get_thread(ch);
		if (orig_thread != spdk_get_thread()) {
			spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
		} else {
			_complete_other_io(io_ctx);
		}
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}

}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers that want to communicate with this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

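/* Added note: the config method below replays creation of each compress bdev when a saved
 * configuration is loaded. For a vbdev built on a base bdev named, say, "Nvme0n1" (an
 * illustrative name, not taken from this file), the emitted object looks roughly like:
 *
 *   { "method": "bdev_compress_create",
 *     "params": { "base_bdev_name": "Nvme0n1", "name": "COMP_Nvme0n1",
 *                 "compression_pmd": "compress_isal" } }
 */
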
/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %u\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

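/* Added note: the three helpers below are wired up as backing_dev.readv, .writev and .unmap in
 * _prepare_for_load_init(), so every backing-device I/O issued by reduceLib completes through
 * comp_reduce_io_cb() above.
 */
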
/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

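/* Added note: base bdev hot-remove flows through the event callback below:
 * SPDK_BDEV_EVENT_REMOVE -> vbdev_compress_base_bdev_hotremove_cb() -> spdk_reduce_vol_unload()
 * -> bdev_hotremove_vol_unload_cb() -> spdk_bdev_unregister() of the compress bdev, which then
 * invokes vbdev_compress_destruct() via the fn_table.
 */
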
/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want user configurable; hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else if (g_mlx5_pci_available) {
			comp_dev->drv_name = MLX5_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
		comp_dev->drv_name = MLX5_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. This is also where
 * we register our completion poller and assign a device queue pair.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		uint64_t comp_feature_flags =
			comp_bdev->device_qp->device->cdev_info.capabilities[RTE_COMP_ALGO_DEFLATE].comp_feature_flags;

		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
			comp_bdev->backing_dev.sgl_in = true;
		}
		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
			comp_bdev->backing_dev.sgl_out = true;
		}
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct vbdev_compress *comp_bdev = NULL;

	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
			SPDK_ERRLOG("Base bdev %s already being used for a compress bdev\n", bdev_name);
			return -EBUSY;
		}
	}
	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

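/* Added note: vbdev_compress_io_type_supported() above only advertises READ and WRITE; any other
 * I/O type that reaches _comp_bdev_io_submit() is completed with -ENOTSUP. The fn_table below is
 * how those entry points are handed to the bdev layer.
 */
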
/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct = vbdev_compress_destruct,
	.submit_request = vbdev_compress_submit_request,
	.io_type_supported = vbdev_compress_io_type_supported,
	.get_io_channel = vbdev_compress_get_io_channel,
	.dump_info_json = vbdev_compress_dump_info_json,
	.write_config_json = NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("trying to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %u\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when the load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}

}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)