1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2017 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "blobstore.h" 10 #include "request.h" 11 12 #include "spdk/thread.h" 13 #include "spdk/queue.h" 14 15 #include "spdk/log.h" 16 17 void 18 bs_call_cpl(struct spdk_bs_cpl *cpl, int bserrno) 19 { 20 switch (cpl->type) { 21 case SPDK_BS_CPL_TYPE_BS_BASIC: 22 cpl->u.bs_basic.cb_fn(cpl->u.bs_basic.cb_arg, 23 bserrno); 24 break; 25 case SPDK_BS_CPL_TYPE_BS_HANDLE: 26 cpl->u.bs_handle.cb_fn(cpl->u.bs_handle.cb_arg, 27 bserrno == 0 ? cpl->u.bs_handle.bs : NULL, 28 bserrno); 29 break; 30 case SPDK_BS_CPL_TYPE_BLOB_BASIC: 31 cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, 32 bserrno); 33 break; 34 case SPDK_BS_CPL_TYPE_BLOBID: 35 cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, 36 bserrno == 0 ? cpl->u.blobid.blobid : SPDK_BLOBID_INVALID, 37 bserrno); 38 break; 39 case SPDK_BS_CPL_TYPE_BLOB_HANDLE: 40 cpl->u.blob_handle.cb_fn(cpl->u.blob_handle.cb_arg, 41 bserrno == 0 ? cpl->u.blob_handle.blob : NULL, 42 bserrno); 43 break; 44 case SPDK_BS_CPL_TYPE_NESTED_SEQUENCE: 45 cpl->u.nested_seq.cb_fn(cpl->u.nested_seq.cb_arg, 46 cpl->u.nested_seq.parent, 47 bserrno); 48 break; 49 case SPDK_BS_CPL_TYPE_NONE: 50 /* this completion's callback is handled elsewhere */ 51 break; 52 } 53 } 54 55 static void 56 bs_request_set_complete(struct spdk_bs_request_set *set) 57 { 58 struct spdk_bs_cpl cpl = set->cpl; 59 int bserrno = set->bserrno; 60 61 TAILQ_INSERT_TAIL(&set->channel->reqs, set, link); 62 63 bs_call_cpl(&cpl, bserrno); 64 } 65 66 static void 67 bs_sequence_completion(struct spdk_io_channel *channel, void *cb_arg, int bserrno) 68 { 69 struct spdk_bs_request_set *set = cb_arg; 70 71 set->bserrno = bserrno; 72 set->u.sequence.cb_fn((spdk_bs_sequence_t *)set, set->u.sequence.cb_arg, bserrno); 73 } 74 75 static inline spdk_bs_sequence_t * 76 bs_sequence_start(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl, 77 struct spdk_io_channel *back_channel) 78 { 79 struct spdk_bs_channel *channel; 80 struct spdk_bs_request_set *set; 81 82 channel = spdk_io_channel_get_ctx(_channel); 83 assert(channel != NULL); 84 set = TAILQ_FIRST(&channel->reqs); 85 if (!set) { 86 return NULL; 87 } 88 TAILQ_REMOVE(&channel->reqs, set, link); 89 90 set->cpl = *cpl; 91 set->bserrno = 0; 92 set->channel = channel; 93 set->back_channel = back_channel; 94 95 set->cb_args.cb_fn = bs_sequence_completion; 96 set->cb_args.cb_arg = set; 97 set->cb_args.channel = channel->dev_channel; 98 set->ext_io_opts = NULL; 99 100 return (spdk_bs_sequence_t *)set; 101 } 102 103 /* Use when performing IO directly on the blobstore (e.g. metadata - not a blob). */ 104 spdk_bs_sequence_t * 105 bs_sequence_start_bs(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl) 106 { 107 return bs_sequence_start(_channel, cpl, _channel); 108 } 109 110 /* Use when performing IO on a blob. */ 111 spdk_bs_sequence_t * 112 bs_sequence_start_blob(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl, 113 struct spdk_blob *blob) 114 { 115 struct spdk_io_channel *esnap_ch = _channel; 116 117 if (spdk_blob_is_esnap_clone(blob)) { 118 esnap_ch = blob_esnap_get_io_channel(_channel, blob); 119 if (esnap_ch == NULL) { 120 /* 121 * The most likely reason we are here is because of some logic error 122 * elsewhere that caused channel allocations to fail. We could get here due 123 * to being out of memory as well. If we are out of memory, the process is 124 * this will be just one of many problems that this process will be having. 125 * Killing it off debug builds now due to logic errors is the right thing to 126 * do and killing it off due to ENOMEM is no big loss. 127 */ 128 assert(false); 129 return NULL; 130 } 131 } 132 return bs_sequence_start(_channel, cpl, esnap_ch); 133 } 134 135 void 136 bs_sequence_read_bs_dev(spdk_bs_sequence_t *seq, struct spdk_bs_dev *bs_dev, 137 void *payload, uint64_t lba, uint32_t lba_count, 138 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 139 { 140 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq; 141 struct spdk_io_channel *back_channel = set->back_channel; 142 143 SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count, 144 lba); 145 146 set->u.sequence.cb_fn = cb_fn; 147 set->u.sequence.cb_arg = cb_arg; 148 149 bs_dev->read(bs_dev, back_channel, payload, lba, lba_count, &set->cb_args); 150 } 151 152 void 153 bs_sequence_read_dev(spdk_bs_sequence_t *seq, void *payload, 154 uint64_t lba, uint32_t lba_count, 155 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 156 { 157 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq; 158 struct spdk_bs_channel *channel = set->channel; 159 160 SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count, 161 lba); 162 163 set->u.sequence.cb_fn = cb_fn; 164 set->u.sequence.cb_arg = cb_arg; 165 166 channel->dev->read(channel->dev, channel->dev_channel, payload, lba, lba_count, &set->cb_args); 167 } 168 169 void 170 bs_sequence_write_dev(spdk_bs_sequence_t *seq, void *payload, 171 uint64_t lba, uint32_t lba_count, 172 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 173 { 174 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq; 175 struct spdk_bs_channel *channel = set->channel; 176 177 SPDK_DEBUGLOG(blob_rw, "Writing %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count, 178 lba); 179 180 set->u.sequence.cb_fn = cb_fn; 181 set->u.sequence.cb_arg = cb_arg; 182 183 channel->dev->write(channel->dev, channel->dev_channel, payload, lba, lba_count, 184 &set->cb_args); 185 } 186 187 void 188 bs_sequence_readv_bs_dev(spdk_bs_sequence_t *seq, struct spdk_bs_dev *bs_dev, 189 struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count, 190 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 191 { 192 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq; 193 struct spdk_io_channel *back_channel = set->back_channel; 194 195 SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count, 196 lba); 197 198 set->u.sequence.cb_fn = cb_fn; 199 set->u.sequence.cb_arg = cb_arg; 200 201 if (set->ext_io_opts) { 202 assert(bs_dev->readv_ext); 203 bs_dev->readv_ext(bs_dev, back_channel, iov, iovcnt, lba, lba_count, 204 &set->cb_args, set->ext_io_opts); 205 } else { 206 bs_dev->readv(bs_dev, back_channel, iov, iovcnt, lba, lba_count, &set->cb_args); 207 } 208 } 209 210 void 211 bs_sequence_readv_dev(spdk_bs_sequence_t *seq, struct iovec *iov, int iovcnt, 212 uint64_t lba, uint32_t lba_count, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 213 { 214 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq; 215 struct spdk_bs_channel *channel = set->channel; 216 217 SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count, 218 lba); 219 220 set->u.sequence.cb_fn = cb_fn; 221 set->u.sequence.cb_arg = cb_arg; 222 if (set->ext_io_opts) { 223 assert(channel->dev->readv_ext); 224 channel->dev->readv_ext(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count, 225 &set->cb_args, set->ext_io_opts); 226 } else { 227 channel->dev->readv(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count, &set->cb_args); 228 } 229 } 230 231 void 232 bs_sequence_writev_dev(spdk_bs_sequence_t *seq, struct iovec *iov, int iovcnt, 233 uint64_t lba, uint32_t lba_count, 234 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 235 { 236 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq; 237 struct spdk_bs_channel *channel = set->channel; 238 239 SPDK_DEBUGLOG(blob_rw, "Writing %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count, 240 lba); 241 242 set->u.sequence.cb_fn = cb_fn; 243 set->u.sequence.cb_arg = cb_arg; 244 245 if (set->ext_io_opts) { 246 assert(channel->dev->writev_ext); 247 channel->dev->writev_ext(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count, 248 &set->cb_args, set->ext_io_opts); 249 } else { 250 channel->dev->writev(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count, 251 &set->cb_args); 252 } 253 } 254 255 void 256 bs_sequence_write_zeroes_dev(spdk_bs_sequence_t *seq, 257 uint64_t lba, uint64_t lba_count, 258 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 259 { 260 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq; 261 struct spdk_bs_channel *channel = set->channel; 262 263 SPDK_DEBUGLOG(blob_rw, "writing zeroes to %" PRIu64 " blocks at LBA %" PRIu64 "\n", 264 lba_count, lba); 265 266 set->u.sequence.cb_fn = cb_fn; 267 set->u.sequence.cb_arg = cb_arg; 268 269 channel->dev->write_zeroes(channel->dev, channel->dev_channel, lba, lba_count, 270 &set->cb_args); 271 } 272 273 void 274 bs_sequence_copy_dev(spdk_bs_sequence_t *seq, uint64_t dst_lba, uint64_t src_lba, 275 uint64_t lba_count, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 276 { 277 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq; 278 struct spdk_bs_channel *channel = set->channel; 279 280 SPDK_DEBUGLOG(blob_rw, "Copying %" PRIu64 " blocks from LBA %" PRIu64 " to LBA %" PRIu64 "\n", 281 lba_count, src_lba, dst_lba); 282 283 set->u.sequence.cb_fn = cb_fn; 284 set->u.sequence.cb_arg = cb_arg; 285 286 channel->dev->copy(channel->dev, channel->dev_channel, dst_lba, src_lba, lba_count, &set->cb_args); 287 } 288 289 void 290 bs_sequence_finish(spdk_bs_sequence_t *seq, int bserrno) 291 { 292 if (bserrno != 0) { 293 seq->bserrno = bserrno; 294 } 295 bs_request_set_complete((struct spdk_bs_request_set *)seq); 296 } 297 298 void 299 bs_user_op_sequence_finish(void *cb_arg, int bserrno) 300 { 301 spdk_bs_sequence_t *seq = cb_arg; 302 303 bs_sequence_finish(seq, bserrno); 304 } 305 306 static void 307 bs_batch_completion(struct spdk_io_channel *_channel, 308 void *cb_arg, int bserrno) 309 { 310 struct spdk_bs_request_set *set = cb_arg; 311 312 set->u.batch.outstanding_ops--; 313 if (bserrno != 0) { 314 set->bserrno = bserrno; 315 } 316 317 if (set->u.batch.outstanding_ops == 0 && set->u.batch.batch_closed) { 318 if (set->u.batch.cb_fn) { 319 set->cb_args.cb_fn = bs_sequence_completion; 320 set->u.batch.cb_fn((spdk_bs_sequence_t *)set, set->u.batch.cb_arg, bserrno); 321 } else { 322 bs_request_set_complete(set); 323 } 324 } 325 } 326 327 spdk_bs_batch_t * 328 bs_batch_open(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl, struct spdk_blob *blob) 329 { 330 struct spdk_bs_channel *channel; 331 struct spdk_bs_request_set *set; 332 struct spdk_io_channel *back_channel = _channel; 333 334 if (spdk_blob_is_esnap_clone(blob)) { 335 back_channel = blob_esnap_get_io_channel(_channel, blob); 336 if (back_channel == NULL) { 337 return NULL; 338 } 339 } 340 341 channel = spdk_io_channel_get_ctx(_channel); 342 assert(channel != NULL); 343 set = TAILQ_FIRST(&channel->reqs); 344 if (!set) { 345 return NULL; 346 } 347 TAILQ_REMOVE(&channel->reqs, set, link); 348 349 set->cpl = *cpl; 350 set->bserrno = 0; 351 set->channel = channel; 352 set->back_channel = back_channel; 353 354 set->u.batch.cb_fn = NULL; 355 set->u.batch.cb_arg = NULL; 356 set->u.batch.outstanding_ops = 0; 357 set->u.batch.batch_closed = 0; 358 359 set->cb_args.cb_fn = bs_batch_completion; 360 set->cb_args.cb_arg = set; 361 set->cb_args.channel = channel->dev_channel; 362 363 return (spdk_bs_batch_t *)set; 364 } 365 366 void 367 bs_batch_read_bs_dev(spdk_bs_batch_t *batch, struct spdk_bs_dev *bs_dev, 368 void *payload, uint64_t lba, uint32_t lba_count) 369 { 370 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)batch; 371 struct spdk_io_channel *back_channel = set->back_channel; 372 373 SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count, 374 lba); 375 376 set->u.batch.outstanding_ops++; 377 bs_dev->read(bs_dev, back_channel, payload, lba, lba_count, &set->cb_args); 378 } 379 380 void 381 bs_batch_read_dev(spdk_bs_batch_t *batch, void *payload, 382 uint64_t lba, uint32_t lba_count) 383 { 384 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)batch; 385 struct spdk_bs_channel *channel = set->channel; 386 387 SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count, 388 lba); 389 390 set->u.batch.outstanding_ops++; 391 channel->dev->read(channel->dev, channel->dev_channel, payload, lba, lba_count, &set->cb_args); 392 } 393 394 void 395 bs_batch_write_dev(spdk_bs_batch_t *batch, void *payload, 396 uint64_t lba, uint32_t lba_count) 397 { 398 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)batch; 399 struct spdk_bs_channel *channel = set->channel; 400 401 SPDK_DEBUGLOG(blob_rw, "Writing %" PRIu32 " blocks to LBA %" PRIu64 "\n", lba_count, lba); 402 403 set->u.batch.outstanding_ops++; 404 channel->dev->write(channel->dev, channel->dev_channel, payload, lba, lba_count, 405 &set->cb_args); 406 } 407 408 void 409 bs_batch_unmap_dev(spdk_bs_batch_t *batch, 410 uint64_t lba, uint64_t lba_count) 411 { 412 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)batch; 413 struct spdk_bs_channel *channel = set->channel; 414 415 SPDK_DEBUGLOG(blob_rw, "Unmapping %" PRIu64 " blocks at LBA %" PRIu64 "\n", lba_count, 416 lba); 417 418 set->u.batch.outstanding_ops++; 419 channel->dev->unmap(channel->dev, channel->dev_channel, lba, lba_count, 420 &set->cb_args); 421 } 422 423 void 424 bs_batch_write_zeroes_dev(spdk_bs_batch_t *batch, 425 uint64_t lba, uint64_t lba_count) 426 { 427 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)batch; 428 struct spdk_bs_channel *channel = set->channel; 429 430 SPDK_DEBUGLOG(blob_rw, "Zeroing %" PRIu64 " blocks at LBA %" PRIu64 "\n", lba_count, lba); 431 432 set->u.batch.outstanding_ops++; 433 channel->dev->write_zeroes(channel->dev, channel->dev_channel, lba, lba_count, 434 &set->cb_args); 435 } 436 437 void 438 bs_batch_close(spdk_bs_batch_t *batch) 439 { 440 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)batch; 441 442 set->u.batch.batch_closed = 1; 443 444 if (set->u.batch.outstanding_ops == 0) { 445 if (set->u.batch.cb_fn) { 446 set->cb_args.cb_fn = bs_sequence_completion; 447 set->u.batch.cb_fn((spdk_bs_sequence_t *)set, set->u.batch.cb_arg, set->bserrno); 448 } else { 449 bs_request_set_complete(set); 450 } 451 } 452 } 453 454 spdk_bs_batch_t * 455 bs_sequence_to_batch(spdk_bs_sequence_t *seq, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 456 { 457 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq; 458 459 set->u.batch.cb_fn = cb_fn; 460 set->u.batch.cb_arg = cb_arg; 461 set->u.batch.outstanding_ops = 0; 462 set->u.batch.batch_closed = 0; 463 464 set->cb_args.cb_fn = bs_batch_completion; 465 466 return set; 467 } 468 469 spdk_bs_user_op_t * 470 bs_user_op_alloc(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl, 471 enum spdk_blob_op_type op_type, struct spdk_blob *blob, 472 void *payload, int iovcnt, uint64_t offset, uint64_t length) 473 { 474 struct spdk_bs_channel *channel; 475 struct spdk_bs_request_set *set; 476 struct spdk_bs_user_op_args *args; 477 478 channel = spdk_io_channel_get_ctx(_channel); 479 assert(channel != NULL); 480 set = TAILQ_FIRST(&channel->reqs); 481 if (!set) { 482 return NULL; 483 } 484 TAILQ_REMOVE(&channel->reqs, set, link); 485 486 set->cpl = *cpl; 487 set->channel = channel; 488 set->back_channel = NULL; 489 set->ext_io_opts = NULL; 490 491 args = &set->u.user_op; 492 493 args->type = op_type; 494 args->iovcnt = iovcnt; 495 args->blob = blob; 496 args->offset = offset; 497 args->length = length; 498 args->payload = payload; 499 500 return (spdk_bs_user_op_t *)set; 501 } 502 503 void 504 bs_user_op_execute(spdk_bs_user_op_t *op) 505 { 506 struct spdk_bs_request_set *set; 507 struct spdk_bs_user_op_args *args; 508 struct spdk_io_channel *ch; 509 510 set = (struct spdk_bs_request_set *)op; 511 args = &set->u.user_op; 512 ch = spdk_io_channel_from_ctx(set->channel); 513 514 switch (args->type) { 515 case SPDK_BLOB_READ: 516 spdk_blob_io_read(args->blob, ch, args->payload, args->offset, args->length, 517 set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg); 518 break; 519 case SPDK_BLOB_WRITE: 520 spdk_blob_io_write(args->blob, ch, args->payload, args->offset, args->length, 521 set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg); 522 break; 523 case SPDK_BLOB_UNMAP: 524 spdk_blob_io_unmap(args->blob, ch, args->offset, args->length, 525 set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg); 526 break; 527 case SPDK_BLOB_WRITE_ZEROES: 528 spdk_blob_io_write_zeroes(args->blob, ch, args->offset, args->length, 529 set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg); 530 break; 531 case SPDK_BLOB_READV: 532 spdk_blob_io_readv_ext(args->blob, ch, args->payload, args->iovcnt, 533 args->offset, args->length, 534 set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg, 535 set->ext_io_opts); 536 break; 537 case SPDK_BLOB_WRITEV: 538 spdk_blob_io_writev_ext(args->blob, ch, args->payload, args->iovcnt, 539 args->offset, args->length, 540 set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg, 541 set->ext_io_opts); 542 break; 543 } 544 TAILQ_INSERT_TAIL(&set->channel->reqs, set, link); 545 } 546 547 void 548 bs_user_op_abort(spdk_bs_user_op_t *op, int bserrno) 549 { 550 struct spdk_bs_request_set *set; 551 552 set = (struct spdk_bs_request_set *)op; 553 554 set->cpl.u.blob_basic.cb_fn(set->cpl.u.blob_basic.cb_arg, bserrno); 555 TAILQ_INSERT_TAIL(&set->channel->reqs, set, link); 556 } 557 558 SPDK_LOG_REGISTER_COMPONENT(blob_rw) 559