/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/conf.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk_internal/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>

#define NUM_MAX_XFORMS		2
#define NUM_MAX_INFLIGHT_OPS	128
#define DEFAULT_WINDOW_SIZE	15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP	(REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE		(1024 * 16)
#define COMP_BDEV_NAME		"compress"
#define BACKING_IO_SZ		(4 * 1024)

#define ISAL_PMD		"compress_isal"
#define QAT_PMD			"compress_qat"
#define NUM_MBUFS		8192
#define POOL_CACHE_SIZE		256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, it has a practical limit of 99:
 * the internal ring name it creates for each qpair exceeds a length limit in the
 * generic ring code, so qpair initialization fails beyond that.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;	/* ptr to compression device */
	uint8_t qp;			/* queue pair for this node */
	struct spdk_thread *thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;		/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;		/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;				/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;		/* completion poller */
	struct spdk_reduce_vol_params params;	/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;		/* the reduce volume */
	spdk_delete_compress_complete delete_cb_fn;
	void *delete_cb_arg;
	bool orphaned;				/* base bdev claimed but comp_bdev not registered */
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
	struct spdk_thread *thread;		/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;	/* used in completion handling */
	struct vbdev_compress *comp_bdev;	/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;		/* the original IO */
	struct spdk_io_channel *ch;		/* for resubmission */
	int status;				/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;
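
/* Note on the xform settings below (informational): per the DPDK comp API,
 * window_size is the base-2 log of the DEFLATE sliding window, so
 * DEFAULT_WINDOW_SIZE of 15 means a 32KB window; RTE_COMP_LEVEL_MAX trades
 * CPU time for the best compression ratio the PMD supports.
 */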
/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);

/* Dummy function used by DPDK to free ext attached buffers to mbufs; we free
 * them ourselves, but this callback has to be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap at MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
	/* The op pool was created successfully if we got this far. */
	rte_mempool_free(g_comp_op_mp);
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_spdk_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w that were issued via reducelib. */
static void
spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _spdk_reduce_rw_blocks_cb, io_ctx);
	} else {
		_spdk_reduce_rw_blocks_cb(io_ctx);
	}
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t updated_length, remainder, phys_addr, total_length = 0;
	uint8_t *current_src_base = NULL;
	uint8_t *current_dst_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("trying to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining so build our mbuf chain
	 * and associate with our single comp_op.
	 */
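
	/* Worked illustration (not executed): suppose one 8KB src iov whose buffer
	 * starts 4KB below a 2MB hugepage boundary. The first spdk_vtophys() call
	 * below trims updated_length to 4KB (the physically contiguous portion),
	 * so the loop attaches one ext mbuf for the first 4KB, then sees
	 * remainder > 0 and chains a second mbuf for the trailing 4KB:
	 *
	 *   iov_base              iov_base + 8KB
	 *   |---- mbuf[0] ----|---- mbuf[1] ----|
	 *                     ^ 2MB boundary
	 */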
	/* Setup src mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < src_iovcnt) {

		current_src_base = src_iovs[iov_index].iov_base;
		total_length += src_iovs[iov_index].iov_len;
		assert(src_mbufs[mbuf_index] != NULL);
		src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
		updated_length = src_iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_src_base, &updated_length);

		rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
					  current_src_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(src_mbufs[mbuf_index], updated_length);
		remainder = src_iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&src_mbufs[src_mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra src_mbuf!\n");
				goto error_src_dst;
			}
			src_mbuf_total++;
			mbuf_index++;
			src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
			current_src_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_src_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == src_iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
						  current_src_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(src_mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* Setup dst mbufs; for the current use of this code there's only one vector. */
	iov_index = mbuf_index = 0;
	while (iov_index < dst_iovcnt) {

		current_dst_base = dst_iovs[iov_index].iov_base;
		updated_length = dst_iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_dst_base, &updated_length);

		rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
					  current_dst_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(dst_mbufs[mbuf_index], updated_length);
		remainder = dst_iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&dst_mbufs[dst_mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra dst_mbuf!\n");
				goto error_src_dst;
			}
			dst_mbuf_total++;
			mbuf_index++;
			current_dst_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_dst_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == dst_iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
						  current_dst_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(dst_mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;
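
	/* Each operation below is stateless with a final flush: every comp_op
	 * carries a self-contained DEFLATE stream, which is what reduce needs
	 * in order to decompress an individual chunk on its own later.
	 */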
	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 op to get enqueued; if 0 then we need to queue it up ourselves. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		/* There was an error sending the op to the device, most likely
		 * with the parameters. Log the status before the op is freed
		 * in the cleanup path below.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		device_error = true;
		goto error_src_dst;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}

/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;

		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops, so we have to free them one at a
		 * time here; it would be rare to ever have more than one at a time anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process; only pull one
		 * off at a time as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return 0;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}
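
/* Note: when a compress operation fails, reducelib falls back to storing the
 * chunk uncompressed (see the -EINVAL completion in comp_dev_poller() above),
 * so a failed compress costs space savings rather than data integrity.
 */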
/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	/* If the bdev layer could not provide a buffer, fail the I/O. */
	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      spdk_reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       spdk_reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_io_channel_get_thread(ch) != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
		spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
		/* Close the underlying bdev on its same opened thread. */
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread, _vbdev_compress_destruct_cb,
					     comp_bdev->base_desc);
		} else {
			spdk_bdev_close(comp_bdev->base_desc);
		}
		comp_bdev->vol = NULL;
		if (comp_bdev->orphaned == false) {
			spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
		} else {
			comp_bdev->delete_cb_fn(comp_bdev->delete_cb_arg, 0);
			_device_unregister_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, comp_bdev->delete_cb_fn,
				     comp_bdev->delete_cb_arg);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		/* reducelib needs a channel to comm with the backing device */
		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

		/* Clean the device before we free our resources. */
		spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device.
	 * It will then call our channel create callback to populate any elements
	 * that we need to update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations on to claim, where it will be further filled
 * out and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);
	/* Close the underlying bdev on its same opened thread. */
	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx->base_desc);
	} else {
		spdk_bdev_close(meta_ctx->base_desc);
	}
	meta_ctx->base_desc = NULL;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
		vbdev_compress_claim(meta_ctx);
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		free(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

/* Called when the underlying base bdev goes away. */
static void
vbdev_compress_base_bdev_hotremove_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev, *tmp;
	struct spdk_bdev *bdev_find = ctx;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* TODO: determine which parms we want user configurable, HC for now
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */
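
/* A worked sizing example (illustrative): with CHUNK_SIZE of 16KB,
 * BACKING_IO_SZ of 4KB, and a base bdev using 512-byte blocks, each reduce
 * chunk maps to 4 backing I/O units and holds 32 logical blocks. These are
 * the values _prepare_for_load_init() below hands to reducelib.
 */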
/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->base_bdev = bdev;
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	meta_ctx->params.logical_block_size = bdev->blocklen;
	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(struct spdk_bdev *bdev, const char *pm_path)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		return -EINVAL;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. This is also
 * where we register the completion poller and assign a device qpair.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* We use this queue to track outstanding IO in our layer. */
	TAILQ_INIT(&comp_bdev->pending_comp_ios);

	/* We use this to queue up compression operations as needed. */
	TAILQ_INIT(&comp_bdev->queued_comp_ops);

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = spdk_poller_register(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if ((strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0)) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread, so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		_channel_cleanup(comp_bdev);
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created; the completion poller is unregistered via
 * _channel_cleanup() when the last channel goes away.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path)
{
	struct spdk_bdev *bdev;

	bdev = spdk_bdev_get_by_name(bdev_name);
	if (!bdev) {
		return -ENODEV;
	}

	return vbdev_init_reduce(bdev, pm_path);
}
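
/* Example invocation (a sketch: the method name matches what
 * vbdev_compress_config_json() emits, while the bdev name, pmem path, and
 * rpc.py flags are hypothetical):
 *
 *   scripts/rpc.py bdev_compress_create -b Nvme0n1 -p /mnt/pmem
 *
 * This would create a vbdev named COMP_Nvme0n1 (see _set_compbdev_name())
 * with the reduce pm metadata stored under /mnt/pmem.
 */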
/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.config_text = NULL,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static void
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		goto error_bdev_name;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
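
	/* Worked example (illustrative): with the fixed 16KB chunk_size and a
	 * 512-byte logical block size, optimal_io_boundary below comes out to
	 * 32 blocks, so the bdev layer splits I/O on reduce chunk boundaries.
	 */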
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->base_bdev->blocklen;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    comp_bdev->base_bdev, &comp_bdev->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_open;
	}

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("trying to register bdev\n");
		goto error_bdev_register;
	}

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_io_device_unregister(comp_bdev, NULL);
error_open:
	free(comp_bdev->comp_bdev.name);
error_bdev_name:
	spdk_put_io_channel(comp_bdev->base_ch);
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	/* Save these for after the vol is destroyed. */
	comp_bdev->delete_cb_fn = cb_fn;
	comp_bdev->delete_cb_arg = cb_arg;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}
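
/* Load outcomes handled below (informational): a clean load claims the bdev
 * as a compress vbdev; -EILSEQ means the metadata is not ours, so the bdev is
 * skipped quietly; -ENOENT means the pm file is missing, so the base bdev is
 * claimed as an orphan to protect its data; any other error skips the bdev.
 */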
/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations on to claim, where it will be further filled
 * out and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;
	int rc;

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);
	/* Close the underlying bdev on its same opened thread. */
	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx->base_desc);
	} else {
		spdk_bdev_close(meta_ctx->base_desc);
	}
	meta_ctx->base_desc = NULL;

	if (reduce_errno != 0 && reduce_errno != -ENOENT) {
		/* -EILSEQ means it is not a compress disk. */
		if (reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		}
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* this status means that the vol could not be loaded because
	 * the pmem file can't be found.
	 */
	if (reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* We still want to open and claim the backing device to protect the data until
		 * either the pm metadata file is recovered or the comp bdev is deleted.
		 */
		rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
				    meta_ctx->base_bdev, &meta_ctx->base_desc);
		if (rc) {
			SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
err:
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Update information following volume load. */
	meta_ctx->vol = vol;
	memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
	       sizeof(struct spdk_reduce_vol_params));
	vbdev_compress_claim(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS)