/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/conf.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk_internal/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define NUM_MBUFS 8192
#define POOL_CACHE_SIZE 256
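
/* Sizing notes (illustrative): DEFAULT_WINDOW_SIZE is a base-2 log, so 15
 * corresponds to a 32KB DEFLATE sliding window. MAX_MBUFS_PER_OP is
 * REDUCE_MAX_IOVECS * 2 because each host buffer can straddle at most one
 * 2MB hugepage boundary, in which case it needs a second mbuf for the
 * remainder.
 */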

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISA-L PMD reports 'unlimited' qpairs, it has a practical limit of 99:
 * beyond that, the internal ring name it creates exceeds a length limit in the
 * generic ring code and qpair initialization fails.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;		/* ptr to compression device */
	uint8_t qp;				/* queue pair for this node */
	struct spdk_thread *thread;		/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;		/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;		/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;				/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;		/* completion poller */
	struct spdk_reduce_vol_params params;	/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;		/* the reduce volume */
	spdk_delete_compress_complete delete_cb_fn;
	void *delete_cb_arg;
	bool orphaned;				/* base bdev claimed but comp_bdev not registered */
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;		/* used in completion handling */
	struct vbdev_compress *comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;			/* the original IO */
	struct spdk_io_channel *ch;			/* for resubmission */
	int status;					/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
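
/* Note: a PMD that advertises RTE_COMP_FF_SHAREABLE_PRIV_XFORM lets us create
 * these two xforms once per device (in create_compress_dev() below) and then
 * reference them from every operation via comp_op->private_xform, rather than
 * creating one per op.
 */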

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit so choose number of lcores. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}
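
/* For example, a device that reports two qpairs contributes two comp_device_qp
 * nodes to g_comp_device_qp, while the ISA-L vdev (which reports 'unlimited')
 * is capped at MAX_NUM_QP nodes. Channels later claim one node per thread in
 * comp_bdev_ch_create_cb().
 */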

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
	/* rte_mempool_free() is a no-op on NULL, so this is safe even when the
	 * comp op pool was never created.
	 */
	rte_mempool_free(g_comp_op_mp);
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_spdk_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w that were issued via reducelib. */
static void
spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _spdk_reduce_rw_blocks_cb, io_ctx);
	} else {
		_spdk_reduce_rw_blocks_cb(io_ctx);
	}
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t updated_length, remainder, phys_addr, total_length = 0;
	uint8_t *current_src_base = NULL;
	uint8_t *current_dst_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("failed to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining so build our mbuf chain
	 * and associate with our single comp_op.
	 */
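
	/* Worked example (illustrative): if an iov is 16KB and begins 4KB below a
	 * 2MB hugepage boundary, spdk_vtophys() trims updated_length down to the
	 * 4KB that is physically contiguous, leaving a 12KB remainder. The loop
	 * below then attaches a second mbuf for the remainder and chains it to
	 * the first.
	 */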

	/* Setup src mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < src_iovcnt) {

		current_src_base = src_iovs[iov_index].iov_base;
		total_length += src_iovs[iov_index].iov_len;
		assert(src_mbufs[mbuf_index] != NULL);
		src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
		updated_length = src_iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_src_base, &updated_length);

		rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
					  current_src_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(src_mbufs[mbuf_index], updated_length);
		remainder = src_iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[src_mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra src_mbuf!\n");
				goto error_src_dst;
			}
			src_mbuf_total++;
			mbuf_index++;
			src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
			current_src_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_src_base, &remainder);
			/* assert we don't cross another */
			assert(remainder == src_iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
						  current_src_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(src_mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* setup dst mbufs, for the current test being used with this code there's only one vector */
	iov_index = mbuf_index = 0;
	while (iov_index < dst_iovcnt) {

		current_dst_base = dst_iovs[iov_index].iov_base;
		updated_length = dst_iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_dst_base, &updated_length);

		rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
					  current_dst_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(dst_mbufs[mbuf_index], updated_length);
		remainder = dst_iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[dst_mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra dst_mbuf!\n");
				goto error_src_dst;
			}
			dst_mbuf_total++;
			mbuf_index++;
			current_dst_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_dst_base, &remainder);
			/* assert we don't cross another */
			assert(remainder == dst_iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
						  current_dst_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(dst_mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;
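
	/* Enqueue semantics: we submit a burst of exactly one op. A return of 1
	 * means the device accepted it; 0 with status RTE_COMP_OP_STATUS_NOT_PROCESSED
	 * means the qp was simply full, so we queue the op locally and the poller
	 * resubmits it later. Anything else is treated as a device error.
	 */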
	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 to get queued, if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		/* Log the status now; comp_op is freed on the error path below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		device_error = true;
		goto error_src_dst;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		/* Free each segment individually; some may already be chained, so
		 * freeing whole chains here would double-free the later segments.
		 */
		rte_pktmbuf_free_seg(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free_seg(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}
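
/* The poller below runs on the comp_bdev's reduce thread (it is registered in
 * comp_bdev_ch_create_cb() when the first channel is created), so completions
 * and resubmission of locally queued ops all happen on a single thread.
 */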
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;

		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here, however it would be rare that we'd ever have more than 1 at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return 0;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with compress operation: %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with decompress operation: %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}
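
/* These two entry points, along with the readv/writev/unmap shims further
 * below, are wired into the spdk_reduce_backing_dev function table in
 * _prepare_for_load_init(); reducelib never calls into this module directly.
 */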

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      spdk_reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       spdk_reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("error on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}
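
/* Threading model (for reference): a READ arriving on any thread is forwarded
 * to the comp_bdev's single reduce_thread by the function below; reducelib and
 * the compressdev poller run there, and the completion hops back to the
 * submitting thread via spdk_thread_send_msg() before spdk_bdev_io_complete().
 */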
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_io_channel_get_thread(ch) != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
		spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
		spdk_bdev_close(comp_bdev->base_desc);
		comp_bdev->vol = NULL;
		if (comp_bdev->orphaned == false) {
			spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
		} else {
			comp_bdev->delete_cb_fn(comp_bdev->delete_cb_arg, 0);
			_device_unregister_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, comp_bdev->delete_cb_fn,
				     comp_bdev->delete_cb_arg);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		/* reducelib needs a channel to comm with the backing device */
		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

		/* Clean the device before we free our resources. */
		spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}
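
/* Teardown ordering (for reference): unload the reduce volume first, then
 * release the claim on the base bdev, close the descriptor, and finally
 * unregister the io_device, which frees the comp_bdev in _device_unregister_cb().
 */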

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}
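
/* Illustrative output of the above for one bdev (names are examples only):
 *
 *   {
 *     "method": "bdev_compress_create",
 *     "params": {
 *       "base_bdev_name": "Nvme0n1",
 *       "name": "COMP_Nvme0n1",
 *       "compression_pmd": "compress_isal"
 *     }
 *   }
 */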

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);
	spdk_bdev_close(meta_ctx->base_desc);
	meta_ctx->base_desc = NULL;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
		vbdev_compress_claim(meta_ctx);
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		free(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

/* Called when the underlying base bdev goes away. */
static void
vbdev_compress_base_bdev_hotremove_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev, *tmp;
	struct spdk_bdev *bdev_find = ctx;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* TODO: determine which parms we want user configurable, HC for now
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->base_bdev = bdev;
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	meta_ctx->params.logical_block_size = bdev->blocklen;
	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}
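
/* Worked example of the resulting geometry (illustrative, assuming a 512B base
 * bdev): chunk_size = 16KB and backing_io_unit_size = 4KB give four backing IO
 * units per chunk, and 16384 / 512 = 32 logical blocks per chunk, which is
 * also what optimal_io_boundary is set to in vbdev_compress_claim().
 */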

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(struct spdk_bdev *bdev, const char *pm_path)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		return -EINVAL;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		return -EINVAL;
	}
	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* We use this queue to track outstanding IO in our layer. */
	TAILQ_INIT(&comp_bdev->pending_comp_ios);

	/* We use this to queue up compression operations as needed. */
	TAILQ_INIT(&comp_bdev->queued_comp_ops);

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = spdk_poller_register(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if ((strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0)) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		_channel_cleanup(comp_bdev);
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path)
{
	struct spdk_bdev *bdev;

	bdev = spdk_bdev_get_by_name(bdev_name);
	if (!bdev) {
		return -ENODEV;
	}

	return vbdev_init_reduce(bdev, pm_path);
}
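
/* Usage sketch (illustrative): callers typically reach this through the
 * bdev_compress_create RPC method (see vbdev_compress_config_json() above),
 * which resolves to something like:
 *
 *   rc = create_compress_bdev("Nvme0n1", "/mnt/pmem");
 *   if (rc != 0) { ... report error ... }
 *
 * where "Nvme0n1" and "/mnt/pmem" are example values only.
 */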

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct = vbdev_compress_destruct,
	.submit_request = vbdev_compress_submit_request,
	.io_type_supported = vbdev_compress_io_type_supported,
	.get_io_channel = vbdev_compress_get_io_channel,
	.dump_info_json = vbdev_compress_dump_info_json,
	.write_config_json = NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.config_text = NULL,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static void
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		goto error_bdev_name;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
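
	/* Arithmetic notes (illustrative, assuming a 512B blocklen): for QAT the
	 * required_alignment above becomes spdk_u32log2(512) = 9, i.e. 512B-aligned
	 * buffers; the optimal_io_boundary below becomes 16384 / 512 = 32 blocks,
	 * so the bdev layer splits IO on reduce chunk boundaries.
	 */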
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->base_bdev->blocklen;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    comp_bdev->base_bdev, &comp_bdev->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_open;
	}

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("trying to register bdev\n");
		goto error_bdev_register;
	}

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return;

	/* Error cleanup paths. Each label only undoes what was actually done on
	 * the path that failed; base_ch/base_desc are not valid on the earlier
	 * error paths and must not be touched there.
	 */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	spdk_bdev_close(comp_bdev->base_desc);
error_open:
	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
error_bdev_name:
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	/* Save these for after the vol is destroyed. */
	comp_bdev->delete_cb_fn = cb_fn;
	comp_bdev->delete_cb_arg = cb_arg;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}
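
/* Delete flow (for reference): bdev_compress_delete() unloads the reduce
 * volume, delete_vol_unload_cb() then destroys it (which needs a temporary
 * base channel), and _reduce_destroy_cb() finally unregisters the virtual
 * bdev, invoking the caller's completion via delete_cb_fn/delete_cb_arg.
 */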

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;
	int rc;

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);
	spdk_bdev_close(meta_ctx->base_desc);
	meta_ctx->base_desc = NULL;

	if (reduce_errno != 0 && reduce_errno != -ENOENT) {
		/* This error means it is not a compress disk. */
		if (reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		}
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* this status means that the vol could not be loaded because
	 * the pmem file can't be found.
	 */
	if (reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* We still want to open and claim the backing device to protect the data until
		 * either the pm metadata file is recovered or the comp bdev is deleted.
		 */
		rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
				    meta_ctx->base_bdev, &meta_ctx->base_desc);
		if (rc) {
			SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			goto err;
		}

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			spdk_bdev_close(meta_ctx->base_desc);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
		spdk_bdev_module_examine_done(&compress_if);
		return;
err:
		/* Unlike the success path just above, the context was not parked on
		 * the global list, so free it here.
		 */
		free(meta_ctx->comp_bdev.name);
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Update information following volume load. */
	meta_ctx->vol = vol;
	memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
	       sizeof(struct spdk_reduce_vol_params));
	vbdev_compress_claim(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}
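
/* Examine/load flow (for reference): vbdev_compress_examine() below opens the
 * bdev read-only, lets reducelib read the volume metadata, and the callback
 * above then distinguishes "not ours" (-EILSEQ), "ours but missing its pmem
 * file" (-ENOENT, claimed as orphaned), and a normal load followed by claim.
 */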

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS)