/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define NUM_MBUFS 8192
#define POOL_CACHE_SIZE 256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
/* Although the ISAL PMD reports 'unlimited' qpairs, it has an effective limit of 99:
 * beyond that, the internal ring name it generates exceeds a length limit in the
 * generic ring code and qp initialization fails.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;	/* ptr to compression device */
	uint8_t qp;			/* queue pair for this node */
	struct spdk_thread *thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete cb_fn;
	void *cb_arg;
	int cb_rc;
	struct spdk_thread *orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;			/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;	/* completion poller */
	struct spdk_reduce_vol_params params;	/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;	/* the reduce volume */
	struct vbdev_comp_delete_ctx *delete_ctx;
	bool orphaned;			/* base bdev claimed but comp_bdev not registered */
	int reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
	struct spdk_thread *thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};
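/* For orientation, a sketch of how the structures above relate to each other
 * (descriptive only, derived from the fields declared in this file):
 *
 *   vbdev_compress --owns--> spdk_reduce_vol (compressed volume, metadata in pmem)
 *        |                        |
 *        +--claims--> base_bdev <-+  (reduce's backing_dev does its I/O here)
 *        +--uses----> comp_device_qp --> compress_dev
 *                     (one DPDK compressdev qpair, bound to one SPDK thread at a time)
 */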
/* Per I/O context for the compression vbdev.
 */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;	/* used in completion handling */
	struct vbdev_compress *comp_bdev;	/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;		/* the original IO */
	struct spdk_io_channel *ch;		/* for resubmission */
	int status;				/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap at our maximum. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}
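	/* Only NUM_MAX_XFORMS (one comp + one decomp) private xforms are needed per
	 * device because they are shareable and attached to every op, and
	 * max_nb_streams can be 0 because this module submits only stateless
	 * (RTE_COMP_OP_STATELESS) operations.
	 */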
	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}
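	/* Because the software ISA-L PMD was just initialized above, at least one
	 * compressdev is normally present even without hardware; when a QAT device
	 * is also found, _set_pmd() prefers it unless the user pinned a specific
	 * PMD via compress_set_pmd().
	 */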
	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
	rte_mempool_free(g_comp_op_mp);	/* created above, would otherwise leak */
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;
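	/* A bdev_io must be completed on the thread that owns the channel it
	 * arrived on, but reduce invokes this callback on the reduce thread,
	 * so hop threads below when the two differ.
	 */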
	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
	} else {
		_reduce_rw_blocks_cb(io_ctx);
	}
}

static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		mbufs[mbuf_index]->userdata = reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    &mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			mbufs[mbuf_index]->userdata = reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}
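/* Note on the helper above: the mbufs never carry their own data room (the pool
 * is created with a data room size of 0); each one is attached to the caller's
 * buffer via rte_pktmbuf_attach_extbuf() for zero-copy operation. spdk_vtophys()
 * may shorten updated_length to the end of the current 2MB hugepage, in which
 * case the remainder is mapped by an extra mbuf chained onto mbufs[0].
 */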
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("failed to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, src_mbufs, src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, dst_mbufs, dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining so build our mbuf chain
	 * and associate it with our single comp_op.
	 */

	rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* setup dst mbufs, for the current test being used with this code there's only one vector */
	rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 got queued, if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		/* Log the status now; comp_op is freed on the error path below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		device_error = true;
		goto error_src_dst;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}
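/* If the enqueue above returns 0 with status NOT_PROCESSED (typically a full
 * device queue), the operation is parked on queued_comp_ops; comp_dev_poller()
 * below retries one queued op per completed op, so the backlog drains as device
 * slots free up.
 */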
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;

		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here, however it would be rare that we'd ever have more than 1 at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("compress operation failed with code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("decompress operation failed with code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}
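/* The two entry points above, together with _comp_reduce_readv/writev/unmap
 * further down, form the spdk_reduce_backing_dev function table wired up in
 * _prepare_for_load_init(); they are the only interface reduce has to the
 * compression device and to the base bdev.
 */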
/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("error on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;
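	/* All reduce volume access is single-threaded: the first channel created
	 * picks the reduce_thread (see comp_bdev_ch_create_cb()), and every
	 * submission from any other thread is forwarded to it below.
	 */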
	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}
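/* For a hypothetical volume named COMP_Nvme0n1 on base bdev Nvme0n1 using the
 * ISA-L PMD, the function above would emit roughly:
 *
 *   "compress": { "name": "COMP_Nvme0n1", "base_bdev_name": "Nvme0n1",
 *                 "compression_pmd": "compress_isal" }
 */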
/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}
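/* As configured in _prepare_for_load_init(), reduce performs backing I/O in
 * multiples of params.backing_io_unit_size (BACKING_IO_SZ, 4 KiB), independent
 * of the logical block size the compress vbdev exposes; the lba/lba_count
 * arguments passed to the helpers below are in base bdev blocks
 * (backing_dev.blocklen).
 */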
/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}
/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want to be user configurable; hardcoded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();
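	/* reduce needs a channel on this thread to write the volume's initial
	 * metadata to the base bdev; it is released in _vbdev_reduce_init_cb()
	 * once init completes.
	 */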
	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}
/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}
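/* Usage sketch for create_compress_bdev()/bdev_compress_delete() via the RPC
 * layer (the create method name matches what vbdev_compress_config_json()
 * emits; the bdev name, pmem path, and flag spellings are illustrative):
 *
 *   scripts/rpc.py bdev_compress_create -b Nvme0n1 -p /mnt/pmem -l 4096
 *   scripts/rpc.py bdev_compress_delete COMP_Nvme0n1
 */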
/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct = vbdev_compress_destruct,
	.submit_request = vbdev_compress_submit_request,
	.io_type_supported = vbdev_compress_io_type_supported,
	.get_io_channel = vbdev_compress_get_io_channel,
	.dump_info_json = vbdev_compress_dump_info_json,
	.write_config_json = NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);
	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;
	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}
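/* Summary of the load results handled above:
 *   reduce_errno == 0       volume metadata found: this base bdev is ours,
 *                           so claim it and register the compress vbdev.
 *   reduce_errno == -ENOENT the backing device holds reduce metadata but the
 *                           volume cannot be fully loaded (e.g. its pmem
 *                           metadata is missing): claim the base bdev as an
 *                           orphan so nothing else consumes it, but register
 *                           no vbdev.
 *   anything else           not a usable reduce volume (-EILSEQ, i.e. "not
 *                           ours", is not even logged): leave the bdev alone.
 */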
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)