1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "vbdev_compress.h" 8 9 #include "spdk/reduce.h" 10 #include "spdk/stdinc.h" 11 #include "spdk/rpc.h" 12 #include "spdk/env.h" 13 #include "spdk/endian.h" 14 #include "spdk/string.h" 15 #include "spdk/thread.h" 16 #include "spdk/util.h" 17 #include "spdk/bdev_module.h" 18 #include "spdk/likely.h" 19 #include "spdk/log.h" 20 #include "spdk/accel.h" 21 22 #include "spdk/accel_module.h" 23 24 25 #define CHUNK_SIZE (1024 * 16) 26 #define COMP_BDEV_NAME "compress" 27 #define BACKING_IO_SZ (4 * 1024) 28 29 /* This namespace UUID was generated using uuid_generate() method. */ 30 #define BDEV_COMPRESS_NAMESPACE_UUID "c3fad6da-832f-4cc0-9cdc-5c552b225e7b" 31 32 struct vbdev_comp_delete_ctx { 33 spdk_delete_compress_complete cb_fn; 34 void *cb_arg; 35 int cb_rc; 36 struct spdk_thread *orig_thread; 37 }; 38 39 /* List of virtual bdevs and associated info for each. */ 40 struct vbdev_compress { 41 struct spdk_bdev *base_bdev; /* the thing we're attaching to */ 42 struct spdk_bdev_desc *base_desc; /* its descriptor we get from open */ 43 struct spdk_io_channel *base_ch; /* IO channel of base device */ 44 struct spdk_bdev comp_bdev; /* the compression virtual bdev */ 45 struct comp_io_channel *comp_ch; /* channel associated with this bdev */ 46 struct spdk_io_channel *accel_channel; /* to communicate with the accel framework */ 47 struct spdk_thread *reduce_thread; 48 pthread_mutex_t reduce_lock; 49 uint32_t ch_count; 50 TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios; /* outstanding operations to a comp library */ 51 struct spdk_poller *poller; /* completion poller */ 52 struct spdk_reduce_vol_params params; /* params for the reduce volume */ 53 struct spdk_reduce_backing_dev backing_dev; /* backing device info for the reduce volume */ 54 struct spdk_reduce_vol *vol; /* the reduce volume */ 55 struct vbdev_comp_delete_ctx *delete_ctx; 56 bool orphaned; /* base bdev claimed but comp_bdev not registered */ 57 int reduce_errno; 58 TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops; 59 TAILQ_ENTRY(vbdev_compress) link; 60 struct spdk_thread *thread; /* thread where base device is opened */ 61 }; 62 static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp); 63 64 /* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. 65 */ 66 struct comp_io_channel { 67 struct spdk_io_channel_iter *iter; /* used with for_each_channel in reset */ 68 }; 69 70 /* Per I/O context for the compression vbdev. */ 71 struct comp_bdev_io { 72 struct comp_io_channel *comp_ch; /* used in completion handling */ 73 struct vbdev_compress *comp_bdev; /* vbdev associated with this IO */ 74 struct spdk_bdev_io_wait_entry bdev_io_wait; /* for bdev_io_wait */ 75 struct spdk_bdev_io *orig_io; /* the original IO */ 76 struct spdk_io_channel *ch; /* for resubmission */ 77 int status; /* save for completion on orig thread */ 78 }; 79 80 static void vbdev_compress_examine(struct spdk_bdev *bdev); 81 static int vbdev_compress_claim(struct vbdev_compress *comp_bdev); 82 static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io); 83 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size); 84 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io); 85 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf); 86 static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno); 87 static void _comp_reduce_resubmit_backing_io(void *_backing_io); 88 89 /* for completing rw requests on the orig IO thread. */ 90 static void 91 _reduce_rw_blocks_cb(void *arg) 92 { 93 struct comp_bdev_io *io_ctx = arg; 94 95 if (spdk_likely(io_ctx->status == 0)) { 96 spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS); 97 } else if (io_ctx->status == -ENOMEM) { 98 vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx)); 99 } else { 100 SPDK_ERRLOG("Failed to execute reduce api. %s\n", spdk_strerror(-io_ctx->status)); 101 spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED); 102 } 103 } 104 105 /* Completion callback for r/w that were issued via reducelib. */ 106 static void 107 reduce_rw_blocks_cb(void *arg, int reduce_errno) 108 { 109 struct spdk_bdev_io *bdev_io = arg; 110 struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; 111 struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch); 112 struct spdk_thread *orig_thread; 113 114 /* TODO: need to decide which error codes are bdev_io success vs failure; 115 * example examine calls reading metadata */ 116 117 io_ctx->status = reduce_errno; 118 119 /* Send this request to the orig IO thread. */ 120 orig_thread = spdk_io_channel_get_thread(ch); 121 122 spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx); 123 } 124 125 static int 126 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs, 127 int src_iovcnt, struct iovec *dst_iovs, 128 int dst_iovcnt, bool compress, void *cb_arg) 129 { 130 struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg; 131 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress, 132 backing_dev); 133 int rc; 134 135 if (compress) { 136 assert(dst_iovcnt == 1); 137 rc = spdk_accel_submit_compress(comp_bdev->accel_channel, dst_iovs[0].iov_base, dst_iovs[0].iov_len, 138 src_iovs, src_iovcnt, &reduce_cb_arg->output_size, 139 reduce_cb_arg->cb_fn, reduce_cb_arg->cb_arg); 140 } else { 141 rc = spdk_accel_submit_decompress(comp_bdev->accel_channel, dst_iovs, dst_iovcnt, 142 src_iovs, src_iovcnt, &reduce_cb_arg->output_size, 143 reduce_cb_arg->cb_fn, reduce_cb_arg->cb_arg); 144 } 145 146 return rc; 147 } 148 149 /* Entry point for reduce lib to issue a compress operation. */ 150 static void 151 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev, 152 struct iovec *src_iovs, int src_iovcnt, 153 struct iovec *dst_iovs, int dst_iovcnt, 154 struct spdk_reduce_vol_cb_args *cb_arg) 155 { 156 int rc; 157 158 rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg); 159 if (rc) { 160 SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc)); 161 cb_arg->cb_fn(cb_arg->cb_arg, rc); 162 } 163 } 164 165 /* Entry point for reduce lib to issue a decompress operation. */ 166 static void 167 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev, 168 struct iovec *src_iovs, int src_iovcnt, 169 struct iovec *dst_iovs, int dst_iovcnt, 170 struct spdk_reduce_vol_cb_args *cb_arg) 171 { 172 int rc; 173 174 rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg); 175 if (rc) { 176 SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc)); 177 cb_arg->cb_fn(cb_arg->cb_arg, rc); 178 } 179 } 180 181 static void 182 _comp_submit_write(void *ctx) 183 { 184 struct spdk_bdev_io *bdev_io = ctx; 185 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, 186 comp_bdev); 187 188 spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, 189 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 190 reduce_rw_blocks_cb, bdev_io); 191 } 192 193 static void 194 _comp_submit_read(void *ctx) 195 { 196 struct spdk_bdev_io *bdev_io = ctx; 197 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, 198 comp_bdev); 199 200 spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, 201 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 202 reduce_rw_blocks_cb, bdev_io); 203 } 204 205 206 /* Callback for getting a buf from the bdev pool in the event that the caller passed 207 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module 208 * beneath us before we're done with it. 209 */ 210 static void 211 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 212 { 213 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, 214 comp_bdev); 215 216 if (spdk_unlikely(!success)) { 217 SPDK_ERRLOG("Failed to get data buffer\n"); 218 reduce_rw_blocks_cb(bdev_io, -ENOMEM); 219 return; 220 } 221 222 spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io); 223 } 224 225 /* Called when someone above submits IO to this vbdev. */ 226 static void 227 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 228 { 229 struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; 230 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress, 231 comp_bdev); 232 struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch); 233 234 memset(io_ctx, 0, sizeof(struct comp_bdev_io)); 235 io_ctx->comp_bdev = comp_bdev; 236 io_ctx->comp_ch = comp_ch; 237 io_ctx->orig_io = bdev_io; 238 239 switch (bdev_io->type) { 240 case SPDK_BDEV_IO_TYPE_READ: 241 spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb, 242 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 243 return; 244 case SPDK_BDEV_IO_TYPE_WRITE: 245 spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io); 246 return; 247 /* TODO support RESET in future patch in the series */ 248 case SPDK_BDEV_IO_TYPE_RESET: 249 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 250 case SPDK_BDEV_IO_TYPE_UNMAP: 251 case SPDK_BDEV_IO_TYPE_FLUSH: 252 default: 253 SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type); 254 spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED); 255 break; 256 } 257 } 258 259 static bool 260 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type) 261 { 262 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; 263 264 switch (io_type) { 265 case SPDK_BDEV_IO_TYPE_READ: 266 case SPDK_BDEV_IO_TYPE_WRITE: 267 return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type); 268 case SPDK_BDEV_IO_TYPE_UNMAP: 269 case SPDK_BDEV_IO_TYPE_RESET: 270 case SPDK_BDEV_IO_TYPE_FLUSH: 271 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 272 default: 273 return false; 274 } 275 } 276 277 /* Resubmission function used by the bdev layer when a queued IO is ready to be 278 * submitted. 279 */ 280 static void 281 vbdev_compress_resubmit_io(void *arg) 282 { 283 struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg; 284 struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; 285 286 vbdev_compress_submit_request(io_ctx->ch, bdev_io); 287 } 288 289 /* Used to queue an IO in the event of resource issues. */ 290 static void 291 vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io) 292 { 293 struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx; 294 int rc; 295 296 io_ctx->bdev_io_wait.bdev = bdev_io->bdev; 297 io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io; 298 io_ctx->bdev_io_wait.cb_arg = bdev_io; 299 300 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait); 301 if (rc) { 302 SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc); 303 assert(false); 304 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 305 } 306 } 307 308 /* Callback for unregistering the IO device. */ 309 static void 310 _device_unregister_cb(void *io_device) 311 { 312 struct vbdev_compress *comp_bdev = io_device; 313 314 /* Done with this comp_bdev. */ 315 pthread_mutex_destroy(&comp_bdev->reduce_lock); 316 free(comp_bdev->comp_bdev.name); 317 free(comp_bdev); 318 } 319 320 static void 321 _vbdev_compress_destruct_cb(void *ctx) 322 { 323 struct vbdev_compress *comp_bdev = ctx; 324 325 TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link); 326 spdk_bdev_module_release_bdev(comp_bdev->base_bdev); 327 /* Close the underlying bdev on its same opened thread. */ 328 spdk_bdev_close(comp_bdev->base_desc); 329 comp_bdev->vol = NULL; 330 if (comp_bdev->orphaned == false) { 331 spdk_io_device_unregister(comp_bdev, _device_unregister_cb); 332 } else { 333 vbdev_compress_delete_done(comp_bdev->delete_ctx, 0); 334 _device_unregister_cb(comp_bdev); 335 } 336 } 337 338 static void 339 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno) 340 { 341 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg; 342 343 if (reduce_errno) { 344 SPDK_ERRLOG("number %d\n", reduce_errno); 345 } else { 346 if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) { 347 spdk_thread_send_msg(comp_bdev->thread, 348 _vbdev_compress_destruct_cb, comp_bdev); 349 } else { 350 _vbdev_compress_destruct_cb(comp_bdev); 351 } 352 } 353 } 354 355 static void 356 _reduce_destroy_cb(void *ctx, int reduce_errno) 357 { 358 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; 359 360 if (reduce_errno) { 361 SPDK_ERRLOG("number %d\n", reduce_errno); 362 } 363 364 comp_bdev->vol = NULL; 365 spdk_put_io_channel(comp_bdev->base_ch); 366 if (comp_bdev->orphaned == false) { 367 spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done, 368 comp_bdev->delete_ctx); 369 } else { 370 vbdev_compress_destruct_cb((void *)comp_bdev, 0); 371 } 372 373 } 374 375 static void 376 _delete_vol_unload_cb(void *ctx) 377 { 378 struct vbdev_compress *comp_bdev = ctx; 379 380 /* FIXME: Assert if these conditions are not satisfied for now. */ 381 assert(!comp_bdev->reduce_thread || 382 comp_bdev->reduce_thread == spdk_get_thread()); 383 384 /* reducelib needs a channel to comm with the backing device */ 385 comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc); 386 387 /* Clean the device before we free our resources. */ 388 spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev); 389 } 390 391 /* Called by reduceLib after performing unload vol actions */ 392 static void 393 delete_vol_unload_cb(void *cb_arg, int reduce_errno) 394 { 395 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg; 396 397 if (reduce_errno) { 398 SPDK_ERRLOG("number %d\n", reduce_errno); 399 /* FIXME: callback should be executed. */ 400 return; 401 } 402 403 pthread_mutex_lock(&comp_bdev->reduce_lock); 404 if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) { 405 spdk_thread_send_msg(comp_bdev->reduce_thread, 406 _delete_vol_unload_cb, comp_bdev); 407 pthread_mutex_unlock(&comp_bdev->reduce_lock); 408 } else { 409 pthread_mutex_unlock(&comp_bdev->reduce_lock); 410 411 _delete_vol_unload_cb(comp_bdev); 412 } 413 } 414 415 const char * 416 compress_get_name(const struct vbdev_compress *comp_bdev) 417 { 418 return comp_bdev->comp_bdev.name; 419 } 420 421 struct vbdev_compress * 422 compress_bdev_first(void) 423 { 424 struct vbdev_compress *comp_bdev; 425 426 comp_bdev = TAILQ_FIRST(&g_vbdev_comp); 427 428 return comp_bdev; 429 } 430 431 struct vbdev_compress * 432 compress_bdev_next(struct vbdev_compress *prev) 433 { 434 struct vbdev_compress *comp_bdev; 435 436 comp_bdev = TAILQ_NEXT(prev, link); 437 438 return comp_bdev; 439 } 440 441 bool 442 compress_has_orphan(const char *name) 443 { 444 struct vbdev_compress *comp_bdev; 445 446 TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) { 447 if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) { 448 return true; 449 } 450 } 451 return false; 452 } 453 454 /* Called after we've unregistered following a hot remove callback. 455 * Our finish entry point will be called next. 456 */ 457 static int 458 vbdev_compress_destruct(void *ctx) 459 { 460 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; 461 462 if (comp_bdev->vol != NULL) { 463 /* Tell reducelib that we're done with this volume. */ 464 spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev); 465 } else { 466 vbdev_compress_destruct_cb(comp_bdev, 0); 467 } 468 469 return 0; 470 } 471 472 /* We supplied this as an entry point for upper layers who want to communicate to this 473 * bdev. This is how they get a channel. 474 */ 475 static struct spdk_io_channel * 476 vbdev_compress_get_io_channel(void *ctx) 477 { 478 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; 479 480 /* The IO channel code will allocate a channel for us which consists of 481 * the SPDK channel structure plus the size of our comp_io_channel struct 482 * that we passed in when we registered our IO device. It will then call 483 * our channel create callback to populate any elements that we need to 484 * update. 485 */ 486 return spdk_get_io_channel(comp_bdev); 487 } 488 489 /* This is the output for bdev_get_bdevs() for this vbdev */ 490 static int 491 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w) 492 { 493 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx; 494 495 spdk_json_write_name(w, "compress"); 496 spdk_json_write_object_begin(w); 497 spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev)); 498 spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev)); 499 spdk_json_write_named_string(w, "pm_path", spdk_reduce_vol_get_pm_path(comp_bdev->vol)); 500 spdk_json_write_object_end(w); 501 502 return 0; 503 } 504 505 static int 506 vbdev_compress_config_json(struct spdk_json_write_ctx *w) 507 { 508 /* Nothing to dump as compress bdev configuration is saved on physical device. */ 509 return 0; 510 } 511 512 struct vbdev_init_reduce_ctx { 513 struct vbdev_compress *comp_bdev; 514 int status; 515 bdev_compress_create_cb cb_fn; 516 void *cb_ctx; 517 }; 518 519 static void 520 _vbdev_reduce_init_unload_cb(void *ctx, int reduce_errno) 521 { 522 } 523 524 static void 525 _vbdev_reduce_init_cb(void *ctx) 526 { 527 struct vbdev_init_reduce_ctx *init_ctx = ctx; 528 struct vbdev_compress *comp_bdev = init_ctx->comp_bdev; 529 int rc; 530 531 assert(comp_bdev->base_desc != NULL); 532 533 /* We're done with metadata operations */ 534 spdk_put_io_channel(comp_bdev->base_ch); 535 536 if (comp_bdev->vol) { 537 rc = vbdev_compress_claim(comp_bdev); 538 if (rc == 0) { 539 init_ctx->cb_fn(init_ctx->cb_ctx, rc); 540 free(init_ctx); 541 return; 542 } else { 543 spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_init_unload_cb, NULL); 544 } 545 init_ctx->cb_fn(init_ctx->cb_ctx, rc); 546 } 547 548 /* Close the underlying bdev on its same opened thread. */ 549 spdk_bdev_close(comp_bdev->base_desc); 550 free(comp_bdev); 551 free(init_ctx); 552 } 553 554 /* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct 555 * used for initial metadata operations to claim where it will be further filled out 556 * and added to the global list. 557 */ 558 static void 559 vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 560 { 561 struct vbdev_init_reduce_ctx *init_ctx = cb_arg; 562 struct vbdev_compress *comp_bdev = init_ctx->comp_bdev; 563 564 if (reduce_errno == 0) { 565 comp_bdev->vol = vol; 566 } else { 567 SPDK_ERRLOG("for vol %s, error %u\n", 568 spdk_bdev_get_name(comp_bdev->base_bdev), reduce_errno); 569 init_ctx->cb_fn(init_ctx->cb_ctx, reduce_errno); 570 } 571 572 init_ctx->status = reduce_errno; 573 574 if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) { 575 spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_init_cb, init_ctx); 576 } else { 577 _vbdev_reduce_init_cb(init_ctx); 578 } 579 } 580 581 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just 582 * call the callback provided by reduceLib when it called the read/write/unmap function and 583 * free the bdev_io. 584 */ 585 static void 586 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg) 587 { 588 struct spdk_reduce_vol_cb_args *cb_args = arg; 589 int reduce_errno; 590 591 if (success) { 592 reduce_errno = 0; 593 } else { 594 reduce_errno = -EIO; 595 } 596 spdk_bdev_free_io(bdev_io); 597 cb_args->cb_fn(cb_args->cb_arg, reduce_errno); 598 } 599 600 static void 601 _comp_backing_bdev_queue_io_wait(struct vbdev_compress *comp_bdev, 602 struct spdk_reduce_backing_io *backing_io) 603 { 604 struct spdk_bdev_io_wait_entry *waitq_entry; 605 int rc; 606 607 waitq_entry = (struct spdk_bdev_io_wait_entry *) &backing_io->user_ctx; 608 waitq_entry->bdev = spdk_bdev_desc_get_bdev(comp_bdev->base_desc); 609 waitq_entry->cb_fn = _comp_reduce_resubmit_backing_io; 610 waitq_entry->cb_arg = backing_io; 611 612 rc = spdk_bdev_queue_io_wait(waitq_entry->bdev, comp_bdev->base_ch, waitq_entry); 613 if (rc) { 614 SPDK_ERRLOG("Queue io failed in _comp_backing_bdev_queue_io_wait, rc=%d.\n", rc); 615 assert(false); 616 backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, rc); 617 } 618 } 619 620 static void 621 _comp_backing_bdev_read(struct spdk_reduce_backing_io *backing_io) 622 { 623 struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args; 624 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress, 625 backing_dev); 626 int rc; 627 628 rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch, 629 backing_io->iov, backing_io->iovcnt, 630 backing_io->lba, backing_io->lba_count, 631 comp_reduce_io_cb, 632 backing_cb_args); 633 634 if (rc) { 635 if (rc == -ENOMEM) { 636 _comp_backing_bdev_queue_io_wait(comp_bdev, backing_io); 637 return; 638 } else { 639 SPDK_ERRLOG("submitting readv request, rc=%d\n", rc); 640 } 641 backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc); 642 } 643 } 644 645 static void 646 _comp_backing_bdev_write(struct spdk_reduce_backing_io *backing_io) 647 { 648 struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args; 649 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress, 650 backing_dev); 651 int rc; 652 653 rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch, 654 backing_io->iov, backing_io->iovcnt, 655 backing_io->lba, backing_io->lba_count, 656 comp_reduce_io_cb, 657 backing_cb_args); 658 659 if (rc) { 660 if (rc == -ENOMEM) { 661 _comp_backing_bdev_queue_io_wait(comp_bdev, backing_io); 662 return; 663 } else { 664 SPDK_ERRLOG("error submitting writev request, rc=%d\n", rc); 665 } 666 backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc); 667 } 668 } 669 670 static void 671 _comp_backing_bdev_unmap(struct spdk_reduce_backing_io *backing_io) 672 { 673 struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args; 674 struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress, 675 backing_dev); 676 int rc; 677 678 rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch, 679 backing_io->lba, backing_io->lba_count, 680 comp_reduce_io_cb, 681 backing_cb_args); 682 683 if (rc) { 684 if (rc == -ENOMEM) { 685 _comp_backing_bdev_queue_io_wait(comp_bdev, backing_io); 686 return; 687 } else { 688 SPDK_ERRLOG("submitting unmap request, rc=%d\n", rc); 689 } 690 backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc); 691 } 692 } 693 694 /* This is the function provided to the reduceLib for sending reads/writes/unmaps 695 * directly to the backing device. 696 */ 697 static void 698 _comp_reduce_submit_backing_io(struct spdk_reduce_backing_io *backing_io) 699 { 700 switch (backing_io->backing_io_type) { 701 case SPDK_REDUCE_BACKING_IO_WRITE: 702 _comp_backing_bdev_write(backing_io); 703 break; 704 case SPDK_REDUCE_BACKING_IO_READ: 705 _comp_backing_bdev_read(backing_io); 706 break; 707 case SPDK_REDUCE_BACKING_IO_UNMAP: 708 _comp_backing_bdev_unmap(backing_io); 709 break; 710 default: 711 SPDK_ERRLOG("Unknown I/O type %d\n", backing_io->backing_io_type); 712 backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, -EINVAL); 713 break; 714 } 715 } 716 717 static void 718 _comp_reduce_resubmit_backing_io(void *_backing_io) 719 { 720 struct spdk_reduce_backing_io *backing_io = _backing_io; 721 722 _comp_reduce_submit_backing_io(backing_io); 723 } 724 725 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */ 726 static void 727 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno) 728 { 729 struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg; 730 731 if (reduce_errno) { 732 SPDK_ERRLOG("number %d\n", reduce_errno); 733 } 734 735 comp_bdev->vol = NULL; 736 spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL); 737 } 738 739 static void 740 vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find) 741 { 742 struct vbdev_compress *comp_bdev, *tmp; 743 744 TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) { 745 if (bdev_find == comp_bdev->base_bdev) { 746 /* Tell reduceLib that we're done with this volume. */ 747 spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev); 748 } 749 } 750 } 751 752 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */ 753 static void 754 vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, 755 void *event_ctx) 756 { 757 switch (type) { 758 case SPDK_BDEV_EVENT_REMOVE: 759 vbdev_compress_base_bdev_hotremove_cb(bdev); 760 break; 761 default: 762 SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type); 763 break; 764 } 765 } 766 767 /* TODO: determine which parms we want user configurable, HC for now 768 * params.vol_size 769 * params.chunk_size 770 * compression PMD, algorithm, window size, comp level, etc. 771 * DEV_MD_PATH 772 */ 773 774 /* Common function for init and load to allocate and populate the minimal 775 * information for reducelib to init or load. 776 */ 777 struct vbdev_compress * 778 _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size) 779 { 780 struct vbdev_compress *comp_bdev; 781 struct spdk_bdev *bdev; 782 783 comp_bdev = calloc(1, sizeof(struct vbdev_compress)); 784 if (comp_bdev == NULL) { 785 SPDK_ERRLOG("failed to alloc comp_bdev\n"); 786 return NULL; 787 } 788 789 comp_bdev->backing_dev.submit_backing_io = _comp_reduce_submit_backing_io; 790 comp_bdev->backing_dev.compress = _comp_reduce_compress; 791 comp_bdev->backing_dev.decompress = _comp_reduce_decompress; 792 793 comp_bdev->base_desc = bdev_desc; 794 bdev = spdk_bdev_desc_get_bdev(bdev_desc); 795 comp_bdev->base_bdev = bdev; 796 797 comp_bdev->backing_dev.blocklen = bdev->blocklen; 798 comp_bdev->backing_dev.blockcnt = bdev->blockcnt; 799 800 comp_bdev->backing_dev.user_ctx_size = sizeof(struct spdk_bdev_io_wait_entry); 801 802 comp_bdev->params.chunk_size = CHUNK_SIZE; 803 if (lb_size == 0) { 804 comp_bdev->params.logical_block_size = bdev->blocklen; 805 } else { 806 comp_bdev->params.logical_block_size = lb_size; 807 } 808 809 comp_bdev->params.backing_io_unit_size = BACKING_IO_SZ; 810 return comp_bdev; 811 } 812 813 /* Call reducelib to initialize a new volume */ 814 static int 815 vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size, 816 bdev_compress_create_cb cb_fn, void *cb_arg) 817 { 818 struct spdk_bdev_desc *bdev_desc = NULL; 819 struct vbdev_init_reduce_ctx *init_ctx; 820 struct vbdev_compress *comp_bdev; 821 int rc; 822 823 init_ctx = calloc(1, sizeof(*init_ctx)); 824 if (init_ctx == NULL) { 825 SPDK_ERRLOG("failed to alloc init contexts\n"); 826 return - ENOMEM; 827 } 828 829 init_ctx->cb_fn = cb_fn; 830 init_ctx->cb_ctx = cb_arg; 831 832 rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb, 833 NULL, &bdev_desc); 834 if (rc) { 835 SPDK_ERRLOG("could not open bdev %s, error %s\n", bdev_name, spdk_strerror(-rc)); 836 free(init_ctx); 837 return rc; 838 } 839 840 comp_bdev = _prepare_for_load_init(bdev_desc, lb_size); 841 if (comp_bdev == NULL) { 842 free(init_ctx); 843 spdk_bdev_close(bdev_desc); 844 return -EINVAL; 845 } 846 847 init_ctx->comp_bdev = comp_bdev; 848 849 /* Save the thread where the base device is opened */ 850 comp_bdev->thread = spdk_get_thread(); 851 852 comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc); 853 854 spdk_reduce_vol_init(&comp_bdev->params, &comp_bdev->backing_dev, 855 pm_path, 856 vbdev_reduce_init_cb, 857 init_ctx); 858 return 0; 859 } 860 861 /* We provide this callback for the SPDK channel code to create a channel using 862 * the channel struct we provided in our module get_io_channel() entry point. Here 863 * we get and save off an underlying base channel of the device below us so that 864 * we can communicate with the base bdev on a per channel basis. If we needed 865 * our own poller for this vbdev, we'd register it here. 866 */ 867 static int 868 comp_bdev_ch_create_cb(void *io_device, void *ctx_buf) 869 { 870 struct vbdev_compress *comp_bdev = io_device; 871 872 /* Now set the reduce channel if it's not already set. */ 873 pthread_mutex_lock(&comp_bdev->reduce_lock); 874 if (comp_bdev->ch_count == 0) { 875 /* We use this queue to track outstanding IO in our layer. */ 876 TAILQ_INIT(&comp_bdev->pending_comp_ios); 877 878 /* We use this to queue up compression operations as needed. */ 879 TAILQ_INIT(&comp_bdev->queued_comp_ops); 880 881 comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc); 882 comp_bdev->reduce_thread = spdk_get_thread(); 883 comp_bdev->accel_channel = spdk_accel_get_io_channel(); 884 } 885 comp_bdev->ch_count++; 886 pthread_mutex_unlock(&comp_bdev->reduce_lock); 887 888 return 0; 889 } 890 891 static void 892 _channel_cleanup(struct vbdev_compress *comp_bdev) 893 { 894 spdk_put_io_channel(comp_bdev->base_ch); 895 spdk_put_io_channel(comp_bdev->accel_channel); 896 comp_bdev->reduce_thread = NULL; 897 } 898 899 /* Used to reroute destroy_ch to the correct thread */ 900 static void 901 _comp_bdev_ch_destroy_cb(void *arg) 902 { 903 struct vbdev_compress *comp_bdev = arg; 904 905 pthread_mutex_lock(&comp_bdev->reduce_lock); 906 _channel_cleanup(comp_bdev); 907 pthread_mutex_unlock(&comp_bdev->reduce_lock); 908 } 909 910 /* We provide this callback for the SPDK channel code to destroy a channel 911 * created with our create callback. We just need to undo anything we did 912 * when we created. If this bdev used its own poller, we'd unregister it here. 913 */ 914 static void 915 comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf) 916 { 917 struct vbdev_compress *comp_bdev = io_device; 918 919 pthread_mutex_lock(&comp_bdev->reduce_lock); 920 comp_bdev->ch_count--; 921 if (comp_bdev->ch_count == 0) { 922 /* Send this request to the thread where the channel was created. */ 923 if (comp_bdev->reduce_thread != spdk_get_thread()) { 924 spdk_thread_send_msg(comp_bdev->reduce_thread, 925 _comp_bdev_ch_destroy_cb, comp_bdev); 926 } else { 927 _channel_cleanup(comp_bdev); 928 } 929 } 930 pthread_mutex_unlock(&comp_bdev->reduce_lock); 931 } 932 933 /* RPC entry point for compression vbdev creation. */ 934 int 935 create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size, 936 bdev_compress_create_cb cb_fn, void *cb_arg) 937 { 938 struct vbdev_compress *comp_bdev = NULL; 939 struct stat info; 940 941 if (stat(pm_path, &info) != 0) { 942 SPDK_ERRLOG("PM path %s does not exist.\n", pm_path); 943 return -EINVAL; 944 } else if (!S_ISDIR(info.st_mode)) { 945 SPDK_ERRLOG("PM path %s is not a directory.\n", pm_path); 946 return -EINVAL; 947 } 948 949 if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) { 950 SPDK_ERRLOG("Logical block size must be 512 or 4096\n"); 951 return -EINVAL; 952 } 953 954 TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) { 955 if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) { 956 SPDK_ERRLOG("Bass bdev %s already being used for a compress bdev\n", bdev_name); 957 return -EBUSY; 958 } 959 } 960 return vbdev_init_reduce(bdev_name, pm_path, lb_size, cb_fn, cb_arg); 961 } 962 963 static int 964 vbdev_compress_init(void) 965 { 966 return 0; 967 } 968 969 /* Called when the entire module is being torn down. */ 970 static void 971 vbdev_compress_finish(void) 972 { 973 /* TODO: unload vol in a future patch */ 974 } 975 976 /* During init we'll be asked how much memory we'd like passed to us 977 * in bev_io structures as context. Here's where we specify how 978 * much context we want per IO. 979 */ 980 static int 981 vbdev_compress_get_ctx_size(void) 982 { 983 return sizeof(struct comp_bdev_io); 984 } 985 986 /* When we register our bdev this is how we specify our entry points. */ 987 static const struct spdk_bdev_fn_table vbdev_compress_fn_table = { 988 .destruct = vbdev_compress_destruct, 989 .submit_request = vbdev_compress_submit_request, 990 .io_type_supported = vbdev_compress_io_type_supported, 991 .get_io_channel = vbdev_compress_get_io_channel, 992 .dump_info_json = vbdev_compress_dump_info_json, 993 .write_config_json = NULL, 994 }; 995 996 static struct spdk_bdev_module compress_if = { 997 .name = "compress", 998 .module_init = vbdev_compress_init, 999 .get_ctx_size = vbdev_compress_get_ctx_size, 1000 .examine_disk = vbdev_compress_examine, 1001 .module_fini = vbdev_compress_finish, 1002 .config_json = vbdev_compress_config_json 1003 }; 1004 1005 SPDK_BDEV_MODULE_REGISTER(compress, &compress_if) 1006 1007 static int _set_compbdev_name(struct vbdev_compress *comp_bdev) 1008 { 1009 struct spdk_bdev_alias *aliases; 1010 1011 if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) { 1012 aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev)); 1013 comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name); 1014 if (!comp_bdev->comp_bdev.name) { 1015 SPDK_ERRLOG("could not allocate comp_bdev name for alias\n"); 1016 return -ENOMEM; 1017 } 1018 } else { 1019 comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name); 1020 if (!comp_bdev->comp_bdev.name) { 1021 SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n"); 1022 return -ENOMEM; 1023 } 1024 } 1025 return 0; 1026 } 1027 1028 static int 1029 vbdev_compress_claim(struct vbdev_compress *comp_bdev) 1030 { 1031 struct spdk_uuid ns_uuid; 1032 int rc; 1033 1034 if (_set_compbdev_name(comp_bdev)) { 1035 return -EINVAL; 1036 } 1037 1038 /* Note: some of the fields below will change in the future - for example, 1039 * blockcnt specifically will not match (the compressed volume size will 1040 * be slightly less than the base bdev size) 1041 */ 1042 comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME; 1043 comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache; 1044 1045 comp_bdev->comp_bdev.optimal_io_boundary = 1046 comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size; 1047 1048 comp_bdev->comp_bdev.split_on_optimal_io_boundary = true; 1049 1050 comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size; 1051 comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen; 1052 assert(comp_bdev->comp_bdev.blockcnt > 0); 1053 1054 /* This is the context that is passed to us when the bdev 1055 * layer calls in so we'll save our comp_bdev node here. 1056 */ 1057 comp_bdev->comp_bdev.ctxt = comp_bdev; 1058 comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table; 1059 comp_bdev->comp_bdev.module = &compress_if; 1060 1061 /* Generate UUID based on namespace UUID + base bdev UUID. */ 1062 spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID); 1063 rc = spdk_uuid_generate_sha1(&comp_bdev->comp_bdev.uuid, &ns_uuid, 1064 (const char *)&comp_bdev->base_bdev->uuid, sizeof(struct spdk_uuid)); 1065 if (rc) { 1066 SPDK_ERRLOG("Unable to generate new UUID for compress bdev, error %s\n", spdk_strerror(-rc)); 1067 return -EINVAL; 1068 } 1069 1070 pthread_mutex_init(&comp_bdev->reduce_lock, NULL); 1071 1072 /* Save the thread where the base device is opened */ 1073 comp_bdev->thread = spdk_get_thread(); 1074 1075 spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb, 1076 sizeof(struct comp_io_channel), 1077 comp_bdev->comp_bdev.name); 1078 1079 rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc, 1080 comp_bdev->comp_bdev.module); 1081 if (rc) { 1082 SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev), 1083 spdk_strerror(-rc)); 1084 goto error_claim; 1085 } 1086 1087 rc = spdk_bdev_register(&comp_bdev->comp_bdev); 1088 if (rc < 0) { 1089 SPDK_ERRLOG("trying to register bdev, error %s\n", spdk_strerror(-rc)); 1090 goto error_bdev_register; 1091 } 1092 1093 TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link); 1094 1095 SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name); 1096 1097 return 0; 1098 1099 /* Error cleanup paths. */ 1100 error_bdev_register: 1101 spdk_bdev_module_release_bdev(comp_bdev->base_bdev); 1102 error_claim: 1103 spdk_io_device_unregister(comp_bdev, NULL); 1104 free(comp_bdev->comp_bdev.name); 1105 return rc; 1106 } 1107 1108 static void 1109 _vbdev_compress_delete_done(void *_ctx) 1110 { 1111 struct vbdev_comp_delete_ctx *ctx = _ctx; 1112 1113 ctx->cb_fn(ctx->cb_arg, ctx->cb_rc); 1114 1115 free(ctx); 1116 } 1117 1118 static void 1119 vbdev_compress_delete_done(void *cb_arg, int bdeverrno) 1120 { 1121 struct vbdev_comp_delete_ctx *ctx = cb_arg; 1122 1123 ctx->cb_rc = bdeverrno; 1124 1125 if (ctx->orig_thread != spdk_get_thread()) { 1126 spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx); 1127 } else { 1128 _vbdev_compress_delete_done(ctx); 1129 } 1130 } 1131 1132 void 1133 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg) 1134 { 1135 struct vbdev_compress *comp_bdev = NULL; 1136 struct vbdev_comp_delete_ctx *ctx; 1137 1138 TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) { 1139 if (strcmp(name, comp_bdev->comp_bdev.name) == 0) { 1140 break; 1141 } 1142 } 1143 1144 if (comp_bdev == NULL) { 1145 cb_fn(cb_arg, -ENODEV); 1146 return; 1147 } 1148 1149 ctx = calloc(1, sizeof(*ctx)); 1150 if (ctx == NULL) { 1151 SPDK_ERRLOG("Failed to allocate delete context\n"); 1152 cb_fn(cb_arg, -ENOMEM); 1153 return; 1154 } 1155 1156 /* Save these for after the vol is destroyed. */ 1157 ctx->cb_fn = cb_fn; 1158 ctx->cb_arg = cb_arg; 1159 ctx->orig_thread = spdk_get_thread(); 1160 1161 comp_bdev->delete_ctx = ctx; 1162 1163 /* Tell reducelib that we're done with this volume. */ 1164 if (comp_bdev->orphaned == false) { 1165 spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev); 1166 } else { 1167 delete_vol_unload_cb(comp_bdev, 0); 1168 } 1169 } 1170 1171 static void 1172 _vbdev_reduce_load_unload_cb(void *ctx, int reduce_errno) 1173 { 1174 } 1175 1176 static void 1177 _vbdev_reduce_load_cb(void *ctx) 1178 { 1179 struct vbdev_compress *comp_bdev = ctx; 1180 int rc; 1181 1182 assert(comp_bdev->base_desc != NULL); 1183 1184 /* Done with metadata operations */ 1185 spdk_put_io_channel(comp_bdev->base_ch); 1186 1187 if (comp_bdev->reduce_errno == 0) { 1188 rc = vbdev_compress_claim(comp_bdev); 1189 if (rc != 0) { 1190 spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_load_unload_cb, NULL); 1191 goto err; 1192 } 1193 } else if (comp_bdev->reduce_errno == -ENOENT) { 1194 if (_set_compbdev_name(comp_bdev)) { 1195 goto err; 1196 } 1197 1198 /* Save the thread where the base device is opened */ 1199 comp_bdev->thread = spdk_get_thread(); 1200 1201 comp_bdev->comp_bdev.module = &compress_if; 1202 pthread_mutex_init(&comp_bdev->reduce_lock, NULL); 1203 rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc, 1204 comp_bdev->comp_bdev.module); 1205 if (rc) { 1206 SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev), 1207 spdk_strerror(-rc)); 1208 free(comp_bdev->comp_bdev.name); 1209 goto err; 1210 } 1211 1212 comp_bdev->orphaned = true; 1213 TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link); 1214 } else { 1215 if (comp_bdev->reduce_errno != -EILSEQ) { 1216 SPDK_ERRLOG("for vol %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev), 1217 spdk_strerror(-comp_bdev->reduce_errno)); 1218 } 1219 goto err; 1220 } 1221 1222 spdk_bdev_module_examine_done(&compress_if); 1223 return; 1224 1225 err: 1226 /* Close the underlying bdev on its same opened thread. */ 1227 spdk_bdev_close(comp_bdev->base_desc); 1228 free(comp_bdev); 1229 spdk_bdev_module_examine_done(&compress_if); 1230 } 1231 1232 /* Callback from reduce for then load is complete. We'll pass the vbdev_comp struct 1233 * used for initial metadata operations to claim where it will be further filled out 1234 * and added to the global list. 1235 */ 1236 static void 1237 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 1238 { 1239 struct vbdev_compress *comp_bdev = cb_arg; 1240 1241 if (reduce_errno == 0) { 1242 /* Update information following volume load. */ 1243 comp_bdev->vol = vol; 1244 memcpy(&comp_bdev->params, spdk_reduce_vol_get_params(vol), 1245 sizeof(struct spdk_reduce_vol_params)); 1246 } 1247 1248 comp_bdev->reduce_errno = reduce_errno; 1249 1250 if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) { 1251 spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_load_cb, comp_bdev); 1252 } else { 1253 _vbdev_reduce_load_cb(comp_bdev); 1254 } 1255 1256 } 1257 1258 /* Examine_disk entry point: will do a metadata load to see if this is ours, 1259 * and if so will go ahead and claim it. 1260 */ 1261 static void 1262 vbdev_compress_examine(struct spdk_bdev *bdev) 1263 { 1264 struct spdk_bdev_desc *bdev_desc = NULL; 1265 struct vbdev_compress *comp_bdev; 1266 int rc; 1267 1268 if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) { 1269 spdk_bdev_module_examine_done(&compress_if); 1270 return; 1271 } 1272 1273 rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false, 1274 vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc); 1275 if (rc) { 1276 SPDK_ERRLOG("could not open bdev %s, error %s\n", spdk_bdev_get_name(bdev), 1277 spdk_strerror(-rc)); 1278 spdk_bdev_module_examine_done(&compress_if); 1279 return; 1280 } 1281 1282 comp_bdev = _prepare_for_load_init(bdev_desc, 0); 1283 if (comp_bdev == NULL) { 1284 spdk_bdev_close(bdev_desc); 1285 spdk_bdev_module_examine_done(&compress_if); 1286 return; 1287 } 1288 1289 /* Save the thread where the base device is opened */ 1290 comp_bdev->thread = spdk_get_thread(); 1291 1292 comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc); 1293 spdk_reduce_vol_load(&comp_bdev->backing_dev, vbdev_reduce_load_cb, comp_bdev); 1294 } 1295 1296 SPDK_LOG_REGISTER_COMPONENT(vbdev_compress) 1297