/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_reduce",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
static int g_mbuf_offset;

#define NUM_MAX_XFORMS		2
#define NUM_MAX_INFLIGHT_OPS	128
#define DEFAULT_WINDOW_SIZE	15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP	(REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE		(1024 * 16)
#define COMP_BDEV_NAME		"compress"
#define BACKING_IO_SZ		(4 * 1024)

#define ISAL_PMD		"compress_isal"
#define QAT_PMD			"compress_qat"
#define MLX5_PMD		"mlx5_pci"
#define NUM_MBUFS		8192
#define POOL_CACHE_SIZE		256

static enum compress_pmd g_opts;
/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, it has an unplanned
 * limit of 99: the internal ring name it creates exceeds a length limit in
 * the generic ring code, which then fails qpair initialization.
 * FIXME: limited to 48 qpairs as a workaround for issue #2338.
 */
#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;	/* ptr to compression device */
	uint8_t qp;			/* queue pair for this node */
	struct spdk_thread *thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete cb_fn;
	void *cb_arg;
	int cb_rc;
	struct spdk_thread *orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;		/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;		/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;				/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;			/* completion poller */
	struct spdk_reduce_vol_params params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;			/* the reduce volume */
	struct vbdev_comp_delete_ctx *delete_ctx;
	bool orphaned;		/* base bdev claimed but comp_bdev not registered */
	int reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
	struct spdk_thread *thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};
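
/* Threading note: the first I/O channel created for a comp_bdev pins that
 * thread as comp_bdev->reduce_thread; all reduce operations and the
 * completion poller run there, and requests arriving on other threads are
 * forwarded with spdk_thread_send_msg() (see comp_bdev_ch_create_cb() and
 * vbdev_compress_submit_request() below).
 */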
/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;		/* used in completion handling */
	struct vbdev_compress *comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;			/* the original IO */
	struct spdk_io_channel *ch;			/* for resubmission */
	int status;					/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;
static bool g_mlx5_pci_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap at MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}
	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %u "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %u\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
							  &device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
							  &device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
		g_mlx5_pci_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("failed to create virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}
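
	/* Note: the rte_vdev_init() call above assumes DPDK was built with
	 * the ISA-L compression PMD; it serves as the software fallback when
	 * no hardware compression device (e.g. QAT) is present. -EEXIST is
	 * tolerated so that re-initialization is not treated as fatal.
	 */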
	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
	/* rte_mempool_free() ignores NULL, so this is safe even when the op
	 * pool was never created.
	 */
	rte_mempool_free(g_comp_op_mp);
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
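
/* Reduce completions can arrive on the reduce thread while the original
 * bdev_io was submitted on a different thread; the callback below hops back
 * to the channel's owning thread with spdk_thread_send_msg() so that
 * spdk_bdev_io_complete() always runs on the thread that owns the IO.
 */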
/* Completion callback for r/w that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
	} else {
		_reduce_rw_blocks_cb(io_ctx);
	}
}

static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}
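
/* A note on the helper above: spdk_vtophys() may shrink updated_length when
 * the buffer is not physically contiguous past a 2MB hugepage boundary. In
 * that case the leftover bytes are attached to a freshly allocated mbuf and
 * chained onto mbufs[0], so a single comp_op always sees one contiguous
 * chain per direction.
 */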
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("trying to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining, so build our mbuf chain
	 * and associate it with our single comp_op.
	 */

	rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* setup dst mbufs, for the current test being used with this code there's only one vector */
	rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 op to be enqueued; if 0, we need to queue it up ourselves. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		device_error = true;
		/* Log now, before the comp_op is freed in the cleanup path below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		goto error_src_dst;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters; the status was logged above.
		 */
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}
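
/* Submission note: when the enqueue burst accepts zero ops and the op status
 * is RTE_COMP_OP_STATUS_NOT_PROCESSED (a full device queue), the request is
 * parked on queued_comp_ops and retried from comp_dev_poller() below; any
 * other status is treated as a hard device error.
 */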
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops, so we have to free them one at a time
		 * here; however, it would be rare that we'd ever have more than 1 at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process; only pull one
		 * off at a time as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}
/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}
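
/* Teardown note: _device_unregister_cb() runs only after every channel
 * reference to the io_device has been released, so it is the last stop in
 * the unregister path and the safe place to destroy the mutex and free the
 * comp_bdev node itself.
 */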
static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to communicate with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}
/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}
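
/* For reference, the driver-specific info above renders inside the
 * bdev_get_bdevs output roughly as follows (names are illustrative):
 *
 *   "compress": {
 *     "name": "COMP_Nvme0n1",
 *     "base_bdev_name": "Nvme0n1",
 *     "compression_pmd": "compress_qat"
 *   }
 */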
/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %u\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}
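
/* The three helpers below, together with the compress/decompress entry
 * points above, make up the spdk_reduce_backing_dev function table that
 * _prepare_for_load_init() hands to the reduce library: readv/writev/unmap
 * map directly onto the base bdev, while compress/decompress route through
 * the DPDK compressdev path.
 */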
/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}
/* Called when the underlying base bdev triggers an asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want user-configurable; hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else if (g_mlx5_pci_available) {
			comp_dev->drv_name = MLX5_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
		comp_dev->drv_name = MLX5_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}
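
/* PMD selection order in _set_pmd() above: in COMPRESS_PMD_AUTO mode,
 * hardware offload is preferred (QAT, then mlx5_pci) with ISA-L as the
 * software fallback; the *_ONLY modes fail outright when the requested PMD
 * was not discovered at init time.
 */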
/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. The first channel
 * created also registers our completion poller and pins the reduce thread.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread, so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}
/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created; the poller registered by the first channel is unregistered
 * via _channel_cleanup() when the last channel goes away.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}
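
/* For reference, this entry point is reached via the bdev_compress_create
 * RPC; a typical invocation (names and paths are illustrative) looks like:
 *
 *   scripts/rpc.py bdev_compress_create -b Nvme0n1 -p /pmem_dir -l 4096
 *
 * where -b is the base bdev, -p the pmem path for reduce metadata, and -l
 * the optional logical block size (512 or 4096).
 */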
/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct = vbdev_compress_destruct,
	.submit_request = vbdev_compress_submit_request,
	.io_type_supported = vbdev_compress_io_type_supported,
	.get_io_channel = vbdev_compress_get_io_channel,
	.dump_info_json = vbdev_compress_dump_info_json,
	.write_config_json = NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);
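
	/* With the defaults above (16KB chunks), a 4096-byte logical block
	 * size yields an optimal_io_boundary of 4 blocks and a 512-byte one
	 * yields 32, so the bdev layer splits IO such that no request spans
	 * a reduce chunk boundary.
	 */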
	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("trying to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;
	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %u\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}
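
/* A note on the load results handled above: -ENOENT indicates the pmem
 * metadata file is missing for a volume that appears to be ours, so the base
 * bdev is claimed as an orphan (visible via compress_has_orphan()) and can
 * only be deleted; -EILSEQ means the base bdev carries no reduce metadata at
 * all, i.e. it isn't ours, and examine passes it by silently.
 */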
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)