/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"
#include "spdk/likely.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_reduce",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
static int g_mbuf_offset;
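/* A minimal sketch (not live code) of how the registered dynfield is used
 * further down in this file: rte_mbuf_dynfield_register() hands back an
 * offset, and the per-mbuf context slot is then read and written via
 * RTE_MBUF_DYNFIELD(), e.g.:
 *
 *   *RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)cb_arg;
 *   cb_arg = (void *)*RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *);
 */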
#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a physical page boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define MLX5_PMD "mlx5_pci"
#define NUM_MBUFS 8192
#define POOL_CACHE_SIZE 256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99:
 * the internal ring name it creates exceeds a length limit in the generic ring code,
 * which fails the qp initialization.
 * FIXME: Reduce number of qpairs to 48, due to issue #2338
 */
#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;		/* ptr to compression device */
	uint8_t qp;				/* queue pair for this node */
	struct spdk_thread *thread;		/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete cb_fn;
	void *cb_arg;
	int cb_rc;
	struct spdk_thread *orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;		/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;		/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;				/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;		/* completion poller */
	struct spdk_reduce_vol_params params;	/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx *delete_ctx;
	bool orphaned;				/* base bdev claimed but comp_bdev not registered */
	int reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
	struct spdk_thread *thread;		/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};
/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;	/* used in completion handling */
	struct vbdev_compress *comp_bdev;	/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;		/* the original IO */
	struct spdk_io_channel *ch;		/* for resubmission */
	int status;				/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;
static bool g_mlx5_pci_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap at MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}
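	/* Worked example: a PMD reporting max_nb_queue_pairs == 8 gets
	 * min(8, 48) = 8 qpairs, while one reporting 0 ("unlimited")
	 * gets the MAX_NUM_QP cap of 48.
	 */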
	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %u "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %u\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
		g_mlx5_pci_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("failed to create virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}
	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
	/* The op pool was created by this point, so release it too. */
	rte_mempool_free(g_comp_op_mp);
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (spdk_likely(io_ctx->status == 0)) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else if (io_ctx->status == -ENOMEM) {
		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
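/* Note on the -ENOMEM branch above: it is treated as retryable. The original
 * bdev_io is parked via spdk_bdev_queue_io_wait() (see
 * vbdev_compress_queue_io() below) and resubmitted when resources free up,
 * rather than being failed back to the caller.
 */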
/* Completion callback for r/w that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);

	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}

static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a physical page boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}
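/* Illustrative example of the split logic above (addresses hypothetical):
 * an iovec with iov_base = 0x200000f80 and iov_len = 0x100 crosses a 4 KiB
 * page boundary, so spdk_vtophys() trims updated_length to 0x80. The first
 * mbuf maps those 0x80 bytes, an extra mbuf is pulled from g_mbuf_mp for the
 * remaining 0x80 bytes, and both end up chained onto mbufs[0].
 */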
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;
	uint8_t op_status = 0;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("failed to get a comp op!\n");
		rc = -ENOMEM;
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_src;
	}
	assert(src_mbufs[0]);

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_dst;
	}
	assert(dst_mbufs[0]);

	/* There is a 1:1 mapping between a bdev_io and a compression operation.
	 * Some PMDs that SPDK uses don't support chaining, but the reduce library
	 * should provide correct buffers.
	 * Build our mbuf chain and associate it with our single comp_op.
	 */
	rc = _setup_compress_mbuf(src_mbufs, &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!comp_bdev->backing_dev.sgl_in && src_mbufs[0]->next != NULL) {
		if (src_iovcnt == 1) {
			SPDK_ERRLOG("Src buffer crosses physical page boundary but driver %s doesn't support SGL input\n",
				    comp_bdev->drv_name);
		} else {
			SPDK_ERRLOG("Driver %s doesn't support SGL input\n", comp_bdev->drv_name);
		}
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	rc = _setup_compress_mbuf(dst_mbufs, &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!comp_bdev->backing_dev.sgl_out && dst_mbufs[0]->next != NULL) {
		if (dst_iovcnt == 1) {
			SPDK_ERRLOG("Dst buffer crosses physical page boundary but driver %s doesn't support SGL output\n",
				    comp_bdev->drv_name);
		} else {
			SPDK_ERRLOG("Driver %s doesn't support SGL output\n", comp_bdev->drv_name);
		}
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 got queued, if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		rc = -EAGAIN;
	} else {
		device_error = true;
		/* Save the status before the op is freed in cleanup below. */
		op_status = comp_op->status;
	}

	/* Error cleanup paths. */
error_src_dst:
	rte_pktmbuf_free_bulk(dst_mbufs, dst_iovcnt);
error_get_dst:
	rte_pktmbuf_free_bulk(src_mbufs, src_iovcnt);
error_get_src:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", op_status);
		return -EINVAL;
	}
	if (rc != -ENOMEM && rc != -EAGAIN) {
		return rc;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here; however, it would be rare that we'd ever have more than 1 at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}
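/* A sketch of the requeue path the poller services: when
 * rte_compressdev_enqueue_burst() accepts nothing and the op status is
 * RTE_COMP_OP_STATUS_NOT_PROCESSED, _compress_operation() parks the request
 * on queued_comp_ops. Each completion handled above then retries at most one
 * parked op, so queued work drains as device slots open up.
 */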
/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with compress operation: %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with decompress operation: %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	if (spdk_unlikely(!success)) {
		SPDK_ERRLOG("Failed to get data buffer\n");
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO support RESET in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		io_ctx->status = -ENOTSUP;

		/* Complete this on the orig IO thread. */
		orig_thread = spdk_io_channel_get_thread(ch);
		if (orig_thread != spdk_get_thread()) {
			spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
		} else {
			_complete_other_io(io_ctx);
		}
	}
}
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}
static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}
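/* Example fragment emitted by the function above inside "bdev_get_bdevs"
 * output (the bdev names here are hypothetical):
 *
 *   "compress": {
 *     "name": "COMP_Nvme0n1",
 *     "base_bdev_name": "Nvme0n1",
 *     "compression_pmd": "compress_isal"
 *   }
 */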
/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %u\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}
/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}
/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which parms we want user configurable, HC for now
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else if (g_mlx5_pci_available) {
			comp_dev->drv_name = MLX5_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
		comp_dev->drv_name = MLX5_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}
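/* Selection precedence above with COMPRESS_PMD_AUTO: QAT hardware if probed,
 * then mlx5_pci, falling back to the ISA-L software PMD, which is always
 * created at module init. The *_ONLY options fail outright when the named
 * PMD was not discovered.
 */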
/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		uint64_t comp_feature_flags =
			comp_bdev->device_qp->device->cdev_info.capabilities[RTE_COMP_ALGO_DEFLATE].comp_feature_flags;

		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
			comp_bdev->backing_dev.sgl_in = true;
		}
		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
			comp_bdev->backing_dev.sgl_out = true;
		}
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}
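/* Example of the capability mapping above: a PMD advertising only
 * RTE_COMP_FF_OOP_SGL_IN_LB_OUT yields sgl_in = true and sgl_out = false,
 * meaning chained source mbufs are accepted but the destination must be a
 * single linear buffer; _compress_operation() enforces this before enqueue.
 */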
static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct vbdev_compress *comp_bdev = NULL;

	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
			SPDK_ERRLOG("Base bdev %s is already in use by a compress bdev\n", bdev_name);
			return -EBUSY;
		}
	}
	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}
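/* Typical invocation from the RPC layer (a sketch; the bdev name and pmem
 * path are hypothetical, and the flag names assume the stock rpc.py client):
 *
 *   scripts/rpc.py bdev_compress_create -b Nvme0n1 -p /mnt/pmem -l 4096
 *
 * which creates a compress vbdev named COMP_Nvme0n1 on top of Nvme0n1.
 */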
/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);
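	/* Worked example: with the fixed 16 KiB CHUNK_SIZE and a 4 KiB logical
	 * block size, optimal_io_boundary = 16384 / 4096 = 4 blocks, so the
	 * bdev layer splits I/O on reduce chunk boundaries.
	 */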
	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;
	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %u\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}
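/* How the two callbacks above interpret spdk_reduce_vol_load() results (as
 * this module handles them):
 *   0       -> valid reduce volume found; claim the base bdev and register
 *              the compress vbdev.
 *   -ENOENT -> reduce metadata present but the volume can't be loaded; claim
 *              the base bdev and mark it orphaned so it can still be deleted.
 *   -EILSEQ -> no reduce signature, not ours; ignore silently.
 * Anything else is logged and the examine is abandoned.
 */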
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)