/*-
 * BSD LICENSE
 *
 * Copyright (c) Intel Corporation.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   * Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in
 *     the documentation and/or other materials provided with the
 *     distribution.
 *   * Neither the name of Intel Corporation nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/conf.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define NUM_MBUFS 8192
#define POOL_CACHE_SIZE 256
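/* For reference, a minimal sketch of how the sizing macros above relate,
 * assuming the defaults are left unchanged: each 16 KiB reduce chunk is
 * persisted as CHUNK_SIZE / BACKING_IO_SZ backing IO units.
 *
 *	uint32_t io_units_per_chunk = CHUNK_SIZE / BACKING_IO_SZ;	// 16384 / 4096 = 4
 *
 * These values feed spdk_reduce_vol_params in _prepare_for_load_init().
 */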
static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, it has a practical limit of 99:
 * beyond that, the internal ring name it creates exceeds a length limit in the
 * generic ring code and qp initialization fails.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;	/* ptr to compression device */
	uint8_t qp;			/* queue pair for this node */
	struct spdk_thread *thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete cb_fn;
	void *cb_arg;
	int cb_rc;
	struct spdk_thread *orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;		/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;		/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;				/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;		/* completion poller */
	struct spdk_reduce_vol_params params;	/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx *delete_ctx;
	bool orphaned;				/* base bdev claimed but comp_bdev not registered */
	int reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
	struct spdk_thread *thread;		/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
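/* Throughout this file the vbdev_compress node is recovered from pointers to
 * its embedded members via SPDK_CONTAINEROF. A minimal sketch of the pattern
 * (variable names here are illustrative only):
 *
 *	struct spdk_reduce_backing_dev *backing = ...;
 *	struct vbdev_compress *node = SPDK_CONTAINEROF(backing, struct vbdev_compress,
 *						       backing_dev);
 *
 * The same is done with 'comp_bdev' for callbacks that hand us a struct spdk_bdev.
 */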
/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;		/* used in completion handling */
	struct vbdev_compress *comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;			/* the original IO */
	struct spdk_io_channel *ch;			/* for resubmission */
	int status;					/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
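/* Both xforms above are instantiated once per device as shared "private
 * xforms" in create_compress_dev() below; this module requires
 * RTE_COMP_FF_SHAREABLE_PRIV_XFORM, so a single xform can be attached to
 * every in-flight operation rather than one per op:
 *
 *	comp_op->private_xform = device->comp_xform;	// or decomp_xform
 */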
static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit so choose number of lcores. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}
/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
	/* The comp op pool was created above, free it on this path too. */
	rte_mempool_free(g_comp_op_mp);
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
	} else {
		_reduce_rw_blocks_cb(io_ctx);
	}
}
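/* The hop above is needed because a bdev_io must be completed on the thread
 * that submitted it, while reduce completions can arrive on the dedicated
 * reduce_thread. The same send-or-call shape recurs throughout this file
 * (cb_fn/ctx below are illustrative placeholders):
 *
 *	if (orig_thread != spdk_get_thread()) {
 *		spdk_thread_send_msg(orig_thread, cb_fn, ctx);
 *	} else {
 *		cb_fn(ctx);
 *	}
 */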
static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		mbufs[mbuf_index]->userdata = reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			mbufs[mbuf_index]->userdata = reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}
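/* Sketch of the chain built above for one iovec whose buffer happens to span
 * a 2 MB hugepage boundary (addresses illustrative only):
 *
 *	iov_base = 0x1ffff0000, iov_len = 0x20000
 *	mbuf[0]: ext buf 0x1ffff0000, data_len 0x10000	(up to the boundary)
 *	mbuf[1]: ext buf 0x200000000, data_len 0x10000	(the remainder)
 *	mbuf[1] chained to mbuf[0] via rte_pktmbuf_chain()
 *
 * Worst case every iovec needs one extra mbuf, which is why MAX_MBUFS_PER_OP
 * is REDUCE_MAX_IOVECS * 2.
 */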
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("ERROR trying to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining, so build our mbuf chain
	 * and associate it with our single comp_op.
	 */

	rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* setup dst mbufs, for the current test being used with this code there's only one vector */
	rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 got queued, if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		device_error = true;
		/* Log the status now; comp_op is freed on the error path below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		goto error_src_dst;
	}

	/* Error cleanup paths. */
error_src_dst:
	/* Free per-segment; the chains (if any) may be only partially built at
	 * this point, so freeing whole chains per array entry could double free.
	 */
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free_seg(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free_seg(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}
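/* Sketch of the enqueue contract the tail of _compress_operation() relies on
 * (rte_compressdev_enqueue_burst() returns how many ops were accepted):
 *
 *	uint16_t enq = rte_compressdev_enqueue_burst(cdev_id, qp, &comp_op, 1);
 *	// enq == 1: device owns the op until comp_dev_poller() dequeues it
 *	// enq == 0, status NOT_PROCESSED: qp full; the op is queued on
 *	//           queued_comp_ops and resubmitted from the poller
 *	// enq == 0, any other status: parameter/device error, fail with -EINVAL
 */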
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;

		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here; however it would be rare that we'd ever have more than 1 at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}
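/* This poller is registered per comp_bdev in comp_bdev_ch_create_cb() with a
 * period of 0, i.e. it runs on every iteration of the reduce thread's reactor:
 *
 *	comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
 */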
/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("error on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}

}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}
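/* Example of the JSON fragment the function above contributes to
 * bdev_get_bdevs output (values illustrative):
 *
 *	"compress": {
 *	  "name": "COMP_Nvme0n1",
 *	  "base_bdev_name": "Nvme0n1",
 *	  "compression_pmd": "compress_qat"
 *	}
 */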
/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}
/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which parms we want user configurable, HC for now
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}
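/* Selection matrix implemented above (the enum values come from
 * vbdev_compress.h and are set via compress_set_pmd() at the bottom of this
 * file):
 *
 *	COMPRESS_PMD_AUTO	-> QAT if probed, otherwise ISA-L
 *	COMPRESS_PMD_QAT_ONLY	-> QAT or fail
 *	COMPRESS_PMD_ISAL_ONLY	-> ISA-L or fail
 */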
/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}
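/* Example JSON-RPC call that lands in create_compress_bdev() above; the
 * method name matches what vbdev_compress_config_json() replays, while the
 * exact parameter schema is defined in the companion RPC file and may differ
 * slightly (values here are illustrative):
 *
 *	{
 *	  "method": "bdev_compress_create",
 *	  "params": {
 *	    "base_bdev_name": "Nvme0n1",
 *	    "pm_path": "/mnt/pmem",
 *	    "lb_size": 4096
 *	  }
 *	}
 */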
/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.config_text = NULL,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when the load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}

}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)