/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/conf.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk_internal/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>

#define NUM_MAX_XFORMS		2
#define NUM_MAX_INFLIGHT_OPS	128
#define DEFAULT_WINDOW_SIZE	15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP	(REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE		(1024 * 16)
#define COMP_BDEV_NAME		"compress"
#define BACKING_IO_SZ		(4 * 1024)

#define ISAL_PMD		"compress_isal"
#define QAT_PMD			"compress_qat"
#define NUM_MBUFS		8192
#define POOL_CACHE_SIZE		256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
	uint8_t				cdev_id;	/* identifier for the device */
	void				*comp_xform;	/* shared private xform for comp on this PMD */
	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev)	link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
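/* High-level flow (summary of the code below): this vbdev sits on top of a
 * base bdev and delegates chunk/metadata management to the reduce library;
 * reduce in turn calls back into this module, which performs the actual
 * DEFLATE work through a DPDK compressdev PMD (QAT hardware or the ISA-L
 * software PMD).
 */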
/* Although the ISA-L PMD reports 'unlimited' qpairs, it has an unplanned limit of 99:
 * the internal ring name it creates for each qp exceeds a length limit in the
 * generic ring code, so qp initialization fails beyond that point.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev		*device;	/* ptr to compression device */
	uint8_t				qp;		/* queue pair for this node */
	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp)	link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev	*backing_dev;
	struct iovec			*src_iovs;
	int				src_iovcnt;
	struct iovec			*dst_iovs;
	int				dst_iovcnt;
	bool				compress;
	void				*cb_arg;
	TAILQ_ENTRY(vbdev_comp_op)	link;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	char				*drv_name;	/* name of the compression device driver */
	struct comp_device_qp		*device_qp;
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	spdk_delete_compress_complete	delete_cb_fn;
	void				*delete_cb_arg;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on our behalf by the io channel code. */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;	/* used in completion handling */
	struct vbdev_compress		*comp_bdev;	/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;	/* the original IO */
	struct spdk_io_channel		*ch;		/* for resubmission */
	int				status;		/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;
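/* The static xforms below are templates: create_compress_dev() passes them to
 * rte_compressdev_private_xform_create() once per device, which is why that
 * function requires the PMD to advertise RTE_COMP_FF_SHAREABLE_PRIV_XFORM.
 */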
/* Shared (between all ops per PMD) compress xform. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

/* Shared (between all ops per PMD) decompress xform. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs; we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap at MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}
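	/* FYI: with the ISA-L software PMD, max_nb_queue_pairs reports 0
	 * ("unlimited"), so q_pairs lands at MAX_NUM_QP (99) here.
	 */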
	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
							  &device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform on device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
							  &device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform on device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		rc = -EINVAL;
		goto err;
	}

	/* Build up the list of device/qp combinations. */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}

	return 0;

err:
	/* Only free the qp entries that belong to this device; entries for
	 * previously created devices stay on the global list.
	 */
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		if (dev_qp->device == device) {
			TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
			free(dev_qp);
		}
	}
	free(device);
	return rc;
}

/* Called from the driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
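	/* Note: this count includes the ISA-L vdev created above plus any
	 * hardware compressdevs (e.g., QAT) probed by the DPDK EAL at startup.
	 */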
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
	rte_mempool_free(g_comp_op_mp);
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* For completing rw requests on the orig IO thread. */
static void
_spdk_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for reads/writes that were issued via reducelib. */
static void
spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example: examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
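	/* The bdev layer requires an I/O to complete on the same thread that
	 * submitted it, so hop back if reduce completed us somewhere else.
	 */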
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _spdk_reduce_rw_blocks_cb, io_ctx);
	} else {
		_spdk_reduce_rw_blocks_cb(io_ctx);
	}
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t updated_length, remainder, phys_addr, total_length = 0;
	uint8_t *current_src_base = NULL;
	uint8_t *current_dst_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("failed to get a comp op!\n");
		goto error_get_op;
	}

	/* Get an mbuf per iov, src and dst. */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}
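	/* Note: the mbufs don't own any data. rte_pktmbuf_attach_extbuf() below
	 * points each mbuf at the caller's buffers (zero copy), which is why
	 * shinfo_free_cb() is a no-op: reduce owns the memory.
	 */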
	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining, so build our mbuf chain
	 * and associate it with our single comp_op.
	 */

	/* Setup src mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < src_iovcnt) {

		current_src_base = src_iovs[iov_index].iov_base;
		total_length += src_iovs[iov_index].iov_len;
		assert(src_mbufs[mbuf_index] != NULL);
		src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
		updated_length = src_iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_src_base, &updated_length);

		rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
					  current_src_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(src_mbufs[mbuf_index], updated_length);
		remainder = src_iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[src_mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra src_mbuf!\n");
				goto error_src_dst;
			}
			src_mbuf_total++;
			mbuf_index++;
			src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
			current_src_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_src_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == src_iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
						  current_src_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(src_mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* Setup dst mbufs. For the current test being used with this code there's only one vector. */
	iov_index = mbuf_index = 0;
	while (iov_index < dst_iovcnt) {

		current_dst_base = dst_iovs[iov_index].iov_base;
		updated_length = dst_iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_dst_base, &updated_length);

		rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
					  current_dst_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(dst_mbufs[mbuf_index], updated_length);
		remainder = dst_iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[dst_mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra dst_mbuf!\n");
				goto error_src_dst;
			}
			dst_mbuf_total++;
			mbuf_index++;
			current_dst_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_dst_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == dst_iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
						  current_dst_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(dst_mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 to get queued; if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	}

	/* Error cleanup paths. */
error_src_dst:
	/* Free each allocated segment exactly once; rte_pktmbuf_free() would
	 * follow any chain already built and double-free segments that we also
	 * visit by index here.
	 */
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free_seg(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free_seg(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:
	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}

/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;

		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops, so we have to free them one at a
		 * time here; however, it would be rare to ever have more than one at a
		 * time anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process; we attempt just
		 * one resubmission per completed op, then wait for the next completion.
		 */
		while (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
			break;
		}
	}
	return 0;
}

/* Entry point for the reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for the reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}
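/* Note for both entry points above: a negative value passed to the reduce
 * callback is not necessarily fatal; for compression, reduce falls back to
 * storing the chunk uncompressed (see comp_dev_poller()).
 */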
/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      spdk_reduce_rw_blocks_cb, bdev_io);
}

/* Scheduled for completion on the orig IO thread. */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Scheduled for submission on the reduce thread. */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       spdk_reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("error on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;
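	/* All I/O for a given comp_bdev is funneled to the single thread that
	 * owns its reduce volume and device qp (the reduce_thread set in the
	 * channel create callback).
	 */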
	/* Send this request to the reduce_thread if that's not the thread we're on. */
	if (spdk_io_channel_get_thread(ch) != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
		spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
		spdk_bdev_close(comp_bdev->base_desc);
		comp_bdev->vol = NULL;
		if (comp_bdev->orphaned == false) {
			spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
		} else {
			comp_bdev->delete_cb_fn(comp_bdev->delete_cb_arg, 0);
			_device_unregister_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, comp_bdev->delete_cb_fn,
				     comp_bdev->delete_cb_arg);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		/* reducelib needs a channel to comm with the backing device */
		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

		/* Clean the device before we free our resources. */
		spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}
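/* An "orphaned" comp_bdev (see vbdev_reduce_load_cb()) has claimed its base
 * bdev but was never registered, e.g., because the pm metadata file was
 * missing; the teardown paths above skip the unregister steps for that case.
 */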
/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim, where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	/* We're done with metadata operations. */
	spdk_put_io_channel(meta_ctx->base_ch);
	spdk_bdev_close(meta_ctx->base_desc);
	meta_ctx->base_desc = NULL;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
		vbdev_compress_claim(meta_ctx);
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		free(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}
/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

/* Called when the underlying base bdev goes away. */
static void
vbdev_compress_base_bdev_hotremove_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev, *tmp;
	struct spdk_bdev *bdev_find = ctx;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* TODO: determine which params we want user configurable; hardcoded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */
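/* A worked example of the hardcoded values below (assuming a hypothetical
 * 512 B base bdev): chunk_size = 16 KiB and logical_block_size = 512 B, so
 * each reduce chunk holds 32 logical blocks, and with backing_io_unit_size =
 * 4 KiB each chunk maps to 4 backing io units.
 */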
/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->base_bdev = bdev;
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	meta_ctx->params.logical_block_size = bdev->blocklen;
	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(struct spdk_bdev *bdev, const char *pm_path)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		return -EINVAL;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		return -EINVAL;
	}
	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* We use this queue to track outstanding IO in our layer. */
	TAILQ_INIT(&comp_bdev->pending_comp_ios);

	/* We use this to queue up compression operations as needed. */
	TAILQ_INIT(&comp_bdev->queued_comp_ops);

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = spdk_poller_register(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair. */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if ((strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0)) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread, so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread. */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		_channel_cleanup(comp_bdev);
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path)
{
	struct spdk_bdev *bdev;

	bdev = spdk_bdev_get_by_name(bdev_name);
	if (!bdev) {
		return -ENODEV;
	}

	return vbdev_init_reduce(bdev, pm_path);
}
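/* For example (hypothetical invocation; exact flag names depend on the rpc.py
 * version in use): `rpc.py bdev_compress_create -b base_bdev -p /pmem_dir`
 * would land here with bdev_name = "base_bdev" and pm_path = "/pmem_dir".
 */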
/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.config_text = NULL,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static void
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		goto error_bdev_name;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size).
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
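	/* Example with hypothetical values: a 512 B blocklen and the 16 KiB
	 * chunk_size give an optimal_io_boundary of 32 blocks below, so the
	 * bdev layer splits I/O on reduce chunk boundaries.
	 */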
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->base_bdev->blocklen;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in, so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    comp_bdev->base_bdev, &comp_bdev->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_open;
	}

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("error registering bdev\n");
		goto error_bdev_register;
	}

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return;

	/* Error cleanup paths. Note: the metadata channel and descriptor used
	 * for init/load were already released before claim was called, so only
	 * the descriptor opened above needs to be closed here, and only on the
	 * paths where that open succeeded.
	 */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_io_device_unregister(comp_bdev, NULL);
	spdk_bdev_close(comp_bdev->base_desc);
error_open:
	free(comp_bdev->comp_bdev.name);
error_bdev_name:
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	/* Save these for after the vol is destroyed. */
	comp_bdev->delete_cb_fn = cb_fn;
	comp_bdev->delete_cb_arg = cb_arg;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}
/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim, where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;
	int rc;

	/* Done with metadata operations. */
	spdk_put_io_channel(meta_ctx->base_ch);
	spdk_bdev_close(meta_ctx->base_desc);
	meta_ctx->base_desc = NULL;

	if (reduce_errno != 0 && reduce_errno != -ENOENT) {
		/* -EILSEQ simply means the bdev is not a compress disk, so don't
		 * log an error for that case.
		 */
		if (reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		}
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* This status means that the vol could not be loaded because
	 * the pmem file can't be found.
	 */
	if (reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* We still want to open and claim the backing device to protect the data until
		 * either the pm metadata file is recovered or the comp bdev is deleted.
		 */
		rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
				    meta_ctx->base_bdev, &meta_ctx->base_desc);
		if (rc) {
			SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			goto err;
		}

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
err:
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Update information following volume load. */
	meta_ctx->vol = vol;
	memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
	       sizeof(struct spdk_reduce_vol_params));
	vbdev_compress_claim(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}
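/* Note: examine opens the base bdev below without the write flag; at this
 * point it is only reading reduce metadata. Claiming for exclusive use
 * happens later, in vbdev_compress_claim() or in the -ENOENT path above.
 */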
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
set_compress_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS)