1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "vbdev_compress.h" 35 36 #include "spdk/reduce.h" 37 #include "spdk/stdinc.h" 38 #include "spdk/rpc.h" 39 #include "spdk/env.h" 40 #include "spdk/conf.h" 41 #include "spdk/endian.h" 42 #include "spdk/string.h" 43 #include "spdk/thread.h" 44 #include "spdk/util.h" 45 #include "spdk/bdev_module.h" 46 47 #include "spdk_internal/log.h" 48 49 #include <rte_config.h> 50 #include <rte_bus_vdev.h> 51 #include <rte_compressdev.h> 52 #include <rte_comp.h> 53 54 #define NUM_MAX_XFORMS 2 55 #define NUM_MAX_INFLIGHT_OPS 128 56 #define DEFAULT_WINDOW_SIZE 15 57 /* We need extra mbufs per operation to accommodate host buffers that 58 * span a 2MB boundary. 59 */ 60 #define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2) 61 #define CHUNK_SIZE (1024 * 16) 62 #define COMP_BDEV_NAME "compress" 63 #define BACKING_IO_SZ (4 * 1024) 64 65 #define ISAL_PMD "compress_isal" 66 #define QAT_PMD "compress_qat" 67 #define NUM_MBUFS 8192 68 #define POOL_CACHE_SIZE 256 69 70 static enum compress_pmd g_opts; 71 72 /* Global list of available compression devices. */ 73 struct compress_dev { 74 struct rte_compressdev_info cdev_info; /* includes device friendly name */ 75 uint8_t cdev_id; /* identifier for the device */ 76 void *comp_xform; /* shared private xform for comp on this PMD */ 77 void *decomp_xform; /* shared private xform for decomp on this PMD */ 78 TAILQ_ENTRY(compress_dev) link; 79 }; 80 static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs); 81 82 /* Although ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99 due to 83 * the length of the internal ring name that it creates, it breaks a limit in the generic 84 * ring code and fails the qp initialization. 85 */ 86 #define MAX_NUM_QP 99 87 /* Global list and lock for unique device/queue pair combos */ 88 struct comp_device_qp { 89 struct compress_dev *device; /* ptr to compression device */ 90 uint8_t qp; /* queue pair for this node */ 91 struct spdk_thread *thread; /* thead that this qp is assigned to */ 92 TAILQ_ENTRY(comp_device_qp) link; 93 }; 94 static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp); 95 static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER; 96 97 /* For queueing up compression operations that we can't submit for some reason */ 98 struct vbdev_comp_op { 99 struct spdk_reduce_backing_dev *backing_dev; 100 struct iovec *src_iovs; 101 int src_iovcnt; 102 struct iovec *dst_iovs; 103 int dst_iovcnt; 104 bool compress; 105 void *cb_arg; 106 TAILQ_ENTRY(vbdev_comp_op) link; 107 }; 108 109 struct vbdev_comp_delete_ctx { 110 spdk_delete_compress_complete cb_fn; 111 void *cb_arg; 112 int cb_rc; 113 struct spdk_thread *orig_thread; 114 }; 115 116 /* List of virtual bdevs and associated info for each. */ 117 struct vbdev_compress { 118 struct spdk_bdev *base_bdev; /* the thing we're attaching to */ 119 struct spdk_bdev_desc *base_desc; /* its descriptor we get from open */ 120 struct spdk_io_channel *base_ch; /* IO channel of base device */ 121 struct spdk_bdev comp_bdev; /* the compression virtual bdev */ 122 struct comp_io_channel *comp_ch; /* channel associated with this bdev */ 123 char *drv_name; /* name of the compression device driver */ 124 struct comp_device_qp *device_qp; 125 struct spdk_thread *reduce_thread; 126 pthread_mutex_t reduce_lock; 127 uint32_t ch_count; 128 TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios; /* outstanding operations to a comp library */ 129 struct spdk_poller *poller; /* completion poller */ 130 struct spdk_reduce_vol_params params; /* params for the reduce volume */ 131 struct spdk_reduce_backing_dev backing_dev; /* backing device info for the reduce volume */ 132 struct spdk_reduce_vol *vol; /* the reduce volume */ 133 struct vbdev_comp_delete_ctx *delete_ctx; 134 bool orphaned; /* base bdev claimed but comp_bdev not registered */ 135 TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops; 136 TAILQ_ENTRY(vbdev_compress) link; 137 struct spdk_thread *thread; /* thread where base device is opened */ 138 }; 139 static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp); 140 141 /* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. 142 */ 143 struct comp_io_channel { 144 struct spdk_io_channel_iter *iter; /* used with for_each_channel in reset */ 145 }; 146 147 /* Per I/O context for the compression vbdev. */ 148 struct comp_bdev_io { 149 struct comp_io_channel *comp_ch; /* used in completion handling */ 150 struct vbdev_compress *comp_bdev; /* vbdev associated with this IO */ 151 struct spdk_bdev_io_wait_entry bdev_io_wait; /* for bdev_io_wait */ 152 struct spdk_bdev_io *orig_io; /* the original IO */ 153 struct spdk_io_channel *ch; /* for resubmission */ 154 int status; /* save for completion on orig thread */ 155 }; 156 157 /* Shared mempools between all devices on this system */ 158 static struct rte_mempool *g_mbuf_mp = NULL; /* mbuf mempool */ 159 static struct rte_mempool *g_comp_op_mp = NULL; /* comp operations, must be rte* mempool */ 160 static struct rte_mbuf_ext_shared_info g_shinfo = {}; /* used by DPDK mbuf macros */ 161 static bool g_qat_available = false; 162 static bool g_isal_available = false; 163 164 /* Create shared (between all ops per PMD) compress xforms. */ 165 static struct rte_comp_xform g_comp_xform = { 166 .type = RTE_COMP_COMPRESS, 167 .compress = { 168 .algo = RTE_COMP_ALGO_DEFLATE, 169 .deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT, 170 .level = RTE_COMP_LEVEL_MAX, 171 .window_size = DEFAULT_WINDOW_SIZE, 172 .chksum = RTE_COMP_CHECKSUM_NONE, 173 .hash_algo = RTE_COMP_HASH_ALGO_NONE 174 } 175 }; 176 /* Create shared (between all ops per PMD) decompress xforms. */ 177 static struct rte_comp_xform g_decomp_xform = { 178 .type = RTE_COMP_DECOMPRESS, 179 .decompress = { 180 .algo = RTE_COMP_ALGO_DEFLATE, 181 .chksum = RTE_COMP_CHECKSUM_NONE, 182 .window_size = DEFAULT_WINDOW_SIZE, 183 .hash_algo = RTE_COMP_HASH_ALGO_NONE 184 } 185 }; 186 187 static void vbdev_compress_examine(struct spdk_bdev *bdev); 188 static void vbdev_compress_claim(struct vbdev_compress *comp_bdev); 189 static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io); 190 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev); 191 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io); 192 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf); 193 static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno); 194 195 /* Dummy function used by DPDK to free ext attached buffers 196 * to mbufs, we free them ourselves but this callback has to 197 * be here. 198 */ 199 static void 200 shinfo_free_cb(void *arg1, void *arg2) 201 { 202 } 203 204 /* Called by vbdev_init_compress_drivers() to init each discovered compression device */ 205 static int 206 create_compress_dev(uint8_t index) 207 { 208 struct compress_dev *device; 209 uint16_t q_pairs; 210 uint8_t cdev_id; 211 int rc, i; 212 struct comp_device_qp *dev_qp; 213 struct comp_device_qp *tmp_qp; 214 215 device = calloc(1, sizeof(struct compress_dev)); 216 if (!device) { 217 return -ENOMEM; 218 } 219 220 /* Get details about this device. */ 221 rte_compressdev_info_get(index, &device->cdev_info); 222 223 cdev_id = device->cdev_id = index; 224 225 /* Zero means no limit so choose number of lcores. */ 226 if (device->cdev_info.max_nb_queue_pairs == 0) { 227 q_pairs = MAX_NUM_QP; 228 } else { 229 q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP); 230 } 231 232 /* Configure the compression device. */ 233 struct rte_compressdev_config config = { 234 .socket_id = rte_socket_id(), 235 .nb_queue_pairs = q_pairs, 236 .max_nb_priv_xforms = NUM_MAX_XFORMS, 237 .max_nb_streams = 0 238 }; 239 rc = rte_compressdev_configure(cdev_id, &config); 240 if (rc < 0) { 241 SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id); 242 goto err; 243 } 244 245 /* Pre-setup all potential qpairs now and assign them in the channel 246 * callback. 247 */ 248 for (i = 0; i < q_pairs; i++) { 249 rc = rte_compressdev_queue_pair_setup(cdev_id, i, 250 NUM_MAX_INFLIGHT_OPS, 251 rte_socket_id()); 252 if (rc) { 253 if (i > 0) { 254 q_pairs = i; 255 SPDK_NOTICELOG("FYI failed to setup a queue pair on " 256 "compressdev %u with error %u " 257 "so limiting to %u qpairs\n", 258 cdev_id, rc, q_pairs); 259 break; 260 } else { 261 SPDK_ERRLOG("Failed to setup queue pair on " 262 "compressdev %u with error %u\n", cdev_id, rc); 263 rc = -EINVAL; 264 goto err; 265 } 266 } 267 } 268 269 rc = rte_compressdev_start(cdev_id); 270 if (rc < 0) { 271 SPDK_ERRLOG("Failed to start device %u: error %d\n", 272 cdev_id, rc); 273 goto err; 274 } 275 276 if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) { 277 rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform, 278 &device->comp_xform); 279 if (rc < 0) { 280 SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n", 281 cdev_id, rc); 282 goto err; 283 } 284 285 rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform, 286 &device->decomp_xform); 287 if (rc) { 288 SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n", 289 cdev_id, rc); 290 goto err; 291 } 292 } else { 293 SPDK_ERRLOG("PMD does not support shared transforms\n"); 294 goto err; 295 } 296 297 /* Build up list of device/qp combinations */ 298 for (i = 0; i < q_pairs; i++) { 299 dev_qp = calloc(1, sizeof(struct comp_device_qp)); 300 if (!dev_qp) { 301 rc = -ENOMEM; 302 goto err; 303 } 304 dev_qp->device = device; 305 dev_qp->qp = i; 306 dev_qp->thread = NULL; 307 TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link); 308 } 309 310 TAILQ_INSERT_TAIL(&g_compress_devs, device, link); 311 312 if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) { 313 g_qat_available = true; 314 } 315 if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) { 316 g_isal_available = true; 317 } 318 319 return 0; 320 321 err: 322 TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) { 323 TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link); 324 free(dev_qp); 325 } 326 free(device); 327 return rc; 328 } 329 330 /* Called from driver init entry point, vbdev_compress_init() */ 331 static int 332 vbdev_init_compress_drivers(void) 333 { 334 uint8_t cdev_count, i; 335 struct compress_dev *tmp_dev; 336 struct compress_dev *device; 337 int rc; 338 339 /* We always init the compress_isal PMD */ 340 rc = rte_vdev_init(ISAL_PMD, NULL); 341 if (rc == 0) { 342 SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD); 343 } else if (rc == -EEXIST) { 344 SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD); 345 } else { 346 SPDK_ERRLOG("creating virtual PMD %s\n", ISAL_PMD); 347 return -EINVAL; 348 } 349 350 /* If we have no compression devices, there's no reason to continue. */ 351 cdev_count = rte_compressdev_count(); 352 if (cdev_count == 0) { 353 return 0; 354 } 355 if (cdev_count > RTE_COMPRESS_MAX_DEVS) { 356 SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n"); 357 return -EINVAL; 358 } 359 360 g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE, 361 sizeof(struct rte_mbuf), 0, rte_socket_id()); 362 if (g_mbuf_mp == NULL) { 363 SPDK_ERRLOG("Cannot create mbuf pool\n"); 364 rc = -ENOMEM; 365 goto error_create_mbuf; 366 } 367 368 g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE, 369 0, rte_socket_id()); 370 if (g_comp_op_mp == NULL) { 371 SPDK_ERRLOG("Cannot create comp op pool\n"); 372 rc = -ENOMEM; 373 goto error_create_op; 374 } 375 376 /* Init all devices */ 377 for (i = 0; i < cdev_count; i++) { 378 rc = create_compress_dev(i); 379 if (rc != 0) { 380 goto error_create_compress_devs; 381 } 382 } 383 384 if (g_qat_available == true) { 385 SPDK_NOTICELOG("initialized QAT PMD\n"); 386 } 387 388 g_shinfo.free_cb = shinfo_free_cb; 389 390 return 0; 391 392 /* Error cleanup paths. */ 393 error_create_compress_devs: 394 TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) { 395 TAILQ_REMOVE(&g_compress_devs, device, link); 396 free(device); 397 } 398 error_create_op: 399 error_create_mbuf: 400 rte_mempool_free(g_mbuf_mp); 401 402 return rc; 403 } 404 405 /* for completing rw requests on the orig IO thread. */ 406 static void 407 _spdk_reduce_rw_blocks_cb(void *arg) 408 { 409 struct comp_bdev_io *io_ctx = arg; 410 411 if (io_ctx->status == 0) { 412 spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS); 413 } else { 414 SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status); 415 spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED); 416 } 417 } 418 419 /* Completion callback for r/w that were issued via reducelib. */ 420 static void 421 spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno) 422 { 423 struct spdk_bdev_io *bdev_io = arg; 424 struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; 425 struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch); 426 427 /* TODO: need to decide which error codes are bdev_io success vs failure; 428 * example examine calls reading metadata */ 429 430 io_ctx->status = reduce_errno; 431 432 /* Send this request to the orig IO thread. */ 433 if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) { 434 spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _spdk_reduce_rw_blocks_cb, io_ctx); 435 } else { 436 _spdk_reduce_rw_blocks_cb(io_ctx); 437 } 438 } 439 440 static int 441 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs, 442 int src_iovcnt, struct iovec *dst_iovs, 443 int dst_iovcnt, bool compress, void *cb_arg) 444 { 445 void *reduce_cb_arg = cb_arg; 446 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress, 447 backing_dev); 448 struct rte_comp_op *comp_op; 449 struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP]; 450 struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP]; 451 uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id; 452 uint64_t updated_length, remainder, phys_addr, total_length = 0; 453 uint8_t *current_src_base = NULL; 454 uint8_t *current_dst_base = NULL; 455 int iov_index, mbuf_index; 456 int rc = 0; 457 struct vbdev_comp_op *op_to_queue; 458 int i; 459 int src_mbuf_total = src_iovcnt; 460 int dst_mbuf_total = dst_iovcnt; 461 bool device_error = false; 462 463 assert(src_iovcnt < MAX_MBUFS_PER_OP); 464 465 #ifdef DEBUG 466 memset(src_mbufs, 0, sizeof(src_mbufs)); 467 memset(dst_mbufs, 0, sizeof(dst_mbufs)); 468 #endif 469 470 comp_op = rte_comp_op_alloc(g_comp_op_mp); 471 if (!comp_op) { 472 SPDK_ERRLOG("trying to get a comp op!\n"); 473 goto error_get_op; 474 } 475 476 /* get an mbuf per iov, src and dst */ 477 rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt); 478 if (rc) { 479 SPDK_ERRLOG("ERROR trying to get src_mbufs!\n"); 480 goto error_get_src; 481 } 482 483 rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt); 484 if (rc) { 485 SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n"); 486 goto error_get_dst; 487 } 488 489 /* There is a 1:1 mapping between a bdev_io and a compression operation, but 490 * all compression PMDs that SPDK uses support chaining so build our mbuf chain 491 * and associate with our single comp_op. 492 */ 493 494 /* Setup src mbufs */ 495 iov_index = mbuf_index = 0; 496 while (iov_index < src_iovcnt) { 497 498 current_src_base = src_iovs[iov_index].iov_base; 499 total_length += src_iovs[iov_index].iov_len; 500 assert(src_mbufs[mbuf_index] != NULL); 501 src_mbufs[mbuf_index]->userdata = reduce_cb_arg; 502 updated_length = src_iovs[iov_index].iov_len; 503 phys_addr = spdk_vtophys((void *)current_src_base, &updated_length); 504 505 rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index], 506 current_src_base, 507 phys_addr, 508 updated_length, 509 &g_shinfo); 510 rte_pktmbuf_append(src_mbufs[mbuf_index], updated_length); 511 remainder = src_iovs[iov_index].iov_len - updated_length; 512 513 if (mbuf_index > 0) { 514 rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]); 515 } 516 517 /* If we crossed 2 2MB boundary we need another mbuf for the remainder */ 518 if (remainder > 0) { 519 /* allocate an mbuf at the end of the array */ 520 rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[src_mbuf_total], 1); 521 if (rc) { 522 SPDK_ERRLOG("ERROR trying to get an extra src_mbuf!\n"); 523 goto error_src_dst; 524 } 525 src_mbuf_total++; 526 mbuf_index++; 527 src_mbufs[mbuf_index]->userdata = reduce_cb_arg; 528 current_src_base += updated_length; 529 phys_addr = spdk_vtophys((void *)current_src_base, &remainder); 530 /* assert we don't cross another */ 531 assert(remainder == src_iovs[iov_index].iov_len - updated_length); 532 533 rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index], 534 current_src_base, 535 phys_addr, 536 remainder, 537 &g_shinfo); 538 rte_pktmbuf_append(src_mbufs[mbuf_index], remainder); 539 rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]); 540 } 541 iov_index++; 542 mbuf_index++; 543 } 544 545 comp_op->m_src = src_mbufs[0]; 546 comp_op->src.offset = 0; 547 comp_op->src.length = total_length; 548 549 /* setup dst mbufs, for the current test being used with this code there's only one vector */ 550 iov_index = mbuf_index = 0; 551 while (iov_index < dst_iovcnt) { 552 553 current_dst_base = dst_iovs[iov_index].iov_base; 554 updated_length = dst_iovs[iov_index].iov_len; 555 phys_addr = spdk_vtophys((void *)current_dst_base, &updated_length); 556 557 rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index], 558 current_dst_base, 559 phys_addr, 560 updated_length, 561 &g_shinfo); 562 rte_pktmbuf_append(dst_mbufs[mbuf_index], updated_length); 563 remainder = dst_iovs[iov_index].iov_len - updated_length; 564 565 if (mbuf_index > 0) { 566 rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]); 567 } 568 569 /* If we crossed 2 2MB boundary we need another mbuf for the remainder */ 570 if (remainder > 0) { 571 rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[dst_mbuf_total], 1); 572 if (rc) { 573 SPDK_ERRLOG("ERROR trying to get an extra dst_mbuf!\n"); 574 goto error_src_dst; 575 } 576 dst_mbuf_total++; 577 mbuf_index++; 578 current_dst_base += updated_length; 579 phys_addr = spdk_vtophys((void *)current_dst_base, &remainder); 580 /* assert we don't cross another */ 581 assert(remainder == dst_iovs[iov_index].iov_len - updated_length); 582 583 rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index], 584 current_dst_base, 585 phys_addr, 586 remainder, 587 &g_shinfo); 588 rte_pktmbuf_append(dst_mbufs[mbuf_index], remainder); 589 rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]); 590 } 591 iov_index++; 592 mbuf_index++; 593 } 594 595 comp_op->m_dst = dst_mbufs[0]; 596 comp_op->dst.offset = 0; 597 598 if (compress == true) { 599 comp_op->private_xform = comp_bdev->device_qp->device->comp_xform; 600 } else { 601 comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform; 602 } 603 604 comp_op->op_type = RTE_COMP_OP_STATELESS; 605 comp_op->flush_flag = RTE_COMP_FLUSH_FINAL; 606 607 rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1); 608 assert(rc <= 1); 609 610 /* We always expect 1 got queued, if 0 then we need to queue it up. */ 611 if (rc == 1) { 612 return 0; 613 } else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) { 614 /* we free mbufs differently depending on whether they were chained or not */ 615 rte_pktmbuf_free(comp_op->m_src); 616 rte_pktmbuf_free(comp_op->m_dst); 617 goto error_enqueue; 618 } else { 619 device_error = true; 620 goto error_src_dst; 621 } 622 623 /* Error cleanup paths. */ 624 error_src_dst: 625 for (i = 0; i < dst_mbuf_total; i++) { 626 rte_pktmbuf_free((struct rte_mbuf *)&dst_mbufs[i]); 627 } 628 error_get_dst: 629 for (i = 0; i < src_mbuf_total; i++) { 630 rte_pktmbuf_free((struct rte_mbuf *)&src_mbufs[i]); 631 } 632 error_get_src: 633 error_enqueue: 634 rte_comp_op_free(comp_op); 635 error_get_op: 636 637 if (device_error == true) { 638 /* There was an error sending the op to the device, most 639 * likely with the parameters. 640 */ 641 SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status); 642 return -EINVAL; 643 } 644 645 op_to_queue = calloc(1, sizeof(struct vbdev_comp_op)); 646 if (op_to_queue == NULL) { 647 SPDK_ERRLOG("unable to allocate operation for queueing.\n"); 648 return -ENOMEM; 649 } 650 op_to_queue->backing_dev = backing_dev; 651 op_to_queue->src_iovs = src_iovs; 652 op_to_queue->src_iovcnt = src_iovcnt; 653 op_to_queue->dst_iovs = dst_iovs; 654 op_to_queue->dst_iovcnt = dst_iovcnt; 655 op_to_queue->compress = compress; 656 op_to_queue->cb_arg = cb_arg; 657 TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops, 658 op_to_queue, 659 link); 660 return 0; 661 } 662 663 /* Poller for the DPDK compression driver. */ 664 static int 665 comp_dev_poller(void *args) 666 { 667 struct vbdev_compress *comp_bdev = args; 668 uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id; 669 struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS]; 670 uint16_t num_deq; 671 struct spdk_reduce_vol_cb_args *reduce_args; 672 struct vbdev_comp_op *op_to_resubmit; 673 int rc, i; 674 675 num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops, 676 NUM_MAX_INFLIGHT_OPS); 677 for (i = 0; i < num_deq; i++) { 678 reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata; 679 680 if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) { 681 682 /* tell reduce this is done and what the bytecount was */ 683 reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced); 684 } else { 685 SPDK_NOTICELOG("FYI storing data uncompressed due to deque status %u\n", 686 deq_ops[i]->status); 687 688 /* Reduce will simply store uncompressed on neg errno value. */ 689 reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL); 690 } 691 692 /* Now free both mbufs and the compress operation. The rte_pktmbuf_free() 693 * call takes care of freeing all of the mbufs in the chain back to their 694 * original pool. 695 */ 696 rte_pktmbuf_free(deq_ops[i]->m_src); 697 rte_pktmbuf_free(deq_ops[i]->m_dst); 698 699 /* There is no bulk free for com ops so we have to free them one at a time 700 * here however it would be rare that we'd ever have more than 1 at a time 701 * anyways. 702 */ 703 rte_comp_op_free(deq_ops[i]); 704 705 /* Check if there are any pending comp ops to process, only pull one 706 * at a time off as _compress_operation() may re-queue the op. 707 */ 708 if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) { 709 op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops); 710 rc = _compress_operation(op_to_resubmit->backing_dev, 711 op_to_resubmit->src_iovs, 712 op_to_resubmit->src_iovcnt, 713 op_to_resubmit->dst_iovs, 714 op_to_resubmit->dst_iovcnt, 715 op_to_resubmit->compress, 716 op_to_resubmit->cb_arg); 717 if (rc == 0) { 718 TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link); 719 free(op_to_resubmit); 720 } 721 } 722 } 723 return 0; 724 } 725 726 /* Entry point for reduce lib to issue a compress operation. */ 727 static void 728 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev, 729 struct iovec *src_iovs, int src_iovcnt, 730 struct iovec *dst_iovs, int dst_iovcnt, 731 struct spdk_reduce_vol_cb_args *cb_arg) 732 { 733 int rc; 734 735 rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg); 736 if (rc) { 737 SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc)); 738 cb_arg->cb_fn(cb_arg->cb_arg, rc); 739 } 740 } 741 742 /* Entry point for reduce lib to issue a decompress operation. */ 743 static void 744 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev, 745 struct iovec *src_iovs, int src_iovcnt, 746 struct iovec *dst_iovs, int dst_iovcnt, 747 struct spdk_reduce_vol_cb_args *cb_arg) 748 { 749 int rc; 750 751 rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg); 752 if (rc) { 753 SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc)); 754 cb_arg->cb_fn(cb_arg->cb_arg, rc); 755 } 756 } 757 758 /* Callback for getting a buf from the bdev pool in the event that the caller passed 759 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module 760 * beneath us before we're done with it. 761 */ 762 static void 763 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 764 { 765 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, 766 comp_bdev); 767 768 spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, 769 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 770 spdk_reduce_rw_blocks_cb, bdev_io); 771 } 772 773 /* scheduled for completion on IO thread */ 774 static void 775 _complete_other_io(void *arg) 776 { 777 struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg; 778 if (io_ctx->status == 0) { 779 spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS); 780 } else { 781 spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED); 782 } 783 } 784 785 /* scheduled for submission on reduce thread */ 786 static void 787 _comp_bdev_io_submit(void *arg) 788 { 789 struct spdk_bdev_io *bdev_io = arg; 790 struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; 791 struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch); 792 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, 793 comp_bdev); 794 int rc = 0; 795 796 switch (bdev_io->type) { 797 case SPDK_BDEV_IO_TYPE_READ: 798 spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb, 799 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 800 return; 801 case SPDK_BDEV_IO_TYPE_WRITE: 802 spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, 803 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 804 spdk_reduce_rw_blocks_cb, bdev_io); 805 return; 806 /* TODO in future patch in the series */ 807 case SPDK_BDEV_IO_TYPE_RESET: 808 break; 809 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 810 case SPDK_BDEV_IO_TYPE_UNMAP: 811 case SPDK_BDEV_IO_TYPE_FLUSH: 812 default: 813 SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type); 814 rc = -EINVAL; 815 } 816 817 if (rc) { 818 if (rc == -ENOMEM) { 819 SPDK_ERRLOG("No memory, start to queue io for compress.\n"); 820 io_ctx->ch = ch; 821 vbdev_compress_queue_io(bdev_io); 822 return; 823 } else { 824 SPDK_ERRLOG("on bdev_io submission!\n"); 825 io_ctx->status = rc; 826 } 827 } 828 829 /* Complete this on the orig IO thread. */ 830 if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) { 831 spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _complete_other_io, io_ctx); 832 } else { 833 _complete_other_io(io_ctx); 834 } 835 } 836 837 /* Called when someone above submits IO to this vbdev. */ 838 static void 839 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 840 { 841 struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; 842 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, 843 comp_bdev); 844 struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch); 845 846 memset(io_ctx, 0, sizeof(struct comp_bdev_io)); 847 io_ctx->comp_bdev = comp_bdev; 848 io_ctx->comp_ch = comp_ch; 849 io_ctx->orig_io = bdev_io; 850 851 /* Send this request to the reduce_thread if that's not what we're on. */ 852 if (spdk_io_channel_get_thread(ch) != comp_bdev->reduce_thread) { 853 spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io); 854 } else { 855 _comp_bdev_io_submit(bdev_io); 856 } 857 } 858 859 static bool 860 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 861 { 862 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; 863 864 switch (io_type) { 865 case SPDK_BDEV_IO_TYPE_READ: 866 case SPDK_BDEV_IO_TYPE_WRITE: 867 return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type); 868 case SPDK_BDEV_IO_TYPE_UNMAP: 869 case SPDK_BDEV_IO_TYPE_RESET: 870 case SPDK_BDEV_IO_TYPE_FLUSH: 871 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 872 default: 873 return false; 874 } 875 } 876 877 /* Resubmission function used by the bdev layer when a queued IO is ready to be 878 * submitted. 879 */ 880 static void 881 vbdev_compress_resubmit_io(void *arg) 882 { 883 struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg; 884 struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; 885 886 vbdev_compress_submit_request(io_ctx->ch, bdev_io); 887 } 888 889 /* Used to queue an IO in the event of resource issues. */ 890 static void 891 vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io) 892 { 893 struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; 894 int rc; 895 896 io_ctx->bdev_io_wait.bdev = bdev_io->bdev; 897 io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io; 898 io_ctx->bdev_io_wait.cb_arg = bdev_io; 899 900 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait); 901 if (rc) { 902 SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc); 903 assert(false); 904 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 905 } 906 } 907 908 /* Callback for unregistering the IO device. */ 909 static void 910 _device_unregister_cb(void *io_device) 911 { 912 struct vbdev_compress *comp_bdev = io_device; 913 914 /* Done with this comp_bdev. */ 915 pthread_mutex_destroy(&comp_bdev->reduce_lock); 916 free(comp_bdev->comp_bdev.name); 917 free(comp_bdev); 918 } 919 920 static void 921 _vbdev_compress_destruct_cb(void *ctx) 922 { 923 struct spdk_bdev_desc *desc = ctx; 924 925 spdk_bdev_close(desc); 926 } 927 928 static void 929 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno) 930 { 931 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg; 932 933 if (reduce_errno) { 934 SPDK_ERRLOG("number %d\n", reduce_errno); 935 } else { 936 TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link); 937 spdk_bdev_module_release_bdev(comp_bdev->base_bdev); 938 /* Close the underlying bdev on its same opened thread. */ 939 if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) { 940 spdk_thread_send_msg(comp_bdev->thread, _vbdev_compress_destruct_cb, comp_bdev->base_desc); 941 } else { 942 spdk_bdev_close(comp_bdev->base_desc); 943 } 944 comp_bdev->vol = NULL; 945 if (comp_bdev->orphaned == false) { 946 spdk_io_device_unregister(comp_bdev, _device_unregister_cb); 947 } else { 948 vbdev_compress_delete_done(comp_bdev->delete_ctx, 0); 949 _device_unregister_cb(comp_bdev); 950 } 951 } 952 } 953 954 static void 955 _reduce_destroy_cb(void *ctx, int reduce_errno) 956 { 957 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; 958 959 if (reduce_errno) { 960 SPDK_ERRLOG("number %d\n", reduce_errno); 961 } 962 963 comp_bdev->vol = NULL; 964 spdk_put_io_channel(comp_bdev->base_ch); 965 if (comp_bdev->orphaned == false) { 966 spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done, 967 comp_bdev->delete_ctx); 968 } else { 969 vbdev_compress_destruct_cb((void *)comp_bdev, 0); 970 } 971 972 } 973 974 static void 975 _delete_vol_unload_cb(void *ctx) 976 { 977 struct vbdev_compress *comp_bdev = ctx; 978 979 /* FIXME: Assert if these conditions are not satisified for now. */ 980 assert(!comp_bdev->reduce_thread || 981 comp_bdev->reduce_thread == spdk_get_thread()); 982 983 /* reducelib needs a channel to comm with the backing device */ 984 comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc); 985 986 /* Clean the device before we free our resources. */ 987 spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev); 988 } 989 990 /* Called by reduceLib after performing unload vol actions */ 991 static void 992 delete_vol_unload_cb(void *cb_arg, int reduce_errno) 993 { 994 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg; 995 996 if (reduce_errno) { 997 SPDK_ERRLOG("number %d\n", reduce_errno); 998 /* FIXME: callback should be executed. */ 999 return; 1000 } 1001 1002 pthread_mutex_lock(&comp_bdev->reduce_lock); 1003 if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) { 1004 spdk_thread_send_msg(comp_bdev->reduce_thread, 1005 _delete_vol_unload_cb, comp_bdev); 1006 pthread_mutex_unlock(&comp_bdev->reduce_lock); 1007 } else { 1008 pthread_mutex_unlock(&comp_bdev->reduce_lock); 1009 1010 _delete_vol_unload_cb(comp_bdev); 1011 } 1012 } 1013 1014 const char * 1015 compress_get_name(const struct vbdev_compress *comp_bdev) 1016 { 1017 return comp_bdev->comp_bdev.name; 1018 } 1019 1020 struct vbdev_compress * 1021 compress_bdev_first(void) 1022 { 1023 struct vbdev_compress *comp_bdev; 1024 1025 comp_bdev = TAILQ_FIRST(&g_vbdev_comp); 1026 1027 return comp_bdev; 1028 } 1029 1030 struct vbdev_compress * 1031 compress_bdev_next(struct vbdev_compress *prev) 1032 { 1033 struct vbdev_compress *comp_bdev; 1034 1035 comp_bdev = TAILQ_NEXT(prev, link); 1036 1037 return comp_bdev; 1038 } 1039 1040 bool 1041 compress_has_orphan(const char *name) 1042 { 1043 struct vbdev_compress *comp_bdev; 1044 1045 TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) { 1046 if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) { 1047 return true; 1048 } 1049 } 1050 return false; 1051 } 1052 1053 /* Called after we've unregistered following a hot remove callback. 1054 * Our finish entry point will be called next. 1055 */ 1056 static int 1057 vbdev_compress_destruct(void *ctx) 1058 { 1059 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; 1060 1061 if (comp_bdev->vol != NULL) { 1062 /* Tell reducelib that we're done with this volume. */ 1063 spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev); 1064 } else { 1065 vbdev_compress_destruct_cb(comp_bdev, 0); 1066 } 1067 1068 return 0; 1069 } 1070 1071 /* We supplied this as an entry point for upper layers who want to communicate to this 1072 * bdev. This is how they get a channel. 1073 */ 1074 static struct spdk_io_channel * 1075 vbdev_compress_get_io_channel(void *ctx) 1076 { 1077 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; 1078 1079 /* The IO channel code will allocate a channel for us which consists of 1080 * the SPDK channel structure plus the size of our comp_io_channel struct 1081 * that we passed in when we registered our IO device. It will then call 1082 * our channel create callback to populate any elements that we need to 1083 * update. 1084 */ 1085 return spdk_get_io_channel(comp_bdev); 1086 } 1087 1088 /* This is the output for bdev_get_bdevs() for this vbdev */ 1089 static int 1090 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 1091 { 1092 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; 1093 1094 spdk_json_write_name(w, "compress"); 1095 spdk_json_write_object_begin(w); 1096 spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev)); 1097 spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev)); 1098 spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name); 1099 spdk_json_write_object_end(w); 1100 1101 return 0; 1102 } 1103 1104 /* This is used to generate JSON that can configure this module to its current state. */ 1105 static int 1106 vbdev_compress_config_json(struct spdk_json_write_ctx *w) 1107 { 1108 struct vbdev_compress *comp_bdev; 1109 1110 TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) { 1111 spdk_json_write_object_begin(w); 1112 spdk_json_write_named_string(w, "method", "bdev_compress_create"); 1113 spdk_json_write_named_object_begin(w, "params"); 1114 spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev)); 1115 spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev)); 1116 spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name); 1117 spdk_json_write_object_end(w); 1118 spdk_json_write_object_end(w); 1119 } 1120 return 0; 1121 } 1122 1123 static void 1124 _vbdev_reduce_init_cb(void *ctx) 1125 { 1126 struct spdk_bdev_desc *desc = ctx; 1127 1128 spdk_bdev_close(desc); 1129 } 1130 1131 /* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct 1132 * used for initial metadata operations to claim where it will be further filled out 1133 * and added to the global list. 1134 */ 1135 static void 1136 vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 1137 { 1138 struct vbdev_compress *meta_ctx = cb_arg; 1139 1140 /* We're done with metadata operations */ 1141 spdk_put_io_channel(meta_ctx->base_ch); 1142 /* Close the underlying bdev on its same opened thread. */ 1143 if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) { 1144 spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx->base_desc); 1145 } else { 1146 spdk_bdev_close(meta_ctx->base_desc); 1147 } 1148 meta_ctx->base_desc = NULL; 1149 1150 if (reduce_errno == 0) { 1151 meta_ctx->vol = vol; 1152 vbdev_compress_claim(meta_ctx); 1153 } else { 1154 SPDK_ERRLOG("for vol %s, error %u\n", 1155 spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno); 1156 free(meta_ctx); 1157 } 1158 } 1159 1160 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just 1161 * call the callback provided by reduceLib when it called the read/write/unmap function and 1162 * free the bdev_io. 1163 */ 1164 static void 1165 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg) 1166 { 1167 struct spdk_reduce_vol_cb_args *cb_args = arg; 1168 int reduce_errno; 1169 1170 if (success) { 1171 reduce_errno = 0; 1172 } else { 1173 reduce_errno = -EIO; 1174 } 1175 spdk_bdev_free_io(bdev_io); 1176 cb_args->cb_fn(cb_args->cb_arg, reduce_errno); 1177 } 1178 1179 /* This is the function provided to the reduceLib for sending reads directly to 1180 * the backing device. 1181 */ 1182 static void 1183 _comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt, 1184 uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args) 1185 { 1186 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress, 1187 backing_dev); 1188 int rc; 1189 1190 rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch, 1191 iov, iovcnt, lba, lba_count, 1192 comp_reduce_io_cb, 1193 args); 1194 if (rc) { 1195 if (rc == -ENOMEM) { 1196 SPDK_ERRLOG("No memory, start to queue io.\n"); 1197 /* TODO: there's no bdev_io to queue */ 1198 } else { 1199 SPDK_ERRLOG("submitting readv request\n"); 1200 } 1201 args->cb_fn(args->cb_arg, rc); 1202 } 1203 } 1204 1205 /* This is the function provided to the reduceLib for sending writes directly to 1206 * the backing device. 1207 */ 1208 static void 1209 _comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt, 1210 uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args) 1211 { 1212 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress, 1213 backing_dev); 1214 int rc; 1215 1216 rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch, 1217 iov, iovcnt, lba, lba_count, 1218 comp_reduce_io_cb, 1219 args); 1220 if (rc) { 1221 if (rc == -ENOMEM) { 1222 SPDK_ERRLOG("No memory, start to queue io.\n"); 1223 /* TODO: there's no bdev_io to queue */ 1224 } else { 1225 SPDK_ERRLOG("error submitting writev request\n"); 1226 } 1227 args->cb_fn(args->cb_arg, rc); 1228 } 1229 } 1230 1231 /* This is the function provided to the reduceLib for sending unmaps directly to 1232 * the backing device. 1233 */ 1234 static void 1235 _comp_reduce_unmap(struct spdk_reduce_backing_dev *dev, 1236 uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args) 1237 { 1238 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress, 1239 backing_dev); 1240 int rc; 1241 1242 rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch, 1243 lba, lba_count, 1244 comp_reduce_io_cb, 1245 args); 1246 1247 if (rc) { 1248 if (rc == -ENOMEM) { 1249 SPDK_ERRLOG("No memory, start to queue io.\n"); 1250 /* TODO: there's no bdev_io to queue */ 1251 } else { 1252 SPDK_ERRLOG("submitting unmap request\n"); 1253 } 1254 args->cb_fn(args->cb_arg, rc); 1255 } 1256 } 1257 1258 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */ 1259 static void 1260 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno) 1261 { 1262 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg; 1263 1264 if (reduce_errno) { 1265 SPDK_ERRLOG("number %d\n", reduce_errno); 1266 } 1267 1268 comp_bdev->vol = NULL; 1269 spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL); 1270 } 1271 1272 /* Called when the underlying base bdev goes away. */ 1273 static void 1274 vbdev_compress_base_bdev_hotremove_cb(void *ctx) 1275 { 1276 struct vbdev_compress *comp_bdev, *tmp; 1277 struct spdk_bdev *bdev_find = ctx; 1278 1279 TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) { 1280 if (bdev_find == comp_bdev->base_bdev) { 1281 /* Tell reduceLib that we're done with this volume. */ 1282 spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev); 1283 } 1284 } 1285 } 1286 1287 /* TODO: determine which parms we want user configurable, HC for now 1288 * params.vol_size 1289 * params.chunk_size 1290 * compression PMD, algorithm, window size, comp level, etc. 1291 * DEV_MD_PATH 1292 */ 1293 1294 /* Common function for init and load to allocate and populate the minimal 1295 * information for reducelib to init or load. 1296 */ 1297 struct vbdev_compress * 1298 _prepare_for_load_init(struct spdk_bdev *bdev) 1299 { 1300 struct vbdev_compress *meta_ctx; 1301 1302 meta_ctx = calloc(1, sizeof(struct vbdev_compress)); 1303 if (meta_ctx == NULL) { 1304 SPDK_ERRLOG("failed to alloc init contexts\n"); 1305 return NULL; 1306 } 1307 1308 meta_ctx->drv_name = "None"; 1309 meta_ctx->base_bdev = bdev; 1310 meta_ctx->backing_dev.unmap = _comp_reduce_unmap; 1311 meta_ctx->backing_dev.readv = _comp_reduce_readv; 1312 meta_ctx->backing_dev.writev = _comp_reduce_writev; 1313 meta_ctx->backing_dev.compress = _comp_reduce_compress; 1314 meta_ctx->backing_dev.decompress = _comp_reduce_decompress; 1315 1316 meta_ctx->backing_dev.blocklen = bdev->blocklen; 1317 meta_ctx->backing_dev.blockcnt = bdev->blockcnt; 1318 1319 meta_ctx->params.chunk_size = CHUNK_SIZE; 1320 meta_ctx->params.logical_block_size = bdev->blocklen; 1321 meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ; 1322 return meta_ctx; 1323 } 1324 1325 static bool 1326 _set_pmd(struct vbdev_compress *comp_dev) 1327 { 1328 if (g_opts == COMPRESS_PMD_AUTO) { 1329 if (g_qat_available) { 1330 comp_dev->drv_name = QAT_PMD; 1331 } else { 1332 comp_dev->drv_name = ISAL_PMD; 1333 } 1334 } else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) { 1335 comp_dev->drv_name = QAT_PMD; 1336 } else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) { 1337 comp_dev->drv_name = ISAL_PMD; 1338 } else { 1339 SPDK_ERRLOG("Requested PMD is not available.\n"); 1340 return false; 1341 } 1342 SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name); 1343 return true; 1344 } 1345 1346 /* Call reducelib to initialize a new volume */ 1347 static int 1348 vbdev_init_reduce(struct spdk_bdev *bdev, const char *pm_path) 1349 { 1350 struct vbdev_compress *meta_ctx; 1351 int rc; 1352 1353 meta_ctx = _prepare_for_load_init(bdev); 1354 if (meta_ctx == NULL) { 1355 return -EINVAL; 1356 } 1357 1358 if (_set_pmd(meta_ctx) == false) { 1359 SPDK_ERRLOG("could not find required pmd\n"); 1360 free(meta_ctx); 1361 return -EINVAL; 1362 } 1363 1364 rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb, 1365 meta_ctx->base_bdev, &meta_ctx->base_desc); 1366 if (rc) { 1367 SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev)); 1368 free(meta_ctx); 1369 return -EINVAL; 1370 } 1371 1372 /* Save the thread where the base device is opened */ 1373 meta_ctx->thread = spdk_get_thread(); 1374 1375 meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc); 1376 1377 spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev, 1378 pm_path, 1379 vbdev_reduce_init_cb, 1380 meta_ctx); 1381 return 0; 1382 } 1383 1384 /* We provide this callback for the SPDK channel code to create a channel using 1385 * the channel struct we provided in our module get_io_channel() entry point. Here 1386 * we get and save off an underlying base channel of the device below us so that 1387 * we can communicate with the base bdev on a per channel basis. If we needed 1388 * our own poller for this vbdev, we'd register it here. 1389 */ 1390 static int 1391 comp_bdev_ch_create_cb(void *io_device, void *ctx_buf) 1392 { 1393 struct vbdev_compress *comp_bdev = io_device; 1394 struct comp_device_qp *device_qp; 1395 1396 /* Now set the reduce channel if it's not already set. */ 1397 pthread_mutex_lock(&comp_bdev->reduce_lock); 1398 if (comp_bdev->ch_count == 0) { 1399 /* We use this queue to track outstanding IO in our layer. */ 1400 TAILQ_INIT(&comp_bdev->pending_comp_ios); 1401 1402 /* We use this to queue up compression operations as needed. */ 1403 TAILQ_INIT(&comp_bdev->queued_comp_ops); 1404 1405 comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc); 1406 comp_bdev->reduce_thread = spdk_get_thread(); 1407 comp_bdev->poller = spdk_poller_register(comp_dev_poller, comp_bdev, 0); 1408 /* Now assign a q pair */ 1409 pthread_mutex_lock(&g_comp_device_qp_lock); 1410 TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) { 1411 if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) { 1412 if (device_qp->thread == spdk_get_thread()) { 1413 comp_bdev->device_qp = device_qp; 1414 break; 1415 } 1416 if (device_qp->thread == NULL) { 1417 comp_bdev->device_qp = device_qp; 1418 device_qp->thread = spdk_get_thread(); 1419 break; 1420 } 1421 } 1422 } 1423 pthread_mutex_unlock(&g_comp_device_qp_lock); 1424 } 1425 comp_bdev->ch_count++; 1426 pthread_mutex_unlock(&comp_bdev->reduce_lock); 1427 1428 if (comp_bdev->device_qp != NULL) { 1429 return 0; 1430 } else { 1431 SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev); 1432 assert(false); 1433 return -ENOMEM; 1434 } 1435 } 1436 1437 static void 1438 _channel_cleanup(struct vbdev_compress *comp_bdev) 1439 { 1440 /* Note: comp_bdevs can share a device_qp if they are 1441 * on the same thread so we leave the device_qp element 1442 * alone for this comp_bdev and just clear the reduce thread. 1443 */ 1444 spdk_put_io_channel(comp_bdev->base_ch); 1445 comp_bdev->reduce_thread = NULL; 1446 spdk_poller_unregister(&comp_bdev->poller); 1447 } 1448 1449 /* Used to reroute destroy_ch to the correct thread */ 1450 static void 1451 _comp_bdev_ch_destroy_cb(void *arg) 1452 { 1453 struct vbdev_compress *comp_bdev = arg; 1454 1455 pthread_mutex_lock(&comp_bdev->reduce_lock); 1456 _channel_cleanup(comp_bdev); 1457 pthread_mutex_unlock(&comp_bdev->reduce_lock); 1458 } 1459 1460 /* We provide this callback for the SPDK channel code to destroy a channel 1461 * created with our create callback. We just need to undo anything we did 1462 * when we created. If this bdev used its own poller, we'd unregister it here. 1463 */ 1464 static void 1465 comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf) 1466 { 1467 struct vbdev_compress *comp_bdev = io_device; 1468 1469 pthread_mutex_lock(&comp_bdev->reduce_lock); 1470 comp_bdev->ch_count--; 1471 if (comp_bdev->ch_count == 0) { 1472 /* Send this request to the thread where the channel was created. */ 1473 if (comp_bdev->reduce_thread != spdk_get_thread()) { 1474 spdk_thread_send_msg(comp_bdev->reduce_thread, 1475 _comp_bdev_ch_destroy_cb, comp_bdev); 1476 } else { 1477 _channel_cleanup(comp_bdev); 1478 } 1479 } 1480 pthread_mutex_unlock(&comp_bdev->reduce_lock); 1481 } 1482 1483 /* RPC entry point for compression vbdev creation. */ 1484 int 1485 create_compress_bdev(const char *bdev_name, const char *pm_path) 1486 { 1487 struct spdk_bdev *bdev; 1488 1489 bdev = spdk_bdev_get_by_name(bdev_name); 1490 if (!bdev) { 1491 return -ENODEV; 1492 } 1493 1494 return vbdev_init_reduce(bdev, pm_path);; 1495 } 1496 1497 /* On init, just init the compress drivers. All metadata is stored on disk. */ 1498 static int 1499 vbdev_compress_init(void) 1500 { 1501 if (vbdev_init_compress_drivers()) { 1502 SPDK_ERRLOG("Error setting up compression devices\n"); 1503 return -EINVAL; 1504 } 1505 1506 return 0; 1507 } 1508 1509 /* Called when the entire module is being torn down. */ 1510 static void 1511 vbdev_compress_finish(void) 1512 { 1513 struct comp_device_qp *dev_qp; 1514 /* TODO: unload vol in a future patch */ 1515 1516 while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) { 1517 TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link); 1518 free(dev_qp); 1519 } 1520 pthread_mutex_destroy(&g_comp_device_qp_lock); 1521 1522 rte_mempool_free(g_comp_op_mp); 1523 rte_mempool_free(g_mbuf_mp); 1524 } 1525 1526 /* During init we'll be asked how much memory we'd like passed to us 1527 * in bev_io structures as context. Here's where we specify how 1528 * much context we want per IO. 1529 */ 1530 static int 1531 vbdev_compress_get_ctx_size(void) 1532 { 1533 return sizeof(struct comp_bdev_io); 1534 } 1535 1536 /* When we register our bdev this is how we specify our entry points. */ 1537 static const struct spdk_bdev_fn_table vbdev_compress_fn_table = { 1538 .destruct = vbdev_compress_destruct, 1539 .submit_request = vbdev_compress_submit_request, 1540 .io_type_supported = vbdev_compress_io_type_supported, 1541 .get_io_channel = vbdev_compress_get_io_channel, 1542 .dump_info_json = vbdev_compress_dump_info_json, 1543 .write_config_json = NULL, 1544 }; 1545 1546 static struct spdk_bdev_module compress_if = { 1547 .name = "compress", 1548 .module_init = vbdev_compress_init, 1549 .config_text = NULL, 1550 .get_ctx_size = vbdev_compress_get_ctx_size, 1551 .examine_disk = vbdev_compress_examine, 1552 .module_fini = vbdev_compress_finish, 1553 .config_json = vbdev_compress_config_json 1554 }; 1555 1556 SPDK_BDEV_MODULE_REGISTER(compress, &compress_if) 1557 1558 static int _set_compbdev_name(struct vbdev_compress *comp_bdev) 1559 { 1560 struct spdk_bdev_alias *aliases; 1561 1562 if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) { 1563 aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev)); 1564 comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias); 1565 if (!comp_bdev->comp_bdev.name) { 1566 SPDK_ERRLOG("could not allocate comp_bdev name for alias\n"); 1567 return -ENOMEM; 1568 } 1569 } else { 1570 comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name); 1571 if (!comp_bdev->comp_bdev.name) { 1572 SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n"); 1573 return -ENOMEM; 1574 } 1575 } 1576 return 0; 1577 } 1578 1579 static void 1580 vbdev_compress_claim(struct vbdev_compress *comp_bdev) 1581 { 1582 int rc; 1583 1584 if (_set_compbdev_name(comp_bdev)) { 1585 goto error_bdev_name; 1586 } 1587 1588 /* Note: some of the fields below will change in the future - for example, 1589 * blockcnt specifically will not match (the compressed volume size will 1590 * be slightly less than the base bdev size) 1591 */ 1592 comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME; 1593 comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache; 1594 1595 if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) { 1596 comp_bdev->comp_bdev.required_alignment = 1597 spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen), 1598 comp_bdev->base_bdev->required_alignment); 1599 SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n", 1600 comp_bdev->comp_bdev.required_alignment); 1601 } else { 1602 comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment; 1603 } 1604 comp_bdev->comp_bdev.optimal_io_boundary = 1605 comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size; 1606 1607 comp_bdev->comp_bdev.split_on_optimal_io_boundary = true; 1608 1609 comp_bdev->comp_bdev.blocklen = comp_bdev->base_bdev->blocklen; 1610 comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen; 1611 assert(comp_bdev->comp_bdev.blockcnt > 0); 1612 1613 /* This is the context that is passed to us when the bdev 1614 * layer calls in so we'll save our comp_bdev node here. 1615 */ 1616 comp_bdev->comp_bdev.ctxt = comp_bdev; 1617 comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table; 1618 comp_bdev->comp_bdev.module = &compress_if; 1619 1620 pthread_mutex_init(&comp_bdev->reduce_lock, NULL); 1621 1622 rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb, 1623 comp_bdev->base_bdev, &comp_bdev->base_desc); 1624 if (rc) { 1625 SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev)); 1626 goto error_open; 1627 } 1628 1629 /* Save the thread where the base device is opened */ 1630 comp_bdev->thread = spdk_get_thread(); 1631 1632 spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb, 1633 sizeof(struct comp_io_channel), 1634 comp_bdev->comp_bdev.name); 1635 1636 rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc, 1637 comp_bdev->comp_bdev.module); 1638 if (rc) { 1639 SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev)); 1640 goto error_claim; 1641 } 1642 1643 rc = spdk_bdev_register(&comp_bdev->comp_bdev); 1644 if (rc < 0) { 1645 SPDK_ERRLOG("trying to register bdev\n"); 1646 goto error_bdev_register; 1647 } 1648 1649 TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link); 1650 1651 SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name); 1652 1653 return; 1654 /* Error cleanup paths. */ 1655 error_bdev_register: 1656 spdk_bdev_module_release_bdev(comp_bdev->base_bdev); 1657 error_claim: 1658 spdk_io_device_unregister(comp_bdev, NULL); 1659 spdk_bdev_close(comp_bdev->base_desc); 1660 error_open: 1661 free(comp_bdev->comp_bdev.name); 1662 error_bdev_name: 1663 free(comp_bdev); 1664 } 1665 1666 static void 1667 _vbdev_compress_delete_done(void *_ctx) 1668 { 1669 struct vbdev_comp_delete_ctx *ctx = _ctx; 1670 1671 ctx->cb_fn(ctx->cb_arg, ctx->cb_rc); 1672 1673 free(ctx); 1674 } 1675 1676 static void 1677 vbdev_compress_delete_done(void *cb_arg, int bdeverrno) 1678 { 1679 struct vbdev_comp_delete_ctx *ctx = cb_arg; 1680 1681 ctx->cb_rc = bdeverrno; 1682 1683 if (ctx->orig_thread != spdk_get_thread()) { 1684 spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx); 1685 } else { 1686 _vbdev_compress_delete_done(ctx); 1687 } 1688 } 1689 1690 void 1691 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg) 1692 { 1693 struct vbdev_compress *comp_bdev = NULL; 1694 struct vbdev_comp_delete_ctx *ctx; 1695 1696 TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) { 1697 if (strcmp(name, comp_bdev->comp_bdev.name) == 0) { 1698 break; 1699 } 1700 } 1701 1702 if (comp_bdev == NULL) { 1703 cb_fn(cb_arg, -ENODEV); 1704 return; 1705 } 1706 1707 ctx = calloc(1, sizeof(*ctx)); 1708 if (ctx == NULL) { 1709 SPDK_ERRLOG("Failed to allocate delete context\n"); 1710 cb_fn(cb_arg, -ENOMEM); 1711 return; 1712 } 1713 1714 /* Save these for after the vol is destroyed. */ 1715 ctx->cb_fn = cb_fn; 1716 ctx->cb_arg = cb_arg; 1717 ctx->orig_thread = spdk_get_thread(); 1718 1719 comp_bdev->delete_ctx = ctx; 1720 1721 /* Tell reducelib that we're done with this volume. */ 1722 if (comp_bdev->orphaned == false) { 1723 spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev); 1724 } else { 1725 delete_vol_unload_cb(comp_bdev, 0); 1726 } 1727 } 1728 1729 static void 1730 _vbdev_reduce_load_cb(void *ctx) 1731 { 1732 struct spdk_bdev_desc *desc = ctx; 1733 1734 spdk_bdev_close(desc); 1735 } 1736 1737 /* Callback from reduce for then load is complete. We'll pass the vbdev_comp struct 1738 * used for initial metadata operations to claim where it will be further filled out 1739 * and added to the global list. 1740 */ 1741 static void 1742 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 1743 { 1744 struct vbdev_compress *meta_ctx = cb_arg; 1745 int rc; 1746 1747 /* Done with metadata operations */ 1748 spdk_put_io_channel(meta_ctx->base_ch); 1749 /* Close the underlying bdev on its same opened thread. */ 1750 if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) { 1751 spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx->base_desc); 1752 } else { 1753 spdk_bdev_close(meta_ctx->base_desc); 1754 } 1755 meta_ctx->base_desc = NULL; 1756 1757 if (reduce_errno == 0) { 1758 if (_set_pmd(meta_ctx) == false) { 1759 SPDK_ERRLOG("could not find required pmd\n"); 1760 goto err; 1761 } 1762 1763 /* Update information following volume load. */ 1764 meta_ctx->vol = vol; 1765 memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol), 1766 sizeof(struct spdk_reduce_vol_params)); 1767 vbdev_compress_claim(meta_ctx); 1768 } else if (reduce_errno == -ENOENT) { 1769 if (_set_compbdev_name(meta_ctx)) { 1770 goto err; 1771 } 1772 1773 /* We still want to open and claim the backing device to protect the data until 1774 * either the pm metadata file is recovered or the comp bdev is deleted. 1775 */ 1776 rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb, 1777 meta_ctx->base_bdev, &meta_ctx->base_desc); 1778 if (rc) { 1779 SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev)); 1780 free(meta_ctx->comp_bdev.name); 1781 goto err; 1782 } 1783 1784 /* Save the thread where the base device is opened */ 1785 meta_ctx->thread = spdk_get_thread(); 1786 1787 meta_ctx->comp_bdev.module = &compress_if; 1788 pthread_mutex_init(&meta_ctx->reduce_lock, NULL); 1789 rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc, 1790 meta_ctx->comp_bdev.module); 1791 if (rc) { 1792 SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev)); 1793 spdk_bdev_close(meta_ctx->base_desc); 1794 free(meta_ctx->comp_bdev.name); 1795 goto err; 1796 } 1797 1798 meta_ctx->orphaned = true; 1799 TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link); 1800 } else { 1801 if (reduce_errno != -EILSEQ) { 1802 SPDK_ERRLOG("for vol %s, error %u\n", 1803 spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno); 1804 } 1805 goto err; 1806 } 1807 1808 spdk_bdev_module_examine_done(&compress_if); 1809 return; 1810 1811 err: 1812 free(meta_ctx); 1813 spdk_bdev_module_examine_done(&compress_if); 1814 } 1815 1816 /* Examine_disk entry point: will do a metadata load to see if this is ours, 1817 * and if so will go ahead and claim it. 1818 */ 1819 static void 1820 vbdev_compress_examine(struct spdk_bdev *bdev) 1821 { 1822 struct vbdev_compress *meta_ctx; 1823 int rc; 1824 1825 if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) { 1826 spdk_bdev_module_examine_done(&compress_if); 1827 return; 1828 } 1829 1830 meta_ctx = _prepare_for_load_init(bdev); 1831 if (meta_ctx == NULL) { 1832 spdk_bdev_module_examine_done(&compress_if); 1833 return; 1834 } 1835 1836 rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb, 1837 meta_ctx->base_bdev, &meta_ctx->base_desc); 1838 if (rc) { 1839 SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev)); 1840 free(meta_ctx); 1841 spdk_bdev_module_examine_done(&compress_if); 1842 return; 1843 } 1844 1845 /* Save the thread where the base device is opened */ 1846 meta_ctx->thread = spdk_get_thread(); 1847 1848 meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc); 1849 spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx); 1850 } 1851 1852 int 1853 compress_set_pmd(enum compress_pmd *opts) 1854 { 1855 g_opts = *opts; 1856 1857 return 0; 1858 } 1859 1860 SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS) 1861