/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_reduce",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
static int g_mbuf_offset;

#define NUM_MAX_XFORMS		2
#define NUM_MAX_INFLIGHT_OPS	128
#define DEFAULT_WINDOW_SIZE	15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP	(REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE		(1024 * 16)
#define COMP_BDEV_NAME		"compress"
#define BACKING_IO_SZ		(4 * 1024)

#define ISAL_PMD		"compress_isal"
#define QAT_PMD			"compress_qat"
#define MLX5_PMD		"mlx5_pci"
#define NUM_MBUFS		8192
#define POOL_CACHE_SIZE		256

static enum compress_pmd g_opts;
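/* Illustrative sketch (not part of the driver logic): the dynamic field
 * registered above reserves space inside each rte_mbuf where this driver
 * stashes the reduce callback argument for an operation:
 *
 *   *RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)cb_arg;
 *   cb_arg = (void *)*RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *);
 *
 * This is how the completion poller recovers per-op context from the mbuf
 * chain that the compressdev hands back.
 */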
/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISA-L PMD reports 'unlimited' qpairs, it actually has an
 * undocumented limit of 99: the internal ring name it creates exceeds a length
 * limit in the generic ring code, which fails qpair initialization.
 * FIXME: reduced to 48 qpairs due to issue #2338.
 */
#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;	/* ptr to compression device */
	uint8_t qp;			/* queue pair for this node */
	struct spdk_thread *thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete cb_fn;
	void *cb_arg;
	int cb_rc;
	struct spdk_thread *orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;		/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;		/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;				/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;			/* completion poller */
	struct spdk_reduce_vol_params params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;			/* the reduce volume */
	struct vbdev_comp_delete_ctx *delete_ctx;
	bool orphaned;			/* base bdev claimed but comp_bdev not registered */
	int reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
	struct spdk_thread *thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};
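/* Threading model: all reduce-volume operations for a given comp_bdev are
 * funneled onto a single SPDK thread, the reduce_thread (the thread that
 * created the first IO channel). Other channels forward their bdev_ios to it
 * via spdk_thread_send_msg(), and completions are bounced back to the
 * submitting thread the same way, since reducelib expects single-threaded
 * access to a volume.
 */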
/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;		/* used in completion handling */
	struct vbdev_compress *comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;			/* the original IO */
	struct spdk_io_channel *ch;			/* for resubmission */
	int status;					/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;
static bool g_mlx5_pci_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers to mbufs; we free
 * them ourselves, but this callback has to exist.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap at MAX_NUM_QP instead. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}
	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
		g_mlx5_pci_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}
	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
	/* also release the op pool on this path (rte_mempool_free(NULL) is a no-op) */
	rte_mempool_free(g_comp_op_mp);
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for reads/writes that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example: examine calls reading metadata */

	io_ctx->status = reduce_errno;
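	/* Note: spdk_thread_exec_msg() runs _reduce_rw_blocks_cb() inline if we
	 * are already on the original thread, and sends a message otherwise;
	 * bdev_io completion must happen on the thread that submitted the IO.
	 */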
	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);

	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}

static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;
	uint8_t op_status = 0;	/* saved for logging after comp_op may be freed */

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("failed to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining, so build our mbuf chain
	 * and associate it with our single comp_op.
	 */

	rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* setup dst mbufs, for the current test being used with this code there's only one vector */
	rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 got queued; if 0, then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		/* save the status before the op is freed below */
		op_status = comp_op->status;
		device_error = true;
		goto error_src_dst;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", op_status);
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}
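/* Sketch of the mbuf layout built above (illustrative): for a source iovec
 * whose buffer straddles a 2MB hugepage boundary, _setup_compress_mbuf()
 * attaches two external-buffer mbufs and chains them:
 *
 *   iov_base ........................... iov_base + iov_len
 *   |------ first 2MB page ------|------ next page ------|
 *         mbuf[0] (updated_length)     mbuf[1] (remainder)
 *
 *   rte_pktmbuf_chain(mbufs[0], mbufs[1]);   // one chain per comp_op
 *
 * This is why MAX_MBUFS_PER_OP is twice REDUCE_MAX_IOVECS.
 */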
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the byte count was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops, so we have to free them one at a time
		 * here; however, it would be rare that we'd ever have more than 1 at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process; only pull one
		 * off at a time, as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}
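/* Reduce callback convention (as used above and in comp_dev_poller): on
 * success, cb_fn is invoked with the number of bytes the device produced;
 * on any failure, it is invoked with a negative errno, which reducelib
 * treats as "store this chunk uncompressed" for compress operations.
 */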
/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("error on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}
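/* All reduce-volume access is serialized onto comp_bdev->reduce_thread (set
 * when the first channel is created in comp_bdev_ch_create_cb()), so the
 * submit path below reroutes any IO that arrives on a different thread.
 */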
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
		/* FIXME: the completion callback should be executed here as well. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}
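/* For reference, the object emitted above looks like this (values illustrative):
 *
 *   "compress": {
 *     "name": "COMP_Nvme0n1",
 *     "base_bdev_name": "Nvme0n1",
 *     "compression_pmd": "compress_isal"
 *   }
 */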
/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}
/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers an asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want user-configurable; hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else if (g_mlx5_pci_available) {
			comp_dev->drv_name = MLX5_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
		comp_dev->drv_name = MLX5_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per-channel basis. This is also
 * where we register the completion poller and assign a device queue pair.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread, so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}
/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created, including unregistering the poller via _channel_cleanup().
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}
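/* For reference: the bdev layer reserves get_ctx_size() bytes inside each
 * spdk_bdev_io it hands us, so per-IO state is recovered without any extra
 * allocation, e.g.:
 *
 *   struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
 */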
/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);
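	/* Worked example: with the 16KiB CHUNK_SIZE and a 4096-byte logical
	 * block size, optimal_io_boundary is 16384 / 4096 = 4 blocks (32 for
	 * 512-byte blocks), so the bdev layer splits IO on reduce chunk
	 * boundaries for us.
	 */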
	/* This is the context that is passed to us when the bdev
	 * layer calls in, so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;
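	/* An orphaned comp_bdev has its base bdev claimed but was never
	 * registered (metadata load returned -ENOENT), so there is no reduce
	 * volume to unload on the delete path below.
	 */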
	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}
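/* Note on the error handling above: spdk_reduce_vol_load() reports -EILSEQ
 * when the base bdev carries no reduce signature at all (it simply isn't
 * ours), which is why that case is not logged. -ENOENT indicates reduce
 * metadata was found but the volume could not be fully loaded (e.g. its
 * pmem metadata file is gone); such a bdev is claimed as 'orphaned' so it
 * can still be removed via bdev_compress_delete().
 */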
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)