/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2018 Intel Corporation.
 * All rights reserved.
 * Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/accel.h"

#include "spdk/accel_module.h"

#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

/* This namespace UUID was generated using uuid_generate() method. */
#define BDEV_COMPRESS_NAMESPACE_UUID "c3fad6da-832f-4cc0-9cdc-5c552b225e7b"

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;
	void				*cb_arg;
	int				cb_rc;
	struct spdk_thread		*orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	struct spdk_io_channel		*accel_channel;	/* to communicate with the accel framework */
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
	struct spdk_thread		*thread;	/* thread where base device is opened */
	enum spdk_accel_comp_algo	comp_algo;	/* compression algorithm for compress bdev */
	uint32_t			comp_level;	/* compression algorithm level */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code. */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};
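/*
 * Threading overview, derived from the handlers below: reduce volume
 * operations are funneled to the thread that created the first comp channel
 * (saved as reduce_thread), while completions are bounced back to the thread
 * that owns the submitting channel via spdk_thread_exec_msg(). The
 * reduce_lock mutex guards the channel count and the reduce_thread hand-off.
 */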
/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;	/* used in completion handling */
	struct vbdev_compress		*comp_bdev;	/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;	/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;	/* the original IO */
	struct spdk_io_channel		*ch;		/* for resubmission */
	int				status;		/* save for completion on orig thread */
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size,
		uint8_t comp_algo, uint32_t comp_level);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
static void _comp_reduce_resubmit_backing_io(void *_backing_io);

/* For completing read/write requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (spdk_likely(io_ctx->status == 0)) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else if (io_ctx->status == -ENOMEM) {
		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
	} else {
		SPDK_ERRLOG("Failed to execute reduce api. %s\n", spdk_strerror(-io_ctx->status));
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for reads/writes that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);

	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	if (compress) {
		assert(dst_iovcnt == 1);
		rc = spdk_accel_submit_compress_ext(comp_bdev->accel_channel, dst_iovs[0].iov_base,
						    dst_iovs[0].iov_len, src_iovs, src_iovcnt,
						    comp_bdev->comp_algo, comp_bdev->comp_level,
						    &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
						    reduce_cb_arg->cb_arg);
	} else {
		rc = spdk_accel_submit_decompress_ext(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
						      src_iovs, src_iovcnt, comp_bdev->comp_algo,
						      &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
						      reduce_cb_arg->cb_arg);
	}

	return rc;
}
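/*
 * Contract for the two reduce hooks below: reducelib hands us src/dst iovecs
 * plus a spdk_reduce_vol_cb_args, we submit to the accel framework, and accel
 * invokes cb_fn on completion. If submission itself fails we must invoke
 * cb_fn ourselves with the negative errno, otherwise the reduce operation
 * would never complete.
 */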
/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("Failed to submit compress operation, rc=%d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("Failed to submit decompress operation, rc=%d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}

static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	if (spdk_unlikely(!success)) {
		SPDK_ERRLOG("Failed to get data buffer\n");
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}

	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
}
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
		return;
	/* TODO: support RESET in a future patch in this series. */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}
static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("Failed to unload vol, reduce errno %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("Failed to destroy vol, reduce errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("Failed to unload vol, reduce errno %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}
/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers that want to communicate with this
 * bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "pm_path", spdk_reduce_vol_get_pm_path(comp_bdev->vol));
	spdk_json_write_object_end(w);

	return 0;
}

static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	/* Nothing to dump as compress bdev configuration is saved on physical device. */
	return 0;
}

struct vbdev_init_reduce_ctx {
	struct vbdev_compress	*comp_bdev;
	int			status;
	bdev_compress_create_cb	cb_fn;
	void			*cb_ctx;
};

static void
_vbdev_reduce_init_unload_cb(void *ctx, int reduce_errno)
{
	/* Intentionally empty: errors from unloading a vol that failed to claim are ignored. */
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_init_reduce_ctx *init_ctx = ctx;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->vol) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc == 0) {
			init_ctx->cb_fn(init_ctx->cb_ctx, rc);
			free(init_ctx);
			return;
		}

		spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_init_unload_cb, NULL);
		init_ctx->cb_fn(init_ctx->cb_ctx, rc);
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	free(init_ctx);
}
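/*
 * Ownership notes for the init completion path above: init_ctx->cb_fn is
 * invoked exactly once on every path. On claim failure the vol is unloaded
 * (errors ignored via _vbdev_reduce_init_unload_cb), and on any failure the
 * base descriptor is closed on its opening thread and comp_bdev is freed.
 * On success the comp_bdev lives on in the global list.
 */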
/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_init_reduce_ctx *init_ctx = cb_arg;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;

	if (reduce_errno == 0) {
		comp_bdev->vol = vol;
	} else {
		SPDK_ERRLOG("Failed to init vol on bdev %s, error %s\n",
			    spdk_bdev_get_name(comp_bdev->base_bdev), spdk_strerror(-reduce_errno));
		init_ctx->cb_fn(init_ctx->cb_ctx, reduce_errno);
	}

	init_ctx->status = reduce_errno;

	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_init_cb, init_ctx);
	} else {
		_vbdev_reduce_init_cb(init_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it issued the read/write/unmap, then
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

static void
_comp_backing_bdev_queue_io_wait(struct vbdev_compress *comp_bdev,
				 struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_bdev_io_wait_entry *waitq_entry;
	int rc;

	waitq_entry = (struct spdk_bdev_io_wait_entry *)&backing_io->user_ctx;
	waitq_entry->bdev = spdk_bdev_desc_get_bdev(comp_bdev->base_desc);
	waitq_entry->cb_fn = _comp_reduce_resubmit_backing_io;
	waitq_entry->cb_arg = backing_io;

	rc = spdk_bdev_queue_io_wait(waitq_entry->bdev, comp_bdev->base_ch, waitq_entry);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in _comp_backing_bdev_queue_io_wait, rc=%d.\n", rc);
		assert(false);
		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, rc);
	}
}

static void
_comp_backing_bdev_read(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    backing_io->iov, backing_io->iovcnt,
				    backing_io->lba, backing_io->lba_count,
				    comp_reduce_io_cb,
				    backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		}

		SPDK_ERRLOG("error submitting readv request, rc=%d\n", rc);
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}
static void
_comp_backing_bdev_write(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     backing_io->iov, backing_io->iovcnt,
				     backing_io->lba, backing_io->lba_count,
				     comp_reduce_io_cb,
				     backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		}

		SPDK_ERRLOG("error submitting writev request, rc=%d\n", rc);
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

static void
_comp_backing_bdev_unmap(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    backing_io->lba, backing_io->lba_count,
				    comp_reduce_io_cb,
				    backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		}

		SPDK_ERRLOG("error submitting unmap request, rc=%d\n", rc);
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending reads/writes/unmaps
 * directly to the backing device.
 */
static void
_comp_reduce_submit_backing_io(struct spdk_reduce_backing_io *backing_io)
{
	switch (backing_io->backing_io_type) {
	case SPDK_REDUCE_BACKING_IO_WRITE:
		_comp_backing_bdev_write(backing_io);
		break;
	case SPDK_REDUCE_BACKING_IO_READ:
		_comp_backing_bdev_read(backing_io);
		break;
	case SPDK_REDUCE_BACKING_IO_UNMAP:
		_comp_backing_bdev_unmap(backing_io);
		break;
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", backing_io->backing_io_type);
		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, -EINVAL);
		break;
	}
}

static void
_comp_reduce_resubmit_backing_io(void *_backing_io)
{
	struct spdk_reduce_backing_io *backing_io = _backing_io;

	_comp_reduce_submit_backing_io(backing_io);
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("Failed to unload vol, reduce errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers an asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want user configurable; hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */
/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size, uint8_t comp_algo,
		       uint32_t comp_level)
{
	struct vbdev_compress *comp_bdev;
	struct spdk_bdev *bdev;

	comp_bdev = calloc(1, sizeof(struct vbdev_compress));
	if (comp_bdev == NULL) {
		SPDK_ERRLOG("failed to alloc comp_bdev\n");
		return NULL;
	}

	comp_bdev->backing_dev.submit_backing_io = _comp_reduce_submit_backing_io;
	comp_bdev->backing_dev.compress = _comp_reduce_compress;
	comp_bdev->backing_dev.decompress = _comp_reduce_decompress;

	comp_bdev->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	comp_bdev->base_bdev = bdev;

	comp_bdev->backing_dev.blocklen = bdev->blocklen;
	comp_bdev->backing_dev.blockcnt = bdev->blockcnt;

	comp_bdev->backing_dev.user_ctx_size = sizeof(struct spdk_bdev_io_wait_entry);

	comp_bdev->comp_algo = comp_algo;
	comp_bdev->comp_level = comp_level;
	comp_bdev->params.comp_algo = comp_algo;
	comp_bdev->params.comp_level = comp_level;
	comp_bdev->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		comp_bdev->params.logical_block_size = bdev->blocklen;
	} else {
		comp_bdev->params.logical_block_size = lb_size;
	}

	comp_bdev->params.backing_io_unit_size = BACKING_IO_SZ;
	return comp_bdev;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size, uint8_t comp_algo,
		  uint32_t comp_level, bdev_compress_create_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_init_reduce_ctx *init_ctx;
	struct vbdev_compress *comp_bdev;
	int rc;

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return -ENOMEM;
	}

	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_ctx = cb_arg;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", bdev_name, spdk_strerror(-rc));
		free(init_ctx);
		return rc;
	}

	comp_bdev = _prepare_for_load_init(bdev_desc, lb_size, comp_algo, comp_level);
	if (comp_bdev == NULL) {
		free(init_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	init_ctx->comp_bdev = comp_bdev;

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	spdk_reduce_vol_init(&comp_bdev->params, &comp_bdev->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     init_ctx);
	return 0;
}
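/*
 * Worked example of the geometry set up by _prepare_for_load_init(), a
 * sketch using the constants above: CHUNK_SIZE is 16 KiB and BACKING_IO_SZ
 * is 4 KiB, so each chunk maps to 4 backing io units. With a 512-byte
 * logical block size a chunk spans 16384 / 512 = 32 logical blocks, which
 * is exactly the optimal_io_boundary that vbdev_compress_claim() later
 * derives from chunk_size / logical_block_size.
 */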
/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->accel_channel = spdk_accel_get_io_channel();
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	return 0;
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	spdk_put_io_channel(comp_bdev->base_ch);
	spdk_put_io_channel(comp_bdev->accel_channel);
	comp_bdev->reduce_thread = NULL;
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

static int
_check_compress_bdev_comp_algo(enum spdk_accel_comp_algo algo, uint32_t comp_level)
{
	uint32_t min_level, max_level;
	int rc;

	rc = spdk_accel_get_compress_level_range(algo, &min_level, &max_level);
	if (rc != 0) {
		return rc;
	}

	/* If both min_level and max_level are 0, the compression level can be ignored.
	 * The back-end implementation hardcodes the compression level.
	 */
	if (min_level == 0 && max_level == 0) {
		return 0;
	}

	if (comp_level > max_level || comp_level < min_level) {
		return -EINVAL;
	}

	return 0;
}
/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size,
		     uint8_t comp_algo, uint32_t comp_level,
		     bdev_compress_create_cb cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct stat info;
	int rc;

	if (stat(pm_path, &info) != 0) {
		SPDK_ERRLOG("PM path %s does not exist.\n", pm_path);
		return -EINVAL;
	} else if (!S_ISDIR(info.st_mode)) {
		SPDK_ERRLOG("PM path %s is not a directory.\n", pm_path);
		return -EINVAL;
	}

	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	rc = _check_compress_bdev_comp_algo(comp_algo, comp_level);
	if (rc != 0) {
		SPDK_ERRLOG("Compress bdev doesn't support compression algo(%u) or level(%u)\n",
			    comp_algo, comp_level);
		return rc;
	}

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
			SPDK_ERRLOG("Base bdev %s is already being used for a compress bdev\n", bdev_name);
			return -EBUSY;
		}
	}
	return vbdev_init_reduce(bdev_name, pm_path, lb_size, comp_algo, comp_level, cb_fn, cb_arg);
}

static int
vbdev_compress_init(void)
{
	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	/* TODO: unload vol in a future patch */
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}
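/*
 * Illustrative usage sketch for create_compress_bdev() (comment only, not
 * compiled). The completion callback signature is inferred from the
 * cb_fn(cb_ctx, rc) invocations above; "Malloc0" and "/mnt/pmem" are
 * hypothetical names.
 *
 *	static void
 *	create_done(void *cb_arg, int status)
 *	{
 *		// status is 0 on success, a negative errno otherwise
 *	}
 *
 *	// lb_size of 0 inherits the base bdev's logical block size
 *	rc = create_compress_bdev("Malloc0", "/mnt/pmem", 0,
 *				  SPDK_ACCEL_COMP_ALGO_DEFLATE, 1,
 *				  create_done, NULL);
 */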
/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name		= "compress",
	.module_init	= vbdev_compress_init,
	.get_ctx_size	= vbdev_compress_get_ctx_size,
	.examine_disk	= vbdev_compress_examine,
	.module_fini	= vbdev_compress_finish,
	.config_json	= vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}
static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	struct spdk_uuid ns_uuid;
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	/* Generate UUID based on namespace UUID + base bdev UUID. */
	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
	rc = spdk_uuid_generate_sha1(&comp_bdev->comp_bdev.uuid, &ns_uuid,
				     (const char *)&comp_bdev->base_bdev->uuid, sizeof(struct spdk_uuid));
	if (rc) {
		SPDK_ERRLOG("Unable to generate new UUID for compress bdev, error %s\n", spdk_strerror(-rc));
		return -EINVAL;
	}

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
			    spdk_strerror(-rc));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev, error %s\n", spdk_strerror(-rc));
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}
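/*
 * Delete flow summary (a reading of the callbacks above, for orientation):
 * bdev_compress_delete() unloads the reduce vol, delete_vol_unload_cb()
 * reroutes to the reduce thread and destroys the vol, _reduce_destroy_cb()
 * unregisters the virtual bdev, and the module destruct path releases the
 * base bdev claim and closes the descriptor. vbdev_compress_delete_done()
 * finally bounces the user's callback back to the originating thread.
 */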
void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_unload_cb(void *ctx, int reduce_errno)
{
	/* Intentionally empty: errors from unloading a vol that failed to claim are ignored. */
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->reduce_errno == 0) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc != 0) {
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_load_unload_cb, NULL);
			goto err;
		}
	} else if (comp_bdev->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(comp_bdev)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		comp_bdev->thread = spdk_get_thread();

		comp_bdev->comp_bdev.module = &compress_if;
		pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
						 comp_bdev->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-rc));
			free(comp_bdev->comp_bdev.name);
			goto err;
		}

		comp_bdev->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
	} else {
		if (comp_bdev->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("Failed to load vol on bdev %s, error %s\n",
				    spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-comp_bdev->reduce_errno));
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		comp_bdev->vol = vol;
		memcpy(&comp_bdev->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
		comp_bdev->comp_algo = comp_bdev->params.comp_algo;
		comp_bdev->comp_level = comp_bdev->params.comp_level;
	}

	comp_bdev->reduce_errno = reduce_errno;

	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_load_cb, comp_bdev);
	} else {
		_vbdev_reduce_load_cb(comp_bdev);
	}
}
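/*
 * Load result handling above, summarized: reduce_errno == 0 means this base
 * bdev holds one of our volumes, so we claim and register it. -ENOENT means
 * the metadata is ours but the vol could not be fully loaded (e.g. a missing
 * pm file), so we claim the base bdev and mark it orphaned, allowing a clean
 * delete later. -EILSEQ appears to be reduce's "not my volume" signature
 * mismatch and is ignored silently; anything else is logged and dropped.
 */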
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *comp_bdev;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", spdk_bdev_get_name(bdev),
			    spdk_strerror(-rc));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	comp_bdev = _prepare_for_load_init(bdev_desc, 0, SPDK_ACCEL_COMP_ALGO_DEFLATE, 1);
	if (comp_bdev == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
	spdk_reduce_vol_load(&comp_bdev->backing_dev, vbdev_reduce_load_cb, comp_bdev);
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)