/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_reduce",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
static int g_mbuf_offset;

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define MLX5_PMD "mlx5_pci"
#define NUM_MBUFS 8192
#define POOL_CACHE_SIZE 256

static enum compress_pmd g_opts;
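/* Illustrative note (not part of the driver logic): once the dynamic field is
 * registered, the per-op context is written and read relative to g_mbuf_offset,
 * e.g.
 *
 *   *RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)ctx;
 *   ctx = (void *)*RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *);
 *
 * This is exactly how _setup_compress_mbuf() below stores the reduce callback
 * argument and how comp_dev_poller() recovers it after dequeue.
 */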
/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info	cdev_info;	/* includes the device friendly name */
	uint8_t				cdev_id;	/* identifier for the device */
	void				*comp_xform;	/* shared private xform for comp on this PMD */
	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev)	link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISA-L PMD reports 'unlimited' qpairs, it has a de facto limit of 99:
 * beyond that, the internal ring name it generates exceeds a length limit in the
 * generic ring code and qp initialization fails.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev		*device;	/* ptr to compression device */
	uint8_t				qp;		/* queue pair for this node */
	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp)	link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev	*backing_dev;
	struct iovec			*src_iovs;
	int				src_iovcnt;
	struct iovec			*dst_iovs;
	int				dst_iovcnt;
	bool				compress;
	void				*cb_arg;
	TAILQ_ENTRY(vbdev_comp_op)	link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;
	void				*cb_arg;
	int				cb_rc;
	struct spdk_thread		*orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	char				*drv_name;	/* name of the compression device driver */
	struct comp_device_qp		*device_qp;
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
	struct spdk_thread		*thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
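/* Threading model, summarized from the functions below: all reduce volume I/O
 * for a comp_bdev is funneled to ->reduce_thread (vbdev_compress_submit_request()
 * forwards with spdk_thread_send_msg() when needed), while the base bdev
 * descriptor is opened and closed on ->thread. Completions are bounced back to
 * the thread that owns the originating channel.
 */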
/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;	/* used in completion handling */
	struct vbdev_compress		*comp_bdev;	/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;	/* the original IO */
	struct spdk_io_channel		*ch;		/* for resubmission */
	int				status;		/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;
static bool g_mlx5_pci_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
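/* Note on the xforms above: both describe stateless DEFLATE with no checksum and
 * a 2^15 (32KB) window; RTE_COMP_LEVEL_MAX asks each PMD for its best ratio.
 * Because they never change, one private xform per device can be shared by all
 * operations, which create_compress_dev() below requires
 * (RTE_COMP_FF_SHAREABLE_PRIV_XFORM).
 */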
static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers to mbufs; we free
 * them ourselves but this callback has to be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap at MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
		g_mlx5_pci_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}
/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("failed to create virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	/* The data room size is 0 because every mbuf gets an external buffer
	 * attached in _setup_compress_mbuf(); only mbuf headers come from this pool.
	 */
	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. rte_mempool_free() is a no-op on NULL, so freeing
	 * both pools here is safe on every path.
	 */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
	rte_mempool_free(g_comp_op_mp);
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* For completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for reads/writes that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example: examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
	} else {
		_reduce_rw_blocks_cb(io_ctx);
	}
}
static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another 2MB boundary */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}
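/* Worked example for the remainder path above (illustrative numbers): given an
 * iovec of 16KB whose buffer starts 8KB below a 2MB hugepage boundary,
 * spdk_vtophys() shrinks updated_length to the physically contiguous 8KB, so
 * remainder is 8KB and a second mbuf is attached for the rest of the iovec.
 * This is why MAX_MBUFS_PER_OP reserves two mbufs per REDUCE_MAX_IOVECS entry.
 */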
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("failed to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining, so build our mbuf chain
	 * and associate it with our single comp_op.
	 */

	rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* setup dst mbufs; for the current usage of this code there's only one dst vector */
	rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect exactly 1 op to get queued; if 0, we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		/* There was an error sending the op to the device, most likely with
		 * the parameters. Log the status now, before the op is freed below.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		device_error = true;
		goto error_src_dst;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src,
				g_mbuf_offset, uint64_t *);
		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here; however, it would be rare to ever have more than 1 at a time anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process; only pull one
		 * off at a time, as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}
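/* Note on the return value above: SPDK's poller convention is to return
 * SPDK_POLLER_BUSY when any work was done and SPDK_POLLER_IDLE otherwise. This
 * feeds the framework's thread-utilization accounting; it does not change
 * whether the poller runs again.
 */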
/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}
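/* Callback contract for the two entry points above, summarizing the code paths:
 * on success the callback receives the produced byte count from
 * comp_dev_poller(); on a negative errno during compress, reduce simply stores
 * the chunk uncompressed (see the dequeue handling in comp_dev_poller()).
 */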
/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}
static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}
/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
		/* FIXME: the delete callback should still be executed here. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}
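/* For reference, the fragment emitted above looks roughly like the following,
 * assuming a hypothetical ISA-L backed bdev named COMP_Nvme0n1:
 *
 *   "compress": {
 *     "name": "COMP_Nvme0n1",
 *     "base_bdev_name": "Nvme0n1",
 *     "compression_pmd": "compress_isal"
 *   }
 */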
/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}
/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}
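/* On the TODOs above: the -ENOMEM case can't use spdk_bdev_queue_io_wait() the
 * way the hot path does, because no bdev_io exists at this layer. One possible
 * approach (a sketch only, not implemented here) would be to embed an
 * spdk_bdev_io_wait_entry plus the readv/writev/unmap arguments in a small
 * context and re-issue the request from the wait callback; for now the error
 * is simply propagated back to reduce.
 */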
/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers an asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want user configurable; hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else if (g_mlx5_pci_available) {
			comp_dev->drv_name = MLX5_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
		comp_dev->drv_name = MLX5_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}
/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}
static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread, so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}
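/* Typical invocation, for illustration only (flag spellings per the matching
 * rpc.py are assumed, and /mnt/pmem is a hypothetical pmem mount):
 *
 *   scripts/rpc.py bdev_compress_create -b Nvme0n1 -p /mnt/pmem -l 4096
 *
 * An lb_size of 0 inherits the base bdev's block size, per
 * _prepare_for_load_init().
 */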
/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);
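	/* Worked example for the math above: with the fixed 16 KiB chunk size and
	 * a 4 KiB logical block size, optimal_io_boundary is 4 blocks, so the bdev
	 * layer splits any I/O that would straddle a compressed chunk; blockcnt is
	 * simply the reduce volume size divided by the logical block size.
	 */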
	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}
static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}
static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		/* The metadata says this is a reduce volume, but its pm file is
		 * missing: claim the base bdev as an orphan so the user can still
		 * delete it (see compress_has_orphan()).
		 */
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		/* -EILSEQ means the metadata doesn't match: not our volume, so
		 * ignore it quietly; anything else is worth logging.
		 */
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)