/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_reduce",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
static int g_mbuf_offset;

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 * span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define NUM_MBUFS 8192
#define POOL_CACHE_SIZE 256

static enum compress_pmd g_opts;
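/* Throughout this module the dynfield registered above is used to stash the
 * reduce callback argument in each mbuf. An illustrative sketch of the access
 * pattern (the real calls appear in _setup_compress_mbuf() and the poller below):
 *
 *	// write: associate a context pointer with an mbuf
 *	*RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)cb_arg;
 *	// read it back at completion time
 *	cb_arg = (void *)*RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *);
 */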
/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info cdev_info;	/* includes device friendly name */
	uint8_t cdev_id;			/* identifier for the device */
	void *comp_xform;			/* shared private xform for comp on this PMD */
	void *decomp_xform;			/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev) link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, in practice it is limited to 99:
 * the internal ring name it creates exceeds a length limit in the generic ring code,
 * which fails the qp initialization beyond that count.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev *device;	/* ptr to compression device */
	uint8_t qp;			/* queue pair for this node */
	struct spdk_thread *thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp) link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *src_iovs;
	int src_iovcnt;
	struct iovec *dst_iovs;
	int dst_iovcnt;
	bool compress;
	void *cb_arg;
	TAILQ_ENTRY(vbdev_comp_op) link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete cb_fn;
	void *cb_arg;
	int cb_rc;
	struct spdk_thread *orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev *base_bdev;		/* the thing we're attaching to */
	struct spdk_bdev_desc *base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel *base_ch;	/* IO channel of base device */
	struct spdk_bdev comp_bdev;		/* the compression virtual bdev */
	struct comp_io_channel *comp_ch;	/* channel associated with this bdev */
	char *drv_name;				/* name of the compression device driver */
	struct comp_device_qp *device_qp;
	struct spdk_thread *reduce_thread;
	pthread_mutex_t reduce_lock;
	uint32_t ch_count;
	TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller *poller;		/* completion poller */
	struct spdk_reduce_vol_params params;	/* params for the reduce volume */
	struct spdk_reduce_backing_dev backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol *vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx *delete_ctx;
	bool orphaned;				/* base bdev claimed but comp_bdev not registered */
	int reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress) link;
	struct spdk_thread *thread;		/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. */
struct comp_io_channel {
	struct spdk_io_channel_iter *iter;	/* used with for_each_channel in reset */
};
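/* Note on the qp model above: each comp_device_qp is bound to at most one SPDK
 * thread; the binding is established lazily in the channel create callback. A
 * minimal sketch of the lookup over the global list built in create_compress_dev():
 *
 *	TAILQ_FOREACH(dev_qp, &g_comp_device_qp, link) {
 *		if (dev_qp->thread == NULL || dev_qp->thread == spdk_get_thread()) {
 *			// claim (or reuse) this qp for the current thread
 *		}
 *	}
 */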
/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel *comp_ch;		/* used in completion handling */
	struct vbdev_compress *comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io *orig_io;			/* the original IO */
	struct spdk_io_channel *ch;			/* for resubmission */
	int status;					/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers to mbufs; we free
 * them ourselves but this callback has to be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit so choose number of lcores. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}
	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %u "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %u\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}

	return 0;

err:
	/* Only remove the qp entries that belong to this device; entries from
	 * previously created devices on the global list remain valid.
	 */
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		if (dev_qp->device == device) {
			TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
			free(dev_qp);
		}
	}
	free(device);
	return rc;
}
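/* The RTE_COMP_FF_SHAREABLE_PRIV_XFORM check above matters because this module
 * creates exactly one comp and one decomp private xform per device and attaches
 * them to every operation; a PMD without that capability would need a private
 * xform per op, which this driver does not implement, so it is rejected.
 */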
/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
	/* rte_mempool_free() is a no-op on NULL, so both pools can be freed here. */
	rte_mempool_free(g_comp_op_mp);
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
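/* The completion callbacks below may fire on the reduce thread rather than the
 * thread that owns the IO channel, so completions are bounced back with
 * spdk_thread_send_msg(). The recurring pattern, sketched:
 *
 *	orig_thread = spdk_io_channel_get_thread(ch);
 *	if (orig_thread != spdk_get_thread()) {
 *		spdk_thread_send_msg(orig_thread, fn, ctx);	// run fn(ctx) on that thread
 *	} else {
 *		fn(ctx);					// already there, call inline
 *	}
 */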
/* Completion callback for r/w that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example: examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
	} else {
		_reduce_rw_blocks_cb(io_ctx);
	}
}

/* Note: returns int (0 or -1); callers check for a negative value. */
static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}
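/* Example of the boundary handling above (numbers illustrative only): for a
 * 16 KiB iov whose last 4 KiB lie past a 2 MiB hugepage boundary, spdk_vtophys()
 * shrinks updated_length to 12 KiB, leaving remainder = 4 KiB; a second mbuf is
 * then allocated, attached to iov_base + 12 KiB, and chained to the first so the
 * device still sees one logical buffer.
 */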
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;
	uint32_t op_status = 0;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("ERROR trying to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining, so build our mbuf chain
	 * and associate it with our single comp_op.
	 */

	rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* setup dst mbufs, for the current test being used with this code there's only one vector */
	rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 got queued; if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		device_error = true;
		/* Capture the status now; comp_op is returned to its pool below. */
		op_status = comp_op->status;
		goto error_src_dst;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", op_status);
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}
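/* Note on the submit path above: rte_compressdev_enqueue_burst() returns how
 * many ops were actually accepted (0 or 1 here, since we submit one at a time).
 * A return of 0 with status RTE_COMP_OP_STATUS_NOT_PROCESSED is treated as
 * "queue full, retry later": the mbuf chains are freed and the operation is
 * parked on queued_comp_ops for the poller to resubmit.
 */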
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here; however, it would be rare that we'd ever have more than 1 at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process; only pull one
		 * off at a time as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}
/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("error on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}
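/* On the read path above, spdk_bdev_io_get_buf() is what guarantees this module
 * owns the data buffer before handing it to reducelib: if the caller passed NULL
 * iovs, the bdev layer allocates a buffer of the requested size and then invokes
 * comp_read_get_buf_cb() to continue the read.
 */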
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}
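/* For reference, each object emitted above looks like the following
 * (names of bdevs are illustrative):
 *
 *	{
 *	  "method": "bdev_compress_create",
 *	  "params": {
 *	    "base_bdev_name": "Nvme0n1",
 *	    "name": "COMP_Nvme0n1",
 *	    "compression_pmd": "compress_isal"
 *	  }
 *	}
 */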
static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %u\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}
/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}
/* Called when the underlying base bdev triggers an asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want to be user configurable; hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread, so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}
/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size).
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);
	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("trying to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;
	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %u\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when the load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)