1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. All rights reserved. 5 * Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 38 #include "spdk/config.h" 39 #include "spdk/env.h" 40 #include "spdk/thread.h" 41 #include "spdk/likely.h" 42 #include "spdk/queue.h" 43 #include "spdk/nvme_spec.h" 44 #include "spdk/scsi_spec.h" 45 #include "spdk/notify.h" 46 #include "spdk/util.h" 47 #include "spdk/trace.h" 48 49 #include "spdk/bdev_module.h" 50 #include "spdk/log.h" 51 #include "spdk/string.h" 52 53 #include "bdev_internal.h" 54 55 #ifdef SPDK_CONFIG_VTUNE 56 #include "ittnotify.h" 57 #include "ittnotify_types.h" 58 int __itt_init_ittlib(const char *, __itt_group_id); 59 #endif 60 61 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024 - 1) 62 #define SPDK_BDEV_IO_CACHE_SIZE 256 63 #define SPDK_BDEV_AUTO_EXAMINE true 64 #define BUF_SMALL_POOL_SIZE 8191 65 #define BUF_LARGE_POOL_SIZE 1023 66 #define NOMEM_THRESHOLD_COUNT 8 67 #define ZERO_BUFFER_SIZE 0x100000 68 69 #define OWNER_BDEV 0x2 70 71 #define OBJECT_BDEV_IO 0x2 72 73 #define TRACE_GROUP_BDEV 0x3 74 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 75 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 76 77 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 78 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 79 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 80 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 1000 81 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (1024 * 1024) 82 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 83 #define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC 1000 84 85 #define SPDK_BDEV_POOL_ALIGNMENT 512 86 87 static const char *qos_rpc_type[] = {"rw_ios_per_sec", 88 "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec" 89 }; 90 91 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 92 93 struct spdk_bdev_mgr { 94 struct spdk_mempool *bdev_io_pool; 95 96 struct spdk_mempool *buf_small_pool; 97 struct spdk_mempool 
*buf_large_pool; 98 99 void *zero_buffer; 100 101 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 102 103 struct spdk_bdev_list bdevs; 104 105 bool init_complete; 106 bool module_init_complete; 107 108 pthread_mutex_t mutex; 109 110 #ifdef SPDK_CONFIG_VTUNE 111 __itt_domain *domain; 112 #endif 113 }; 114 115 static struct spdk_bdev_mgr g_bdev_mgr = { 116 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 117 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 118 .init_complete = false, 119 .module_init_complete = false, 120 .mutex = PTHREAD_MUTEX_INITIALIZER, 121 }; 122 123 typedef void (*lock_range_cb)(void *ctx, int status); 124 125 struct lba_range { 126 uint64_t offset; 127 uint64_t length; 128 void *locked_ctx; 129 struct spdk_bdev_channel *owner_ch; 130 TAILQ_ENTRY(lba_range) tailq; 131 }; 132 133 static struct spdk_bdev_opts g_bdev_opts = { 134 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 135 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 136 .bdev_auto_examine = SPDK_BDEV_AUTO_EXAMINE, 137 }; 138 139 static spdk_bdev_init_cb g_init_cb_fn = NULL; 140 static void *g_init_cb_arg = NULL; 141 142 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 143 static void *g_fini_cb_arg = NULL; 144 static struct spdk_thread *g_fini_thread = NULL; 145 146 struct spdk_bdev_qos_limit { 147 /** IOs or bytes allowed per second (i.e., 1s). */ 148 uint64_t limit; 149 150 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 151 * For remaining bytes, allowed to run negative if an I/O is submitted when 152 * some bytes are remaining, but the I/O is bigger than that amount. The 153 * excess will be deducted from the next timeslice. 154 */ 155 int64_t remaining_this_timeslice; 156 157 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 158 uint32_t min_per_timeslice; 159 160 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 161 uint32_t max_per_timeslice; 162 163 /** Function to check whether to queue the IO. */ 164 bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 165 166 /** Function to update for the submitted IO. */ 167 void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 168 }; 169 170 struct spdk_bdev_qos { 171 /** Types of structure of rate limits. */ 172 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 173 174 /** The channel that all I/O are funneled through. */ 175 struct spdk_bdev_channel *ch; 176 177 /** The thread on which the poller is running. */ 178 struct spdk_thread *thread; 179 180 /** Queue of I/O waiting to be issued. */ 181 bdev_io_tailq_t queued; 182 183 /** Size of a timeslice in tsc ticks. */ 184 uint64_t timeslice_size; 185 186 /** Timestamp of start of last timeslice. */ 187 uint64_t last_timeslice; 188 189 /** Poller that processes queued I/O commands each time slice. */ 190 struct spdk_poller *poller; 191 }; 192 193 struct spdk_bdev_mgmt_channel { 194 bdev_io_stailq_t need_buf_small; 195 bdev_io_stailq_t need_buf_large; 196 197 /* 198 * Each thread keeps a cache of bdev_io - this allows 199 * bdev threads which are *not* DPDK threads to still 200 * benefit from a per-thread bdev_io cache. Without 201 * this, non-DPDK threads fetching from the mempool 202 * incur a cmpxchg on get and put. 
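	 * The cache is filled from the global bdev_io pool in bdev_mgmt_channel_create()
	 * and holds at most bdev_io_cache_size entries per thread.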
203 */ 204 bdev_io_stailq_t per_thread_cache; 205 uint32_t per_thread_cache_count; 206 uint32_t bdev_io_cache_size; 207 208 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 209 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 210 }; 211 212 /* 213 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 214 * will queue here their IO that awaits retry. It makes it possible to retry sending 215 * IO to one bdev after IO from other bdev completes. 216 */ 217 struct spdk_bdev_shared_resource { 218 /* The bdev management channel */ 219 struct spdk_bdev_mgmt_channel *mgmt_ch; 220 221 /* 222 * Count of I/O submitted to bdev module and waiting for completion. 223 * Incremented before submit_request() is called on an spdk_bdev_io. 224 */ 225 uint64_t io_outstanding; 226 227 /* 228 * Queue of IO awaiting retry because of a previous NOMEM status returned 229 * on this channel. 230 */ 231 bdev_io_tailq_t nomem_io; 232 233 /* 234 * Threshold which io_outstanding must drop to before retrying nomem_io. 235 */ 236 uint64_t nomem_threshold; 237 238 /* I/O channel allocated by a bdev module */ 239 struct spdk_io_channel *shared_ch; 240 241 /* Refcount of bdev channels using this resource */ 242 uint32_t ref; 243 244 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 245 }; 246 247 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 248 #define BDEV_CH_QOS_ENABLED (1 << 1) 249 250 struct spdk_bdev_channel { 251 struct spdk_bdev *bdev; 252 253 /* The channel for the underlying device */ 254 struct spdk_io_channel *channel; 255 256 /* Per io_device per thread data */ 257 struct spdk_bdev_shared_resource *shared_resource; 258 259 struct spdk_bdev_io_stat stat; 260 261 /* 262 * Count of I/O submitted to the underlying dev module through this channel 263 * and waiting for completion. 264 */ 265 uint64_t io_outstanding; 266 267 /* 268 * List of all submitted I/Os including I/O that are generated via splitting. 269 */ 270 bdev_io_tailq_t io_submitted; 271 272 /* 273 * List of spdk_bdev_io that are currently queued because they write to a locked 274 * LBA range. 
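	 * They can be retried once the range is unlocked (see bdev_unlock_lba_range()).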
275 */ 276 bdev_io_tailq_t io_locked; 277 278 uint32_t flags; 279 280 struct spdk_histogram_data *histogram; 281 282 #ifdef SPDK_CONFIG_VTUNE 283 uint64_t start_tsc; 284 uint64_t interval_tsc; 285 __itt_string_handle *handle; 286 struct spdk_bdev_io_stat prev_stat; 287 #endif 288 289 bdev_io_tailq_t queued_resets; 290 291 lba_range_tailq_t locked_ranges; 292 }; 293 294 struct media_event_entry { 295 struct spdk_bdev_media_event event; 296 TAILQ_ENTRY(media_event_entry) tailq; 297 }; 298 299 #define MEDIA_EVENT_POOL_SIZE 64 300 301 struct spdk_bdev_desc { 302 struct spdk_bdev *bdev; 303 struct spdk_thread *thread; 304 struct { 305 bool open_with_ext; 306 union { 307 spdk_bdev_remove_cb_t remove_fn; 308 spdk_bdev_event_cb_t event_fn; 309 }; 310 void *ctx; 311 } callback; 312 bool closed; 313 bool write; 314 pthread_mutex_t mutex; 315 uint32_t refs; 316 TAILQ_HEAD(, media_event_entry) pending_media_events; 317 TAILQ_HEAD(, media_event_entry) free_media_events; 318 struct media_event_entry *media_events_buffer; 319 TAILQ_ENTRY(spdk_bdev_desc) link; 320 321 uint64_t timeout_in_sec; 322 spdk_bdev_io_timeout_cb cb_fn; 323 void *cb_arg; 324 struct spdk_poller *io_timeout_poller; 325 }; 326 327 struct spdk_bdev_iostat_ctx { 328 struct spdk_bdev_io_stat *stat; 329 spdk_bdev_get_device_stat_cb cb; 330 void *cb_arg; 331 }; 332 333 struct set_qos_limit_ctx { 334 void (*cb_fn)(void *cb_arg, int status); 335 void *cb_arg; 336 struct spdk_bdev *bdev; 337 }; 338 339 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 340 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 341 342 static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 343 static void bdev_write_zero_buffer_next(void *_bdev_io); 344 345 static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i); 346 static void bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status); 347 348 static int 349 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 350 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 351 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg); 352 static int 353 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 354 struct iovec *iov, int iovcnt, void *md_buf, 355 uint64_t offset_blocks, uint64_t num_blocks, 356 spdk_bdev_io_completion_cb cb, void *cb_arg); 357 358 static int 359 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 360 uint64_t offset, uint64_t length, 361 lock_range_cb cb_fn, void *cb_arg); 362 363 static int 364 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 365 uint64_t offset, uint64_t length, 366 lock_range_cb cb_fn, void *cb_arg); 367 368 static inline void bdev_io_complete(void *ctx); 369 370 static bool bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort); 371 static bool bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort); 372 373 void 374 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 375 { 376 *opts = g_bdev_opts; 377 } 378 379 int 380 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 381 { 382 uint32_t min_pool_size; 383 384 /* 385 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 386 * initialization. A second mgmt_ch will be created on the same thread when the application starts 387 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
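	 * For example, with 4 threads and a bdev_io_cache_size of 256, bdev_io_pool_size
	 * must be at least (4 + 1) * 256 = 1280.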
388 */ 389 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 390 if (opts->bdev_io_pool_size < min_pool_size) { 391 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 392 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 393 spdk_thread_get_count()); 394 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 395 return -1; 396 } 397 398 g_bdev_opts = *opts; 399 return 0; 400 } 401 402 struct spdk_bdev_examine_item { 403 char *name; 404 TAILQ_ENTRY(spdk_bdev_examine_item) link; 405 }; 406 407 TAILQ_HEAD(spdk_bdev_examine_allowlist, spdk_bdev_examine_item); 408 409 struct spdk_bdev_examine_allowlist g_bdev_examine_allowlist = TAILQ_HEAD_INITIALIZER( 410 g_bdev_examine_allowlist); 411 412 static inline bool 413 bdev_examine_allowlist_check(const char *name) 414 { 415 struct spdk_bdev_examine_item *item; 416 TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) { 417 if (strcmp(name, item->name) == 0) { 418 return true; 419 } 420 } 421 return false; 422 } 423 424 static inline void 425 bdev_examine_allowlist_free(void) 426 { 427 struct spdk_bdev_examine_item *item; 428 while (!TAILQ_EMPTY(&g_bdev_examine_allowlist)) { 429 item = TAILQ_FIRST(&g_bdev_examine_allowlist); 430 TAILQ_REMOVE(&g_bdev_examine_allowlist, item, link); 431 free(item->name); 432 free(item); 433 } 434 } 435 436 static inline bool 437 bdev_in_examine_allowlist(struct spdk_bdev *bdev) 438 { 439 struct spdk_bdev_alias *tmp; 440 if (bdev_examine_allowlist_check(bdev->name)) { 441 return true; 442 } 443 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 444 if (bdev_examine_allowlist_check(tmp->alias)) { 445 return true; 446 } 447 } 448 return false; 449 } 450 451 static inline bool 452 bdev_ok_to_examine(struct spdk_bdev *bdev) 453 { 454 if (g_bdev_opts.bdev_auto_examine) { 455 return true; 456 } else { 457 return bdev_in_examine_allowlist(bdev); 458 } 459 } 460 461 static void 462 bdev_examine(struct spdk_bdev *bdev) 463 { 464 struct spdk_bdev_module *module; 465 uint32_t action; 466 467 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 468 if (module->examine_config && bdev_ok_to_examine(bdev)) { 469 action = module->internal.action_in_progress; 470 module->internal.action_in_progress++; 471 module->examine_config(bdev); 472 if (action != module->internal.action_in_progress) { 473 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 474 module->name); 475 } 476 } 477 } 478 479 if (bdev->internal.claim_module && bdev_ok_to_examine(bdev)) { 480 if (bdev->internal.claim_module->examine_disk) { 481 bdev->internal.claim_module->internal.action_in_progress++; 482 bdev->internal.claim_module->examine_disk(bdev); 483 } 484 return; 485 } 486 487 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 488 if (module->examine_disk && bdev_ok_to_examine(bdev)) { 489 module->internal.action_in_progress++; 490 module->examine_disk(bdev); 491 } 492 } 493 } 494 495 int 496 spdk_bdev_examine(const char *name) 497 { 498 struct spdk_bdev *bdev; 499 struct spdk_bdev_examine_item *item; 500 501 if (g_bdev_opts.bdev_auto_examine) { 502 SPDK_ERRLOG("Manual examine is not allowed if auto examine is enabled"); 503 return -EINVAL; 504 } 505 506 if (bdev_examine_allowlist_check(name)) { 507 SPDK_ERRLOG("Duplicate bdev name for manual examine: %s\n", name); 508 return -EEXIST; 509 } 510 511 item = calloc(1, sizeof(*item)); 512 if (!item) { 513 return -ENOMEM; 514 } 515 
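	/* Keep the name on the allowlist so the bdev is examined now if it is already
	 * registered, and again when a bdev with a matching name or alias registers later.
	 */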
item->name = strdup(name); 516 if (!item->name) { 517 free(item); 518 return -ENOMEM; 519 } 520 TAILQ_INSERT_TAIL(&g_bdev_examine_allowlist, item, link); 521 522 bdev = spdk_bdev_get_by_name(name); 523 if (bdev) { 524 bdev_examine(bdev); 525 } 526 return 0; 527 } 528 529 static inline void 530 bdev_examine_allowlist_config_json(struct spdk_json_write_ctx *w) 531 { 532 struct spdk_bdev_examine_item *item; 533 TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) { 534 spdk_json_write_object_begin(w); 535 spdk_json_write_named_string(w, "method", "bdev_examine"); 536 spdk_json_write_named_object_begin(w, "params"); 537 spdk_json_write_named_string(w, "name", item->name); 538 spdk_json_write_object_end(w); 539 spdk_json_write_object_end(w); 540 } 541 } 542 543 struct spdk_bdev * 544 spdk_bdev_first(void) 545 { 546 struct spdk_bdev *bdev; 547 548 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 549 if (bdev) { 550 SPDK_DEBUGLOG(bdev, "Starting bdev iteration at %s\n", bdev->name); 551 } 552 553 return bdev; 554 } 555 556 struct spdk_bdev * 557 spdk_bdev_next(struct spdk_bdev *prev) 558 { 559 struct spdk_bdev *bdev; 560 561 bdev = TAILQ_NEXT(prev, internal.link); 562 if (bdev) { 563 SPDK_DEBUGLOG(bdev, "Continuing bdev iteration at %s\n", bdev->name); 564 } 565 566 return bdev; 567 } 568 569 static struct spdk_bdev * 570 _bdev_next_leaf(struct spdk_bdev *bdev) 571 { 572 while (bdev != NULL) { 573 if (bdev->internal.claim_module == NULL) { 574 return bdev; 575 } else { 576 bdev = TAILQ_NEXT(bdev, internal.link); 577 } 578 } 579 580 return bdev; 581 } 582 583 struct spdk_bdev * 584 spdk_bdev_first_leaf(void) 585 { 586 struct spdk_bdev *bdev; 587 588 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 589 590 if (bdev) { 591 SPDK_DEBUGLOG(bdev, "Starting bdev iteration at %s\n", bdev->name); 592 } 593 594 return bdev; 595 } 596 597 struct spdk_bdev * 598 spdk_bdev_next_leaf(struct spdk_bdev *prev) 599 { 600 struct spdk_bdev *bdev; 601 602 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 603 604 if (bdev) { 605 SPDK_DEBUGLOG(bdev, "Continuing bdev iteration at %s\n", bdev->name); 606 } 607 608 return bdev; 609 } 610 611 struct spdk_bdev * 612 spdk_bdev_get_by_name(const char *bdev_name) 613 { 614 struct spdk_bdev_alias *tmp; 615 struct spdk_bdev *bdev = spdk_bdev_first(); 616 617 while (bdev != NULL) { 618 if (strcmp(bdev_name, bdev->name) == 0) { 619 return bdev; 620 } 621 622 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 623 if (strcmp(bdev_name, tmp->alias) == 0) { 624 return bdev; 625 } 626 } 627 628 bdev = spdk_bdev_next(bdev); 629 } 630 631 return NULL; 632 } 633 634 void 635 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 636 { 637 struct iovec *iovs; 638 639 if (bdev_io->u.bdev.iovs == NULL) { 640 bdev_io->u.bdev.iovs = &bdev_io->iov; 641 bdev_io->u.bdev.iovcnt = 1; 642 } 643 644 iovs = bdev_io->u.bdev.iovs; 645 646 assert(iovs != NULL); 647 assert(bdev_io->u.bdev.iovcnt >= 1); 648 649 iovs[0].iov_base = buf; 650 iovs[0].iov_len = len; 651 } 652 653 void 654 spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len) 655 { 656 assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks); 657 bdev_io->u.bdev.md_buf = md_buf; 658 } 659 660 static bool 661 _is_buf_allocated(const struct iovec *iovs) 662 { 663 if (iovs == NULL) { 664 return false; 665 } 666 667 return iovs[0].iov_base != NULL; 668 } 669 670 static bool 671 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 672 { 673 int i; 674 uintptr_t 
iov_base; 675 676 if (spdk_likely(alignment == 1)) { 677 return true; 678 } 679 680 for (i = 0; i < iovcnt; i++) { 681 iov_base = (uintptr_t)iovs[i].iov_base; 682 if ((iov_base & (alignment - 1)) != 0) { 683 return false; 684 } 685 } 686 687 return true; 688 } 689 690 static void 691 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 692 { 693 int i; 694 size_t len; 695 696 for (i = 0; i < iovcnt; i++) { 697 len = spdk_min(iovs[i].iov_len, buf_len); 698 memcpy(buf, iovs[i].iov_base, len); 699 buf += len; 700 buf_len -= len; 701 } 702 } 703 704 static void 705 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 706 { 707 int i; 708 size_t len; 709 710 for (i = 0; i < iovcnt; i++) { 711 len = spdk_min(iovs[i].iov_len, buf_len); 712 memcpy(iovs[i].iov_base, buf, len); 713 buf += len; 714 buf_len -= len; 715 } 716 } 717 718 static void 719 _bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 720 { 721 /* save original iovec */ 722 bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs; 723 bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt; 724 /* set bounce iov */ 725 bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov; 726 bdev_io->u.bdev.iovcnt = 1; 727 /* set bounce buffer for this operation */ 728 bdev_io->u.bdev.iovs[0].iov_base = buf; 729 bdev_io->u.bdev.iovs[0].iov_len = len; 730 /* if this is write path, copy data from original buffer to bounce buffer */ 731 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 732 _copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt); 733 } 734 } 735 736 static void 737 _bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len) 738 { 739 /* save original md_buf */ 740 bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf; 741 /* set bounce md_buf */ 742 bdev_io->u.bdev.md_buf = md_buf; 743 744 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 745 memcpy(md_buf, bdev_io->internal.orig_md_buf, len); 746 } 747 } 748 749 static void 750 bdev_io_get_buf_complete(struct spdk_bdev_io *bdev_io, void *buf, bool status) 751 { 752 struct spdk_io_channel *ch = spdk_bdev_io_get_io_channel(bdev_io); 753 754 if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) { 755 bdev_io->internal.get_aux_buf_cb(ch, bdev_io, buf); 756 bdev_io->internal.get_aux_buf_cb = NULL; 757 } else { 758 assert(bdev_io->internal.get_buf_cb != NULL); 759 bdev_io->internal.buf = buf; 760 bdev_io->internal.get_buf_cb(ch, bdev_io, status); 761 bdev_io->internal.get_buf_cb = NULL; 762 } 763 } 764 765 static void 766 _bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len) 767 { 768 struct spdk_bdev *bdev = bdev_io->bdev; 769 bool buf_allocated; 770 uint64_t md_len, alignment; 771 void *aligned_buf; 772 773 if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) { 774 bdev_io_get_buf_complete(bdev_io, buf, true); 775 return; 776 } 777 778 alignment = spdk_bdev_get_buf_align(bdev); 779 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 780 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 781 782 if (buf_allocated) { 783 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 784 } else { 785 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 786 } 787 788 if (spdk_bdev_is_md_separate(bdev)) { 789 aligned_buf = (char *)aligned_buf + len; 790 md_len = bdev_io->u.bdev.num_blocks * bdev->md_len; 791 792 assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0); 793 794 if (bdev_io->u.bdev.md_buf != NULL) { 795 
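			/* The caller supplied its own metadata buffer, so stage the metadata
			 * through a bounce buffer, just like the data path above.
			 */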
_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len); 796 } else { 797 spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len); 798 } 799 } 800 bdev_io_get_buf_complete(bdev_io, buf, true); 801 } 802 803 static void 804 _bdev_io_put_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t buf_len) 805 { 806 struct spdk_bdev *bdev = bdev_io->bdev; 807 struct spdk_mempool *pool; 808 struct spdk_bdev_io *tmp; 809 bdev_io_stailq_t *stailq; 810 struct spdk_bdev_mgmt_channel *ch; 811 uint64_t md_len, alignment; 812 813 md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0; 814 alignment = spdk_bdev_get_buf_align(bdev); 815 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 816 817 if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 818 SPDK_BDEV_POOL_ALIGNMENT) { 819 pool = g_bdev_mgr.buf_small_pool; 820 stailq = &ch->need_buf_small; 821 } else { 822 pool = g_bdev_mgr.buf_large_pool; 823 stailq = &ch->need_buf_large; 824 } 825 826 if (STAILQ_EMPTY(stailq)) { 827 spdk_mempool_put(pool, buf); 828 } else { 829 tmp = STAILQ_FIRST(stailq); 830 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 831 _bdev_io_set_buf(tmp, buf, tmp->internal.buf_len); 832 } 833 } 834 835 static void 836 bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 837 { 838 assert(bdev_io->internal.buf != NULL); 839 _bdev_io_put_buf(bdev_io, bdev_io->internal.buf, bdev_io->internal.buf_len); 840 bdev_io->internal.buf = NULL; 841 } 842 843 void 844 spdk_bdev_io_put_aux_buf(struct spdk_bdev_io *bdev_io, void *buf) 845 { 846 uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 847 848 assert(buf != NULL); 849 _bdev_io_put_buf(bdev_io, buf, len); 850 } 851 852 static void 853 _bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io) 854 { 855 if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) { 856 assert(bdev_io->internal.orig_md_buf == NULL); 857 return; 858 } 859 860 /* if this is read path, copy data from bounce buffer to original buffer */ 861 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 862 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 863 _copy_buf_to_iovs(bdev_io->internal.orig_iovs, 864 bdev_io->internal.orig_iovcnt, 865 bdev_io->internal.bounce_iov.iov_base, 866 bdev_io->internal.bounce_iov.iov_len); 867 } 868 /* set original buffer for this io */ 869 bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt; 870 bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs; 871 /* disable bouncing buffer for this io */ 872 bdev_io->internal.orig_iovcnt = 0; 873 bdev_io->internal.orig_iovs = NULL; 874 875 /* do the same for metadata buffer */ 876 if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) { 877 assert(spdk_bdev_is_md_separate(bdev_io->bdev)); 878 879 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 880 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 881 memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf, 882 bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev)); 883 } 884 885 bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf; 886 bdev_io->internal.orig_md_buf = NULL; 887 } 888 889 /* We want to free the bounce buffer here since we know we're done with it (as opposed 890 * to waiting for the conditional free of internal.buf in spdk_bdev_free_io()). 
891 */ 892 bdev_io_put_buf(bdev_io); 893 } 894 895 static void 896 bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len) 897 { 898 struct spdk_bdev *bdev = bdev_io->bdev; 899 struct spdk_mempool *pool; 900 bdev_io_stailq_t *stailq; 901 struct spdk_bdev_mgmt_channel *mgmt_ch; 902 uint64_t alignment, md_len; 903 void *buf; 904 905 alignment = spdk_bdev_get_buf_align(bdev); 906 md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0; 907 908 if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 909 SPDK_BDEV_POOL_ALIGNMENT) { 910 SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n", 911 len + alignment); 912 bdev_io_get_buf_complete(bdev_io, NULL, false); 913 return; 914 } 915 916 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 917 918 bdev_io->internal.buf_len = len; 919 920 if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 921 SPDK_BDEV_POOL_ALIGNMENT) { 922 pool = g_bdev_mgr.buf_small_pool; 923 stailq = &mgmt_ch->need_buf_small; 924 } else { 925 pool = g_bdev_mgr.buf_large_pool; 926 stailq = &mgmt_ch->need_buf_large; 927 } 928 929 buf = spdk_mempool_get(pool); 930 if (!buf) { 931 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 932 } else { 933 _bdev_io_set_buf(bdev_io, buf, len); 934 } 935 } 936 937 void 938 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 939 { 940 struct spdk_bdev *bdev = bdev_io->bdev; 941 uint64_t alignment; 942 943 assert(cb != NULL); 944 bdev_io->internal.get_buf_cb = cb; 945 946 alignment = spdk_bdev_get_buf_align(bdev); 947 948 if (_is_buf_allocated(bdev_io->u.bdev.iovs) && 949 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 950 /* Buffer already present and aligned */ 951 cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true); 952 return; 953 } 954 955 bdev_io_get_buf(bdev_io, len); 956 } 957 958 void 959 spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb) 960 { 961 uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 962 963 assert(cb != NULL); 964 assert(bdev_io->internal.get_aux_buf_cb == NULL); 965 bdev_io->internal.get_aux_buf_cb = cb; 966 bdev_io_get_buf(bdev_io, len); 967 } 968 969 static int 970 bdev_module_get_max_ctx_size(void) 971 { 972 struct spdk_bdev_module *bdev_module; 973 int max_bdev_module_size = 0; 974 975 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 976 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 977 max_bdev_module_size = bdev_module->get_ctx_size(); 978 } 979 } 980 981 return max_bdev_module_size; 982 } 983 984 void 985 spdk_bdev_config_text(FILE *fp) 986 { 987 struct spdk_bdev_module *bdev_module; 988 989 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 990 if (bdev_module->config_text) { 991 bdev_module->config_text(fp); 992 } 993 } 994 } 995 996 static void 997 bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 998 { 999 int i; 1000 struct spdk_bdev_qos *qos = bdev->internal.qos; 1001 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 1002 1003 if (!qos) { 1004 return; 1005 } 1006 1007 spdk_bdev_get_qos_rate_limits(bdev, limits); 1008 1009 spdk_json_write_object_begin(w); 1010 spdk_json_write_named_string(w, "method", "bdev_set_qos_limit"); 1011 1012 spdk_json_write_named_object_begin(w, "params"); 1013 spdk_json_write_named_string(w, "name", 
bdev->name); 1014 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1015 if (limits[i] > 0) { 1016 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 1017 } 1018 } 1019 spdk_json_write_object_end(w); 1020 1021 spdk_json_write_object_end(w); 1022 } 1023 1024 void 1025 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 1026 { 1027 struct spdk_bdev_module *bdev_module; 1028 struct spdk_bdev *bdev; 1029 1030 assert(w != NULL); 1031 1032 spdk_json_write_array_begin(w); 1033 1034 spdk_json_write_object_begin(w); 1035 spdk_json_write_named_string(w, "method", "bdev_set_options"); 1036 spdk_json_write_named_object_begin(w, "params"); 1037 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 1038 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 1039 spdk_json_write_named_bool(w, "bdev_auto_examine", g_bdev_opts.bdev_auto_examine); 1040 spdk_json_write_object_end(w); 1041 spdk_json_write_object_end(w); 1042 1043 bdev_examine_allowlist_config_json(w); 1044 1045 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 1046 if (bdev_module->config_json) { 1047 bdev_module->config_json(w); 1048 } 1049 } 1050 1051 pthread_mutex_lock(&g_bdev_mgr.mutex); 1052 1053 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 1054 if (bdev->fn_table->write_config_json) { 1055 bdev->fn_table->write_config_json(bdev, w); 1056 } 1057 1058 bdev_qos_config_json(bdev, w); 1059 } 1060 1061 pthread_mutex_unlock(&g_bdev_mgr.mutex); 1062 1063 spdk_json_write_array_end(w); 1064 } 1065 1066 static int 1067 bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 1068 { 1069 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 1070 struct spdk_bdev_io *bdev_io; 1071 uint32_t i; 1072 1073 STAILQ_INIT(&ch->need_buf_small); 1074 STAILQ_INIT(&ch->need_buf_large); 1075 1076 STAILQ_INIT(&ch->per_thread_cache); 1077 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 1078 1079 /* Pre-populate bdev_io cache to ensure this thread cannot be starved. 
	 */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
1173 */ 1174 bdev_init_complete(0); 1175 } 1176 1177 static void 1178 bdev_module_action_done(struct spdk_bdev_module *module) 1179 { 1180 assert(module->internal.action_in_progress > 0); 1181 module->internal.action_in_progress--; 1182 bdev_module_action_complete(); 1183 } 1184 1185 void 1186 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 1187 { 1188 bdev_module_action_done(module); 1189 } 1190 1191 void 1192 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 1193 { 1194 bdev_module_action_done(module); 1195 } 1196 1197 /** The last initialized bdev module */ 1198 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 1199 1200 static void 1201 bdev_init_failed(void *cb_arg) 1202 { 1203 struct spdk_bdev_module *module = cb_arg; 1204 1205 module->internal.action_in_progress--; 1206 bdev_init_complete(-1); 1207 } 1208 1209 static int 1210 bdev_modules_init(void) 1211 { 1212 struct spdk_bdev_module *module; 1213 int rc = 0; 1214 1215 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 1216 g_resume_bdev_module = module; 1217 if (module->async_init) { 1218 module->internal.action_in_progress = 1; 1219 } 1220 rc = module->module_init(); 1221 if (rc != 0) { 1222 /* Bump action_in_progress to prevent other modules from completion of modules_init 1223 * Send message to defer application shutdown until resources are cleaned up */ 1224 module->internal.action_in_progress = 1; 1225 spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module); 1226 return rc; 1227 } 1228 } 1229 1230 g_resume_bdev_module = NULL; 1231 return 0; 1232 } 1233 1234 void 1235 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 1236 { 1237 int cache_size; 1238 int rc = 0; 1239 char mempool_name[32]; 1240 1241 assert(cb_fn != NULL); 1242 1243 g_init_cb_fn = cb_fn; 1244 g_init_cb_arg = cb_arg; 1245 1246 spdk_notify_type_register("bdev_register"); 1247 spdk_notify_type_register("bdev_unregister"); 1248 1249 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 1250 1251 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 1252 g_bdev_opts.bdev_io_pool_size, 1253 sizeof(struct spdk_bdev_io) + 1254 bdev_module_get_max_ctx_size(), 1255 0, 1256 SPDK_ENV_SOCKET_ID_ANY); 1257 1258 if (g_bdev_mgr.bdev_io_pool == NULL) { 1259 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 1260 bdev_init_complete(-1); 1261 return; 1262 } 1263 1264 /** 1265 * Ensure no more than half of the total buffers end up local caches, by 1266 * using spdk_env_get_core_count() to determine how many local caches we need 1267 * to account for. 
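	 * For example, on 8 cores the small-buffer cache is 8191 / (2 * 8) = 511 entries
	 * per core, so per-core caches hold at most 8 * 511 = 4088 of the 8191 buffers.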
1268 */ 1269 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count()); 1270 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 1271 1272 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 1273 BUF_SMALL_POOL_SIZE, 1274 SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 1275 SPDK_BDEV_POOL_ALIGNMENT, 1276 cache_size, 1277 SPDK_ENV_SOCKET_ID_ANY); 1278 if (!g_bdev_mgr.buf_small_pool) { 1279 SPDK_ERRLOG("create rbuf small pool failed\n"); 1280 bdev_init_complete(-1); 1281 return; 1282 } 1283 1284 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count()); 1285 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 1286 1287 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 1288 BUF_LARGE_POOL_SIZE, 1289 SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 1290 SPDK_BDEV_POOL_ALIGNMENT, 1291 cache_size, 1292 SPDK_ENV_SOCKET_ID_ANY); 1293 if (!g_bdev_mgr.buf_large_pool) { 1294 SPDK_ERRLOG("create rbuf large pool failed\n"); 1295 bdev_init_complete(-1); 1296 return; 1297 } 1298 1299 g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 1300 NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 1301 if (!g_bdev_mgr.zero_buffer) { 1302 SPDK_ERRLOG("create bdev zero buffer failed\n"); 1303 bdev_init_complete(-1); 1304 return; 1305 } 1306 1307 #ifdef SPDK_CONFIG_VTUNE 1308 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 1309 #endif 1310 1311 spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create, 1312 bdev_mgmt_channel_destroy, 1313 sizeof(struct spdk_bdev_mgmt_channel), 1314 "bdev_mgr"); 1315 1316 rc = bdev_modules_init(); 1317 g_bdev_mgr.module_init_complete = true; 1318 if (rc != 0) { 1319 SPDK_ERRLOG("bdev modules init failed\n"); 1320 return; 1321 } 1322 1323 bdev_module_action_complete(); 1324 } 1325 1326 static void 1327 bdev_mgr_unregister_cb(void *io_device) 1328 { 1329 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 1330 1331 if (g_bdev_mgr.bdev_io_pool) { 1332 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 1333 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 1334 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 1335 g_bdev_opts.bdev_io_pool_size); 1336 } 1337 1338 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1339 } 1340 1341 if (g_bdev_mgr.buf_small_pool) { 1342 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 1343 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 1344 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 1345 BUF_SMALL_POOL_SIZE); 1346 assert(false); 1347 } 1348 1349 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1350 } 1351 1352 if (g_bdev_mgr.buf_large_pool) { 1353 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 1354 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 1355 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 1356 BUF_LARGE_POOL_SIZE); 1357 assert(false); 1358 } 1359 1360 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1361 } 1362 1363 spdk_free(g_bdev_mgr.zero_buffer); 1364 1365 bdev_examine_allowlist_free(); 1366 1367 cb_fn(g_fini_cb_arg); 1368 g_fini_cb_fn = NULL; 1369 g_fini_cb_arg = NULL; 1370 g_bdev_mgr.init_complete = false; 1371 g_bdev_mgr.module_init_complete = false; 1372 pthread_mutex_destroy(&g_bdev_mgr.mutex); 1373 } 1374 1375 static void 1376 bdev_module_finish_iter(void *arg) 1377 { 1378 struct spdk_bdev_module *bdev_module; 1379 1380 /* FIXME: Handling initialization failures is broken now, 1381 * so we won't 
even try cleaning up after successfully 1382 * initialized modules. if module_init_complete is false, 1383 * just call spdk_bdev_mgr_unregister_cb 1384 */ 1385 if (!g_bdev_mgr.module_init_complete) { 1386 bdev_mgr_unregister_cb(NULL); 1387 return; 1388 } 1389 1390 /* Start iterating from the last touched module */ 1391 if (!g_resume_bdev_module) { 1392 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 1393 } else { 1394 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 1395 internal.tailq); 1396 } 1397 1398 while (bdev_module) { 1399 if (bdev_module->async_fini) { 1400 /* Save our place so we can resume later. We must 1401 * save the variable here, before calling module_fini() 1402 * below, because in some cases the module may immediately 1403 * call spdk_bdev_module_finish_done() and re-enter 1404 * this function to continue iterating. */ 1405 g_resume_bdev_module = bdev_module; 1406 } 1407 1408 if (bdev_module->module_fini) { 1409 bdev_module->module_fini(); 1410 } 1411 1412 if (bdev_module->async_fini) { 1413 return; 1414 } 1415 1416 bdev_module = TAILQ_PREV(bdev_module, bdev_module_list, 1417 internal.tailq); 1418 } 1419 1420 g_resume_bdev_module = NULL; 1421 spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb); 1422 } 1423 1424 void 1425 spdk_bdev_module_finish_done(void) 1426 { 1427 if (spdk_get_thread() != g_fini_thread) { 1428 spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL); 1429 } else { 1430 bdev_module_finish_iter(NULL); 1431 } 1432 } 1433 1434 static void 1435 bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno) 1436 { 1437 struct spdk_bdev *bdev = cb_arg; 1438 1439 if (bdeverrno && bdev) { 1440 SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n", 1441 bdev->name); 1442 1443 /* 1444 * Since the call to spdk_bdev_unregister() failed, we have no way to free this 1445 * bdev; try to continue by manually removing this bdev from the list and continue 1446 * with the next bdev in the list. 1447 */ 1448 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 1449 } 1450 1451 if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) { 1452 SPDK_DEBUGLOG(bdev, "Done unregistering bdevs\n"); 1453 /* 1454 * Bdev module finish need to be deferred as we might be in the middle of some context 1455 * (like bdev part free) that will use this bdev (or private bdev driver ctx data) 1456 * after returning. 1457 */ 1458 spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL); 1459 return; 1460 } 1461 1462 /* 1463 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem 1464 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity 1465 * to detect clean shutdown as opposed to run-time hot removal of the underlying 1466 * base bdevs. 1467 * 1468 * Also, walk the list in the reverse order. 
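	 * Starting from TAILQ_LAST and walking with TAILQ_PREV unregisters the most
	 * recently registered (typically the top-most virtual) bdevs first.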
1469 */ 1470 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1471 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1472 if (bdev->internal.claim_module != NULL) { 1473 SPDK_DEBUGLOG(bdev, "Skipping claimed bdev '%s'(<-'%s').\n", 1474 bdev->name, bdev->internal.claim_module->name); 1475 continue; 1476 } 1477 1478 SPDK_DEBUGLOG(bdev, "Unregistering bdev '%s'\n", bdev->name); 1479 spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev); 1480 return; 1481 } 1482 1483 /* 1484 * If any bdev fails to unclaim underlying bdev properly, we may face the 1485 * case of bdev list consisting of claimed bdevs only (if claims are managed 1486 * correctly, this would mean there's a loop in the claims graph which is 1487 * clearly impossible). Warn and unregister last bdev on the list then. 1488 */ 1489 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1490 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1491 SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1492 spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev); 1493 return; 1494 } 1495 } 1496 1497 void 1498 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1499 { 1500 struct spdk_bdev_module *m; 1501 1502 assert(cb_fn != NULL); 1503 1504 g_fini_thread = spdk_get_thread(); 1505 1506 g_fini_cb_fn = cb_fn; 1507 g_fini_cb_arg = cb_arg; 1508 1509 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1510 if (m->fini_start) { 1511 m->fini_start(); 1512 } 1513 } 1514 1515 bdev_finish_unregister_bdevs_iter(NULL, 0); 1516 } 1517 1518 struct spdk_bdev_io * 1519 bdev_channel_get_io(struct spdk_bdev_channel *channel) 1520 { 1521 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1522 struct spdk_bdev_io *bdev_io; 1523 1524 if (ch->per_thread_cache_count > 0) { 1525 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1526 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1527 ch->per_thread_cache_count--; 1528 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1529 /* 1530 * Don't try to look for bdev_ios in the global pool if there are 1531 * waiters on bdev_ios - we don't want this caller to jump the line. 1532 */ 1533 bdev_io = NULL; 1534 } else { 1535 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1536 } 1537 1538 return bdev_io; 1539 } 1540 1541 void 1542 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1543 { 1544 struct spdk_bdev_mgmt_channel *ch; 1545 1546 assert(bdev_io != NULL); 1547 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1548 1549 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1550 1551 if (bdev_io->internal.buf != NULL) { 1552 bdev_io_put_buf(bdev_io); 1553 } 1554 1555 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1556 ch->per_thread_cache_count++; 1557 STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link); 1558 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1559 struct spdk_bdev_io_wait_entry *entry; 1560 1561 entry = TAILQ_FIRST(&ch->io_wait_queue); 1562 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1563 entry->cb_fn(entry->cb_arg); 1564 } 1565 } else { 1566 /* We should never have a full cache with entries on the io wait queue. 
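		 * Waiters are serviced above as soon as a bdev_io is returned to a non-full
		 * cache, so a full cache implies an empty wait queue.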
*/ 1567 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1568 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1569 } 1570 } 1571 1572 static bool 1573 bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1574 { 1575 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1576 1577 switch (limit) { 1578 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1579 return true; 1580 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1581 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1582 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1583 return false; 1584 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1585 default: 1586 return false; 1587 } 1588 } 1589 1590 static bool 1591 bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1592 { 1593 switch (bdev_io->type) { 1594 case SPDK_BDEV_IO_TYPE_NVME_IO: 1595 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1596 case SPDK_BDEV_IO_TYPE_READ: 1597 case SPDK_BDEV_IO_TYPE_WRITE: 1598 return true; 1599 case SPDK_BDEV_IO_TYPE_ZCOPY: 1600 if (bdev_io->u.bdev.zcopy.start) { 1601 return true; 1602 } else { 1603 return false; 1604 } 1605 default: 1606 return false; 1607 } 1608 } 1609 1610 static bool 1611 bdev_is_read_io(struct spdk_bdev_io *bdev_io) 1612 { 1613 switch (bdev_io->type) { 1614 case SPDK_BDEV_IO_TYPE_NVME_IO: 1615 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1616 /* Bit 1 (0x2) set for read operation */ 1617 if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) { 1618 return true; 1619 } else { 1620 return false; 1621 } 1622 case SPDK_BDEV_IO_TYPE_READ: 1623 return true; 1624 case SPDK_BDEV_IO_TYPE_ZCOPY: 1625 /* Populate to read from disk */ 1626 if (bdev_io->u.bdev.zcopy.populate) { 1627 return true; 1628 } else { 1629 return false; 1630 } 1631 default: 1632 return false; 1633 } 1634 } 1635 1636 static uint64_t 1637 bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1638 { 1639 struct spdk_bdev *bdev = bdev_io->bdev; 1640 1641 switch (bdev_io->type) { 1642 case SPDK_BDEV_IO_TYPE_NVME_IO: 1643 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1644 return bdev_io->u.nvme_passthru.nbytes; 1645 case SPDK_BDEV_IO_TYPE_READ: 1646 case SPDK_BDEV_IO_TYPE_WRITE: 1647 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1648 case SPDK_BDEV_IO_TYPE_ZCOPY: 1649 /* Track the data in the start phase only */ 1650 if (bdev_io->u.bdev.zcopy.start) { 1651 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1652 } else { 1653 return 0; 1654 } 1655 default: 1656 return 0; 1657 } 1658 } 1659 1660 static bool 1661 bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1662 { 1663 if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) { 1664 return true; 1665 } else { 1666 return false; 1667 } 1668 } 1669 1670 static bool 1671 bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1672 { 1673 if (bdev_is_read_io(io) == false) { 1674 return false; 1675 } 1676 1677 return bdev_qos_rw_queue_io(limit, io); 1678 } 1679 1680 static bool 1681 bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1682 { 1683 if (bdev_is_read_io(io) == true) { 1684 return false; 1685 } 1686 1687 return bdev_qos_rw_queue_io(limit, io); 1688 } 1689 1690 static void 1691 bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1692 { 1693 limit->remaining_this_timeslice--; 1694 } 1695 1696 static void 1697 bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1698 { 1699 limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io); 1700 } 1701 1702 static void 1703 
bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1704 { 1705 if (bdev_is_read_io(io) == false) { 1706 return; 1707 } 1708 1709 return bdev_qos_rw_bps_update_quota(limit, io); 1710 } 1711 1712 static void 1713 bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1714 { 1715 if (bdev_is_read_io(io) == true) { 1716 return; 1717 } 1718 1719 return bdev_qos_rw_bps_update_quota(limit, io); 1720 } 1721 1722 static void 1723 bdev_qos_set_ops(struct spdk_bdev_qos *qos) 1724 { 1725 int i; 1726 1727 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1728 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1729 qos->rate_limits[i].queue_io = NULL; 1730 qos->rate_limits[i].update_quota = NULL; 1731 continue; 1732 } 1733 1734 switch (i) { 1735 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1736 qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io; 1737 qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota; 1738 break; 1739 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1740 qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io; 1741 qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota; 1742 break; 1743 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1744 qos->rate_limits[i].queue_io = bdev_qos_r_queue_io; 1745 qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota; 1746 break; 1747 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1748 qos->rate_limits[i].queue_io = bdev_qos_w_queue_io; 1749 qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota; 1750 break; 1751 default: 1752 break; 1753 } 1754 } 1755 } 1756 1757 static void 1758 _bdev_io_complete_in_submit(struct spdk_bdev_channel *bdev_ch, 1759 struct spdk_bdev_io *bdev_io, 1760 enum spdk_bdev_io_status status) 1761 { 1762 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1763 1764 bdev_io->internal.in_submit_request = true; 1765 bdev_ch->io_outstanding++; 1766 shared_resource->io_outstanding++; 1767 spdk_bdev_io_complete(bdev_io, status); 1768 bdev_io->internal.in_submit_request = false; 1769 } 1770 1771 static inline void 1772 bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io) 1773 { 1774 struct spdk_bdev *bdev = bdev_io->bdev; 1775 struct spdk_io_channel *ch = bdev_ch->channel; 1776 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1777 1778 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT)) { 1779 struct spdk_bdev_mgmt_channel *mgmt_channel = shared_resource->mgmt_ch; 1780 struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort; 1781 1782 if (bdev_abort_queued_io(&shared_resource->nomem_io, bio_to_abort) || 1783 bdev_abort_buf_io(&mgmt_channel->need_buf_small, bio_to_abort) || 1784 bdev_abort_buf_io(&mgmt_channel->need_buf_large, bio_to_abort)) { 1785 _bdev_io_complete_in_submit(bdev_ch, bdev_io, 1786 SPDK_BDEV_IO_STATUS_SUCCESS); 1787 return; 1788 } 1789 } 1790 1791 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1792 bdev_ch->io_outstanding++; 1793 shared_resource->io_outstanding++; 1794 bdev_io->internal.in_submit_request = true; 1795 bdev->fn_table->submit_request(ch, bdev_io); 1796 bdev_io->internal.in_submit_request = false; 1797 } else { 1798 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1799 } 1800 } 1801 1802 static int 1803 bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1804 { 1805 struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL; 1806 int i, submitted_ios = 0; 1807 1808 
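	/* Submit queued I/O in FIFO order. Stop at the first I/O that any active rate
	 * limit cannot admit in the current timeslice; otherwise charge every limit and
	 * pass the I/O down to the underlying channel.
	 */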
TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) { 1809 if (bdev_qos_io_to_limit(bdev_io) == true) { 1810 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1811 if (!qos->rate_limits[i].queue_io) { 1812 continue; 1813 } 1814 1815 if (qos->rate_limits[i].queue_io(&qos->rate_limits[i], 1816 bdev_io) == true) { 1817 return submitted_ios; 1818 } 1819 } 1820 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1821 if (!qos->rate_limits[i].update_quota) { 1822 continue; 1823 } 1824 1825 qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io); 1826 } 1827 } 1828 1829 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1830 bdev_io_do_submit(ch, bdev_io); 1831 submitted_ios++; 1832 } 1833 1834 return submitted_ios; 1835 } 1836 1837 static void 1838 bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1839 { 1840 int rc; 1841 1842 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1843 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1844 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1845 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1846 &bdev_io->internal.waitq_entry); 1847 if (rc != 0) { 1848 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1849 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1850 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1851 } 1852 } 1853 1854 static bool 1855 bdev_io_type_can_split(uint8_t type) 1856 { 1857 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1858 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1859 1860 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1861 * UNMAP could be split, but these types of I/O are typically much larger 1862 * in size (sometimes the size of the entire block device), and the bdev 1863 * module can more efficiently split these types of I/O. Plus those types 1864 * of I/O do not have a payload, which makes the splitting process simpler. 1865 */ 1866 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1867 return true; 1868 } else { 1869 return false; 1870 } 1871 } 1872 1873 static bool 1874 bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1875 { 1876 uint64_t start_stripe, end_stripe; 1877 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1878 1879 if (io_boundary == 0) { 1880 return false; 1881 } 1882 1883 if (!bdev_io_type_can_split(bdev_io->type)) { 1884 return false; 1885 } 1886 1887 start_stripe = bdev_io->u.bdev.offset_blocks; 1888 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1889 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
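	 * For example, with an optimal_io_boundary of 128 blocks the stripe index is
	 * computed as offset >> spdk_u32log2(128), i.e. offset >> 7, instead of offset / 128.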
*/ 1890 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1891 start_stripe >>= spdk_u32log2(io_boundary); 1892 end_stripe >>= spdk_u32log2(io_boundary); 1893 } else { 1894 start_stripe /= io_boundary; 1895 end_stripe /= io_boundary; 1896 } 1897 return (start_stripe != end_stripe); 1898 } 1899 1900 static uint32_t 1901 _to_next_boundary(uint64_t offset, uint32_t boundary) 1902 { 1903 return (boundary - (offset % boundary)); 1904 } 1905 1906 static void 1907 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1908 1909 static void 1910 _bdev_io_split(void *_bdev_io) 1911 { 1912 struct spdk_bdev_io *bdev_io = _bdev_io; 1913 uint64_t current_offset, remaining; 1914 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes; 1915 struct iovec *parent_iov, *iov; 1916 uint64_t parent_iov_offset, iov_len; 1917 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1918 void *md_buf = NULL; 1919 int rc; 1920 1921 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1922 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1923 blocklen = bdev_io->bdev->blocklen; 1924 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1925 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1926 1927 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1928 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1929 if (parent_iov_offset < parent_iov->iov_len) { 1930 break; 1931 } 1932 parent_iov_offset -= parent_iov->iov_len; 1933 } 1934 1935 child_iovcnt = 0; 1936 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1937 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1938 to_next_boundary = spdk_min(remaining, to_next_boundary); 1939 to_next_boundary_bytes = to_next_boundary * blocklen; 1940 iov = &bdev_io->child_iov[child_iovcnt]; 1941 iovcnt = 0; 1942 1943 if (bdev_io->u.bdev.md_buf) { 1944 assert((parent_iov_offset % blocklen) > 0); 1945 md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) * 1946 spdk_bdev_get_md_size(bdev_io->bdev); 1947 } 1948 1949 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1950 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1951 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1952 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1953 to_next_boundary_bytes -= iov_len; 1954 1955 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1956 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1957 1958 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1959 parent_iov_offset += iov_len; 1960 } else { 1961 parent_iovpos++; 1962 parent_iov_offset = 0; 1963 } 1964 child_iovcnt++; 1965 iovcnt++; 1966 } 1967 1968 if (to_next_boundary_bytes > 0) { 1969 /* We had to stop this child I/O early because we ran out of 1970 * child_iov space. Ensure the iovs to be aligned with block 1971 * size and then adjust to_next_boundary before starting the 1972 * child I/O. 
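			 * Trimming blocklen - (to_next_boundary_bytes % blocklen) bytes from the
			 * tail iovs leaves the shortened child transfer a whole number of blocks.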
1973 */ 1974 assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV); 1975 to_last_block_bytes = to_next_boundary_bytes % blocklen; 1976 if (to_last_block_bytes != 0) { 1977 uint32_t child_iovpos = child_iovcnt - 1; 1978 /* don't decrease child_iovcnt so the loop will naturally end */ 1979 1980 to_last_block_bytes = blocklen - to_last_block_bytes; 1981 to_next_boundary_bytes += to_last_block_bytes; 1982 while (to_last_block_bytes > 0 && iovcnt > 0) { 1983 iov_len = spdk_min(to_last_block_bytes, 1984 bdev_io->child_iov[child_iovpos].iov_len); 1985 bdev_io->child_iov[child_iovpos].iov_len -= iov_len; 1986 if (bdev_io->child_iov[child_iovpos].iov_len == 0) { 1987 child_iovpos--; 1988 if (--iovcnt == 0) { 1989 return; 1990 } 1991 } 1992 to_last_block_bytes -= iov_len; 1993 } 1994 1995 assert(to_last_block_bytes == 0); 1996 } 1997 to_next_boundary -= to_next_boundary_bytes / blocklen; 1998 } 1999 2000 bdev_io->u.bdev.split_outstanding++; 2001 2002 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 2003 rc = bdev_readv_blocks_with_md(bdev_io->internal.desc, 2004 spdk_io_channel_from_ctx(bdev_io->internal.ch), 2005 iov, iovcnt, md_buf, current_offset, 2006 to_next_boundary, 2007 bdev_io_split_done, bdev_io); 2008 } else { 2009 rc = bdev_writev_blocks_with_md(bdev_io->internal.desc, 2010 spdk_io_channel_from_ctx(bdev_io->internal.ch), 2011 iov, iovcnt, md_buf, current_offset, 2012 to_next_boundary, 2013 bdev_io_split_done, bdev_io); 2014 } 2015 2016 if (rc == 0) { 2017 current_offset += to_next_boundary; 2018 remaining -= to_next_boundary; 2019 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 2020 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 2021 } else { 2022 bdev_io->u.bdev.split_outstanding--; 2023 if (rc == -ENOMEM) { 2024 if (bdev_io->u.bdev.split_outstanding == 0) { 2025 /* No I/O is outstanding. Hence we should wait here. */ 2026 bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split); 2027 } 2028 } else { 2029 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2030 if (bdev_io->u.bdev.split_outstanding == 0) { 2031 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 2032 (uintptr_t)bdev_io, 0); 2033 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 2034 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 2035 } 2036 } 2037 2038 return; 2039 } 2040 } 2041 } 2042 2043 static void 2044 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 2045 { 2046 struct spdk_bdev_io *parent_io = cb_arg; 2047 2048 spdk_bdev_free_io(bdev_io); 2049 2050 if (!success) { 2051 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2052 /* If any child I/O failed, stop further splitting process. */ 2053 parent_io->u.bdev.split_current_offset_blocks += parent_io->u.bdev.split_remaining_num_blocks; 2054 parent_io->u.bdev.split_remaining_num_blocks = 0; 2055 } 2056 parent_io->u.bdev.split_outstanding--; 2057 if (parent_io->u.bdev.split_outstanding != 0) { 2058 return; 2059 } 2060 2061 /* 2062 * Parent I/O finishes when all blocks are consumed. 
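* A failed child takes the same path: the error handling above zeroes
* split_remaining_num_blocks, so once the children already in flight drain,
* the parent completes below with internal.status already set to FAILED.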
2063 */ 2064 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 2065 assert(parent_io->internal.cb != bdev_io_split_done); 2066 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 2067 (uintptr_t)parent_io, 0); 2068 TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link); 2069 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2070 parent_io->internal.caller_ctx); 2071 return; 2072 } 2073 2074 /* 2075 * Continue with the splitting process. This function will complete the parent I/O if the 2076 * splitting is done. 2077 */ 2078 _bdev_io_split(parent_io); 2079 } 2080 2081 static void 2082 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success); 2083 2084 static void 2085 bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 2086 { 2087 assert(bdev_io_type_can_split(bdev_io->type)); 2088 2089 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 2090 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 2091 bdev_io->u.bdev.split_outstanding = 0; 2092 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2093 2094 if (_is_buf_allocated(bdev_io->u.bdev.iovs)) { 2095 _bdev_io_split(bdev_io); 2096 } else { 2097 assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ); 2098 spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb, 2099 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 2100 } 2101 } 2102 2103 static void 2104 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 2105 { 2106 if (!success) { 2107 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2108 return; 2109 } 2110 2111 _bdev_io_split(bdev_io); 2112 } 2113 2114 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't 2115 * be inlined, at least on some compilers. 
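* (On the QoS path in bdev_io_submit() this function is also dispatched to the
* QoS thread via spdk_thread_send_msg(), which is why it keeps the generic
* void *ctx signature.)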
2116 */ 2117 static inline void 2118 _bdev_io_submit(void *ctx) 2119 { 2120 struct spdk_bdev_io *bdev_io = ctx; 2121 struct spdk_bdev *bdev = bdev_io->bdev; 2122 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2123 uint64_t tsc; 2124 2125 tsc = spdk_get_ticks(); 2126 bdev_io->internal.submit_tsc = tsc; 2127 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 2128 2129 if (spdk_likely(bdev_ch->flags == 0)) { 2130 bdev_io_do_submit(bdev_ch, bdev_io); 2131 return; 2132 } 2133 2134 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 2135 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2136 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 2137 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) && 2138 bdev_abort_queued_io(&bdev->internal.qos->queued, bdev_io->u.abort.bio_to_abort)) { 2139 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 2140 } else { 2141 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 2142 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 2143 } 2144 } else { 2145 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 2146 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2147 } 2148 } 2149 2150 bool 2151 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2); 2152 2153 bool 2154 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2) 2155 { 2156 if (range1->length == 0 || range2->length == 0) { 2157 return false; 2158 } 2159 2160 if (range1->offset + range1->length <= range2->offset) { 2161 return false; 2162 } 2163 2164 if (range2->offset + range2->length <= range1->offset) { 2165 return false; 2166 } 2167 2168 return true; 2169 } 2170 2171 static bool 2172 bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range) 2173 { 2174 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2175 struct lba_range r; 2176 2177 switch (bdev_io->type) { 2178 case SPDK_BDEV_IO_TYPE_NVME_IO: 2179 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 2180 /* Don't try to decode the NVMe command - just assume worst-case and that 2181 * it overlaps a locked range. 2182 */ 2183 return true; 2184 case SPDK_BDEV_IO_TYPE_WRITE: 2185 case SPDK_BDEV_IO_TYPE_UNMAP: 2186 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2187 case SPDK_BDEV_IO_TYPE_ZCOPY: 2188 r.offset = bdev_io->u.bdev.offset_blocks; 2189 r.length = bdev_io->u.bdev.num_blocks; 2190 if (!bdev_lba_range_overlapped(range, &r)) { 2191 /* This I/O doesn't overlap the specified LBA range. */ 2192 return false; 2193 } else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) { 2194 /* This I/O overlaps, but the I/O is on the same channel that locked this 2195 * range, and the caller_ctx is the same as the locked_ctx. This means 2196 * that this I/O is associated with the lock, and is allowed to execute. 
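* The compare-and-write emulation later in this file relies on this: it locks the
* target range and then issues its internal read, compare, and write from the
* locking channel and context, so those I/O pass this check, while overlapping
* writes from other channels are parked on io_locked until the range is unlocked.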
2197 */ 2198 return false; 2199 } else { 2200 return true; 2201 } 2202 default: 2203 return false; 2204 } 2205 } 2206 2207 void 2208 bdev_io_submit(struct spdk_bdev_io *bdev_io) 2209 { 2210 struct spdk_bdev *bdev = bdev_io->bdev; 2211 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 2212 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2213 2214 assert(thread != NULL); 2215 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2216 2217 if (!TAILQ_EMPTY(&ch->locked_ranges)) { 2218 struct lba_range *range; 2219 2220 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 2221 if (bdev_io_range_is_locked(bdev_io, range)) { 2222 TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link); 2223 return; 2224 } 2225 } 2226 } 2227 2228 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 2229 2230 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bdev_io)) { 2231 bdev_io->internal.submit_tsc = spdk_get_ticks(); 2232 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 2233 (uintptr_t)bdev_io, bdev_io->type); 2234 bdev_io_split(NULL, bdev_io); 2235 return; 2236 } 2237 2238 if (ch->flags & BDEV_CH_QOS_ENABLED) { 2239 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 2240 _bdev_io_submit(bdev_io); 2241 } else { 2242 bdev_io->internal.io_submit_ch = ch; 2243 bdev_io->internal.ch = bdev->internal.qos->ch; 2244 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 2245 } 2246 } else { 2247 _bdev_io_submit(bdev_io); 2248 } 2249 } 2250 2251 static void 2252 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 2253 { 2254 struct spdk_bdev *bdev = bdev_io->bdev; 2255 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2256 struct spdk_io_channel *ch = bdev_ch->channel; 2257 2258 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2259 2260 bdev_io->internal.in_submit_request = true; 2261 bdev->fn_table->submit_request(ch, bdev_io); 2262 bdev_io->internal.in_submit_request = false; 2263 } 2264 2265 void 2266 bdev_io_init(struct spdk_bdev_io *bdev_io, 2267 struct spdk_bdev *bdev, void *cb_arg, 2268 spdk_bdev_io_completion_cb cb) 2269 { 2270 bdev_io->bdev = bdev; 2271 bdev_io->internal.caller_ctx = cb_arg; 2272 bdev_io->internal.cb = cb; 2273 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2274 bdev_io->internal.in_submit_request = false; 2275 bdev_io->internal.buf = NULL; 2276 bdev_io->internal.io_submit_ch = NULL; 2277 bdev_io->internal.orig_iovs = NULL; 2278 bdev_io->internal.orig_iovcnt = 0; 2279 bdev_io->internal.orig_md_buf = NULL; 2280 bdev_io->internal.error.nvme.cdw0 = 0; 2281 bdev_io->num_retries = 0; 2282 bdev_io->internal.get_buf_cb = NULL; 2283 bdev_io->internal.get_aux_buf_cb = NULL; 2284 } 2285 2286 static bool 2287 bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2288 { 2289 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 2290 } 2291 2292 bool 2293 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2294 { 2295 bool supported; 2296 2297 supported = bdev_io_type_supported(bdev, io_type); 2298 2299 if (!supported) { 2300 switch (io_type) { 2301 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2302 /* The bdev layer will emulate write zeroes as long as write is supported. 
*/ 2303 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2304 break; 2305 case SPDK_BDEV_IO_TYPE_ZCOPY: 2306 /* Zero copy can be emulated with regular read and write */ 2307 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 2308 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2309 break; 2310 default: 2311 break; 2312 } 2313 } 2314 2315 return supported; 2316 } 2317 2318 int 2319 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 2320 { 2321 if (bdev->fn_table->dump_info_json) { 2322 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 2323 } 2324 2325 return 0; 2326 } 2327 2328 static void 2329 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2330 { 2331 uint32_t max_per_timeslice = 0; 2332 int i; 2333 2334 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2335 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2336 qos->rate_limits[i].max_per_timeslice = 0; 2337 continue; 2338 } 2339 2340 max_per_timeslice = qos->rate_limits[i].limit * 2341 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2342 2343 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2344 qos->rate_limits[i].min_per_timeslice); 2345 2346 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2347 } 2348 2349 bdev_qos_set_ops(qos); 2350 } 2351 2352 static int 2353 bdev_channel_poll_qos(void *arg) 2354 { 2355 struct spdk_bdev_qos *qos = arg; 2356 uint64_t now = spdk_get_ticks(); 2357 int i; 2358 2359 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2360 /* We received our callback earlier than expected - return 2361 * immediately and wait to do accounting until at least one 2362 * timeslice has actually expired. This should never happen 2363 * with a well-behaved timer implementation. 2364 */ 2365 return SPDK_POLLER_IDLE; 2366 } 2367 2368 /* Reset for next round of rate limiting */ 2369 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2370 /* We may have allowed the IOs or bytes to slightly overrun in the last 2371 * timeslice. remaining_this_timeslice is signed, so if it's negative 2372 * here, we'll account for the overrun so that the next timeslice will 2373 * be appropriately reduced. 
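* For illustration (hypothetical numbers): with a byte limit of 8,000,000 bytes/sec
* and a 1 ms timeslice, max_per_timeslice works out to 8000 bytes. A 65536-byte
* write submitted with 8000 bytes remaining drives the balance to -57536, and
* roughly eight subsequent timeslices of +8000 are needed before the balance turns
* positive and queued I/O is allowed through again.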
2374 */ 2375 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2376 qos->rate_limits[i].remaining_this_timeslice = 0; 2377 } 2378 } 2379 2380 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2381 qos->last_timeslice += qos->timeslice_size; 2382 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2383 qos->rate_limits[i].remaining_this_timeslice += 2384 qos->rate_limits[i].max_per_timeslice; 2385 } 2386 } 2387 2388 return bdev_qos_io_submit(qos->ch, qos); 2389 } 2390 2391 static void 2392 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2393 { 2394 struct spdk_bdev_shared_resource *shared_resource; 2395 struct lba_range *range; 2396 2397 while (!TAILQ_EMPTY(&ch->locked_ranges)) { 2398 range = TAILQ_FIRST(&ch->locked_ranges); 2399 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 2400 free(range); 2401 } 2402 2403 spdk_put_io_channel(ch->channel); 2404 2405 shared_resource = ch->shared_resource; 2406 2407 assert(TAILQ_EMPTY(&ch->io_locked)); 2408 assert(TAILQ_EMPTY(&ch->io_submitted)); 2409 assert(ch->io_outstanding == 0); 2410 assert(shared_resource->ref > 0); 2411 shared_resource->ref--; 2412 if (shared_resource->ref == 0) { 2413 assert(shared_resource->io_outstanding == 0); 2414 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2415 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2416 free(shared_resource); 2417 } 2418 } 2419 2420 /* Caller must hold bdev->internal.mutex. */ 2421 static void 2422 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2423 { 2424 struct spdk_bdev_qos *qos = bdev->internal.qos; 2425 int i; 2426 2427 /* Rate limiting on this bdev enabled */ 2428 if (qos) { 2429 if (qos->ch == NULL) { 2430 struct spdk_io_channel *io_ch; 2431 2432 SPDK_DEBUGLOG(bdev, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2433 bdev->name, spdk_get_thread()); 2434 2435 /* No qos channel has been selected, so set one up */ 2436 2437 /* Take another reference to ch */ 2438 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2439 assert(io_ch != NULL); 2440 qos->ch = ch; 2441 2442 qos->thread = spdk_io_channel_get_thread(io_ch); 2443 2444 TAILQ_INIT(&qos->queued); 2445 2446 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2447 if (bdev_qos_is_iops_rate_limit(i) == true) { 2448 qos->rate_limits[i].min_per_timeslice = 2449 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2450 } else { 2451 qos->rate_limits[i].min_per_timeslice = 2452 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2453 } 2454 2455 if (qos->rate_limits[i].limit == 0) { 2456 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2457 } 2458 } 2459 bdev_qos_update_max_quota_per_timeslice(qos); 2460 qos->timeslice_size = 2461 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2462 qos->last_timeslice = spdk_get_ticks(); 2463 qos->poller = SPDK_POLLER_REGISTER(bdev_channel_poll_qos, 2464 qos, 2465 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2466 } 2467 2468 ch->flags |= BDEV_CH_QOS_ENABLED; 2469 } 2470 } 2471 2472 struct poll_timeout_ctx { 2473 struct spdk_bdev_desc *desc; 2474 uint64_t timeout_in_sec; 2475 spdk_bdev_io_timeout_cb cb_fn; 2476 void *cb_arg; 2477 }; 2478 2479 static void 2480 bdev_desc_free(struct spdk_bdev_desc *desc) 2481 { 2482 pthread_mutex_destroy(&desc->mutex); 2483 free(desc->media_events_buffer); 2484 free(desc); 2485 } 2486 2487 static void 2488 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2489 { 2490 struct poll_timeout_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 2491 struct spdk_bdev_desc *desc = ctx->desc; 2492 2493 free(ctx); 2494 2495 pthread_mutex_lock(&desc->mutex); 2496 desc->refs--; 2497 if (desc->closed == true && desc->refs == 0) { 2498 pthread_mutex_unlock(&desc->mutex); 2499 bdev_desc_free(desc); 2500 return; 2501 } 2502 pthread_mutex_unlock(&desc->mutex); 2503 } 2504 2505 static void 2506 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2507 { 2508 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2509 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2510 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2511 struct spdk_bdev_desc *desc = ctx->desc; 2512 struct spdk_bdev_io *bdev_io; 2513 uint64_t now; 2514 2515 pthread_mutex_lock(&desc->mutex); 2516 if (desc->closed == true) { 2517 pthread_mutex_unlock(&desc->mutex); 2518 spdk_for_each_channel_continue(i, -1); 2519 return; 2520 } 2521 pthread_mutex_unlock(&desc->mutex); 2522 2523 now = spdk_get_ticks(); 2524 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2525 /* Exclude any I/O that are generated via splitting. */ 2526 if (bdev_io->internal.cb == bdev_io_split_done) { 2527 continue; 2528 } 2529 2530 /* Once we find an I/O that has not timed out, we can immediately 2531 * exit the loop. 2532 */ 2533 if (now < (bdev_io->internal.submit_tsc + 2534 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2535 goto end; 2536 } 2537 2538 if (bdev_io->internal.desc == desc) { 2539 ctx->cb_fn(ctx->cb_arg, bdev_io); 2540 } 2541 } 2542 2543 end: 2544 spdk_for_each_channel_continue(i, 0); 2545 } 2546 2547 static int 2548 bdev_poll_timeout_io(void *arg) 2549 { 2550 struct spdk_bdev_desc *desc = arg; 2551 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2552 struct poll_timeout_ctx *ctx; 2553 2554 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2555 if (!ctx) { 2556 SPDK_ERRLOG("failed to allocate memory\n"); 2557 return SPDK_POLLER_BUSY; 2558 } 2559 ctx->desc = desc; 2560 ctx->cb_arg = desc->cb_arg; 2561 ctx->cb_fn = desc->cb_fn; 2562 ctx->timeout_in_sec = desc->timeout_in_sec; 2563 2564 /* Take a ref on the descriptor in case it gets closed while we are checking 2565 * all of the channels. 
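* The matching deref happens in bdev_channel_poll_timeout_io_done() above, which
* also frees the descriptor if it was closed while the iteration was in flight.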
2566 */ 2567 pthread_mutex_lock(&desc->mutex); 2568 desc->refs++; 2569 pthread_mutex_unlock(&desc->mutex); 2570 2571 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2572 bdev_channel_poll_timeout_io, 2573 ctx, 2574 bdev_channel_poll_timeout_io_done); 2575 2576 return SPDK_POLLER_BUSY; 2577 } 2578 2579 int 2580 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2581 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2582 { 2583 assert(desc->thread == spdk_get_thread()); 2584 2585 spdk_poller_unregister(&desc->io_timeout_poller); 2586 2587 if (timeout_in_sec) { 2588 assert(cb_fn != NULL); 2589 desc->io_timeout_poller = SPDK_POLLER_REGISTER(bdev_poll_timeout_io, 2590 desc, 2591 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2592 1000); 2593 if (desc->io_timeout_poller == NULL) { 2594 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2595 return -1; 2596 } 2597 } 2598 2599 desc->cb_fn = cb_fn; 2600 desc->cb_arg = cb_arg; 2601 desc->timeout_in_sec = timeout_in_sec; 2602 2603 return 0; 2604 } 2605 2606 static int 2607 bdev_channel_create(void *io_device, void *ctx_buf) 2608 { 2609 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2610 struct spdk_bdev_channel *ch = ctx_buf; 2611 struct spdk_io_channel *mgmt_io_ch; 2612 struct spdk_bdev_mgmt_channel *mgmt_ch; 2613 struct spdk_bdev_shared_resource *shared_resource; 2614 struct lba_range *range; 2615 2616 ch->bdev = bdev; 2617 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2618 if (!ch->channel) { 2619 return -1; 2620 } 2621 2622 assert(ch->histogram == NULL); 2623 if (bdev->internal.histogram_enabled) { 2624 ch->histogram = spdk_histogram_data_alloc(); 2625 if (ch->histogram == NULL) { 2626 SPDK_ERRLOG("Could not allocate histogram\n"); 2627 } 2628 } 2629 2630 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2631 if (!mgmt_io_ch) { 2632 spdk_put_io_channel(ch->channel); 2633 return -1; 2634 } 2635 2636 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2637 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2638 if (shared_resource->shared_ch == ch->channel) { 2639 spdk_put_io_channel(mgmt_io_ch); 2640 shared_resource->ref++; 2641 break; 2642 } 2643 } 2644 2645 if (shared_resource == NULL) { 2646 shared_resource = calloc(1, sizeof(*shared_resource)); 2647 if (shared_resource == NULL) { 2648 spdk_put_io_channel(ch->channel); 2649 spdk_put_io_channel(mgmt_io_ch); 2650 return -1; 2651 } 2652 2653 shared_resource->mgmt_ch = mgmt_ch; 2654 shared_resource->io_outstanding = 0; 2655 TAILQ_INIT(&shared_resource->nomem_io); 2656 shared_resource->nomem_threshold = 0; 2657 shared_resource->shared_ch = ch->channel; 2658 shared_resource->ref = 1; 2659 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2660 } 2661 2662 memset(&ch->stat, 0, sizeof(ch->stat)); 2663 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2664 ch->io_outstanding = 0; 2665 TAILQ_INIT(&ch->queued_resets); 2666 TAILQ_INIT(&ch->locked_ranges); 2667 ch->flags = 0; 2668 ch->shared_resource = shared_resource; 2669 2670 TAILQ_INIT(&ch->io_submitted); 2671 TAILQ_INIT(&ch->io_locked); 2672 2673 #ifdef SPDK_CONFIG_VTUNE 2674 { 2675 char *name; 2676 __itt_init_ittlib(NULL, 0); 2677 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2678 if (!name) { 2679 bdev_channel_destroy_resource(ch); 2680 return -1; 2681 } 2682 ch->handle = __itt_string_handle_create(name); 2683 free(name); 2684 ch->start_tsc = spdk_get_ticks(); 2685 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2686 memset(&ch->prev_stat, 0, 
sizeof(ch->prev_stat)); 2687 } 2688 #endif 2689 2690 pthread_mutex_lock(&bdev->internal.mutex); 2691 bdev_enable_qos(bdev, ch); 2692 2693 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 2694 struct lba_range *new_range; 2695 2696 new_range = calloc(1, sizeof(*new_range)); 2697 if (new_range == NULL) { 2698 pthread_mutex_unlock(&bdev->internal.mutex); 2699 bdev_channel_destroy_resource(ch); 2700 return -1; 2701 } 2702 new_range->length = range->length; 2703 new_range->offset = range->offset; 2704 new_range->locked_ctx = range->locked_ctx; 2705 TAILQ_INSERT_TAIL(&ch->locked_ranges, new_range, tailq); 2706 } 2707 2708 pthread_mutex_unlock(&bdev->internal.mutex); 2709 2710 return 0; 2711 } 2712 2713 /* 2714 * Abort I/O that are waiting on a data buffer. These types of I/O are 2715 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2716 */ 2717 static void 2718 bdev_abort_all_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2719 { 2720 bdev_io_stailq_t tmp; 2721 struct spdk_bdev_io *bdev_io; 2722 2723 STAILQ_INIT(&tmp); 2724 2725 while (!STAILQ_EMPTY(queue)) { 2726 bdev_io = STAILQ_FIRST(queue); 2727 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2728 if (bdev_io->internal.ch == ch) { 2729 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2730 } else { 2731 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2732 } 2733 } 2734 2735 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2736 } 2737 2738 /* 2739 * Abort I/O that are queued waiting for submission. These types of I/O are 2740 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2741 */ 2742 static void 2743 bdev_abort_all_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2744 { 2745 struct spdk_bdev_io *bdev_io, *tmp; 2746 2747 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2748 if (bdev_io->internal.ch == ch) { 2749 TAILQ_REMOVE(queue, bdev_io, internal.link); 2750 /* 2751 * spdk_bdev_io_complete() assumes that the completed I/O had 2752 * been submitted to the bdev module. Since in this case it 2753 * hadn't, bump io_outstanding to account for the decrement 2754 * that spdk_bdev_io_complete() will do. 
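* Queued resets are exempt below, since their completion accounting is handled
* separately and does not decrement these counters.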
2755 */ 2756 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2757 ch->io_outstanding++; 2758 ch->shared_resource->io_outstanding++; 2759 } 2760 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2761 } 2762 } 2763 } 2764 2765 static bool 2766 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2767 { 2768 struct spdk_bdev_io *bdev_io; 2769 2770 TAILQ_FOREACH(bdev_io, queue, internal.link) { 2771 if (bdev_io == bio_to_abort) { 2772 TAILQ_REMOVE(queue, bio_to_abort, internal.link); 2773 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2774 return true; 2775 } 2776 } 2777 2778 return false; 2779 } 2780 2781 static bool 2782 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2783 { 2784 struct spdk_bdev_io *bdev_io; 2785 2786 STAILQ_FOREACH(bdev_io, queue, internal.buf_link) { 2787 if (bdev_io == bio_to_abort) { 2788 STAILQ_REMOVE(queue, bio_to_abort, spdk_bdev_io, internal.buf_link); 2789 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2790 return true; 2791 } 2792 } 2793 2794 return false; 2795 } 2796 2797 static void 2798 bdev_qos_channel_destroy(void *cb_arg) 2799 { 2800 struct spdk_bdev_qos *qos = cb_arg; 2801 2802 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2803 spdk_poller_unregister(&qos->poller); 2804 2805 SPDK_DEBUGLOG(bdev, "Free QoS %p.\n", qos); 2806 2807 free(qos); 2808 } 2809 2810 static int 2811 bdev_qos_destroy(struct spdk_bdev *bdev) 2812 { 2813 int i; 2814 2815 /* 2816 * Cleanly shutting down the QoS poller is tricky, because 2817 * during the asynchronous operation the user could open 2818 * a new descriptor and create a new channel, spawning 2819 * a new QoS poller. 2820 * 2821 * The strategy is to create a new QoS structure here and swap it 2822 * in. The shutdown path then continues to refer to the old one 2823 * until it completes and then releases it. 2824 */ 2825 struct spdk_bdev_qos *new_qos, *old_qos; 2826 2827 old_qos = bdev->internal.qos; 2828 2829 new_qos = calloc(1, sizeof(*new_qos)); 2830 if (!new_qos) { 2831 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2832 return -ENOMEM; 2833 } 2834 2835 /* Copy the old QoS data into the newly allocated structure */ 2836 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2837 2838 /* Zero out the key parts of the QoS structure */ 2839 new_qos->ch = NULL; 2840 new_qos->thread = NULL; 2841 new_qos->poller = NULL; 2842 TAILQ_INIT(&new_qos->queued); 2843 /* 2844 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2845 * It will be used later for the new QoS structure. 2846 */ 2847 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2848 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2849 new_qos->rate_limits[i].min_per_timeslice = 0; 2850 new_qos->rate_limits[i].max_per_timeslice = 0; 2851 } 2852 2853 bdev->internal.qos = new_qos; 2854 2855 if (old_qos->thread == NULL) { 2856 free(old_qos); 2857 } else { 2858 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 2859 } 2860 2861 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2862 * been destroyed yet. The destruction path will end up waiting for the final 2863 * channel to be put before it releases resources. 
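* The message sent above runs bdev_qos_channel_destroy() on the old QoS thread,
* which unregisters the poller and drops the extra channel reference that
* bdev_enable_qos() took when it selected the QoS channel.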
*/ 2864 2865 return 0; 2866 } 2867 2868 static void 2869 bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2870 { 2871 total->bytes_read += add->bytes_read; 2872 total->num_read_ops += add->num_read_ops; 2873 total->bytes_written += add->bytes_written; 2874 total->num_write_ops += add->num_write_ops; 2875 total->bytes_unmapped += add->bytes_unmapped; 2876 total->num_unmap_ops += add->num_unmap_ops; 2877 total->read_latency_ticks += add->read_latency_ticks; 2878 total->write_latency_ticks += add->write_latency_ticks; 2879 total->unmap_latency_ticks += add->unmap_latency_ticks; 2880 } 2881 2882 static void 2883 bdev_channel_destroy(void *io_device, void *ctx_buf) 2884 { 2885 struct spdk_bdev_channel *ch = ctx_buf; 2886 struct spdk_bdev_mgmt_channel *mgmt_ch; 2887 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2888 2889 SPDK_DEBUGLOG(bdev, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2890 spdk_get_thread()); 2891 2892 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */ 2893 pthread_mutex_lock(&ch->bdev->internal.mutex); 2894 bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2895 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2896 2897 mgmt_ch = shared_resource->mgmt_ch; 2898 2899 bdev_abort_all_queued_io(&ch->queued_resets, ch); 2900 bdev_abort_all_queued_io(&shared_resource->nomem_io, ch); 2901 bdev_abort_all_buf_io(&mgmt_ch->need_buf_small, ch); 2902 bdev_abort_all_buf_io(&mgmt_ch->need_buf_large, ch); 2903 2904 if (ch->histogram) { 2905 spdk_histogram_data_free(ch->histogram); 2906 } 2907 2908 bdev_channel_destroy_resource(ch); 2909 } 2910 2911 int 2912 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2913 { 2914 struct spdk_bdev_alias *tmp; 2915 2916 if (alias == NULL) { 2917 SPDK_ERRLOG("Empty alias passed\n"); 2918 return -EINVAL; 2919 } 2920 2921 if (spdk_bdev_get_by_name(alias)) { 2922 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2923 return -EEXIST; 2924 } 2925 2926 tmp = calloc(1, sizeof(*tmp)); 2927 if (tmp == NULL) { 2928 SPDK_ERRLOG("Unable to allocate alias\n"); 2929 return -ENOMEM; 2930 } 2931 2932 tmp->alias = strdup(alias); 2933 if (tmp->alias == NULL) { 2934 free(tmp); 2935 SPDK_ERRLOG("Unable to allocate alias\n"); 2936 return -ENOMEM; 2937 } 2938 2939 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2940 2941 return 0; 2942 } 2943 2944 int 2945 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2946 { 2947 struct spdk_bdev_alias *tmp; 2948 2949 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2950 if (strcmp(alias, tmp->alias) == 0) { 2951 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2952 free(tmp->alias); 2953 free(tmp); 2954 return 0; 2955 } 2956 } 2957 2958 SPDK_INFOLOG(bdev, "Alias %s does not exists\n", alias); 2959 2960 return -ENOENT; 2961 } 2962 2963 void 2964 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2965 { 2966 struct spdk_bdev_alias *p, *tmp; 2967 2968 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2969 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2970 free(p->alias); 2971 free(p); 2972 } 2973 } 2974 2975 struct spdk_io_channel * 2976 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2977 { 2978 return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc))); 2979 } 2980 2981 const char * 2982 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2983 { 2984 return bdev->name; 2985 } 2986 2987 const char * 2988 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2989 { 2990 
return bdev->product_name; 2991 } 2992 2993 const struct spdk_bdev_aliases_list * 2994 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2995 { 2996 return &bdev->aliases; 2997 } 2998 2999 uint32_t 3000 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 3001 { 3002 return bdev->blocklen; 3003 } 3004 3005 uint32_t 3006 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 3007 { 3008 return bdev->write_unit_size; 3009 } 3010 3011 uint64_t 3012 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 3013 { 3014 return bdev->blockcnt; 3015 } 3016 3017 const char * 3018 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 3019 { 3020 return qos_rpc_type[type]; 3021 } 3022 3023 void 3024 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 3025 { 3026 int i; 3027 3028 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 3029 3030 pthread_mutex_lock(&bdev->internal.mutex); 3031 if (bdev->internal.qos) { 3032 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3033 if (bdev->internal.qos->rate_limits[i].limit != 3034 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3035 limits[i] = bdev->internal.qos->rate_limits[i].limit; 3036 if (bdev_qos_is_iops_rate_limit(i) == false) { 3037 /* Change from Byte to Megabyte which is user visible. */ 3038 limits[i] = limits[i] / 1024 / 1024; 3039 } 3040 } 3041 } 3042 } 3043 pthread_mutex_unlock(&bdev->internal.mutex); 3044 } 3045 3046 size_t 3047 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 3048 { 3049 return 1 << bdev->required_alignment; 3050 } 3051 3052 uint32_t 3053 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 3054 { 3055 return bdev->optimal_io_boundary; 3056 } 3057 3058 bool 3059 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 3060 { 3061 return bdev->write_cache; 3062 } 3063 3064 const struct spdk_uuid * 3065 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 3066 { 3067 return &bdev->uuid; 3068 } 3069 3070 uint16_t 3071 spdk_bdev_get_acwu(const struct spdk_bdev *bdev) 3072 { 3073 return bdev->acwu; 3074 } 3075 3076 uint32_t 3077 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 3078 { 3079 return bdev->md_len; 3080 } 3081 3082 bool 3083 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 3084 { 3085 return (bdev->md_len != 0) && bdev->md_interleave; 3086 } 3087 3088 bool 3089 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 3090 { 3091 return (bdev->md_len != 0) && !bdev->md_interleave; 3092 } 3093 3094 bool 3095 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 3096 { 3097 return bdev->zoned; 3098 } 3099 3100 uint32_t 3101 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 3102 { 3103 if (spdk_bdev_is_md_interleaved(bdev)) { 3104 return bdev->blocklen - bdev->md_len; 3105 } else { 3106 return bdev->blocklen; 3107 } 3108 } 3109 3110 static uint32_t 3111 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 3112 { 3113 if (!spdk_bdev_is_md_interleaved(bdev)) { 3114 return bdev->blocklen + bdev->md_len; 3115 } else { 3116 return bdev->blocklen; 3117 } 3118 } 3119 3120 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 3121 { 3122 if (bdev->md_len != 0) { 3123 return bdev->dif_type; 3124 } else { 3125 return SPDK_DIF_DISABLE; 3126 } 3127 } 3128 3129 bool 3130 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 3131 { 3132 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 3133 return bdev->dif_is_head_of_md; 3134 } else { 3135 return false; 3136 } 3137 } 3138 3139 bool 3140 
spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 3141 enum spdk_dif_check_type check_type) 3142 { 3143 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 3144 return false; 3145 } 3146 3147 switch (check_type) { 3148 case SPDK_DIF_CHECK_TYPE_REFTAG: 3149 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 3150 case SPDK_DIF_CHECK_TYPE_APPTAG: 3151 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 3152 case SPDK_DIF_CHECK_TYPE_GUARD: 3153 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 3154 default: 3155 return false; 3156 } 3157 } 3158 3159 uint64_t 3160 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 3161 { 3162 return bdev->internal.measured_queue_depth; 3163 } 3164 3165 uint64_t 3166 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 3167 { 3168 return bdev->internal.period; 3169 } 3170 3171 uint64_t 3172 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 3173 { 3174 return bdev->internal.weighted_io_time; 3175 } 3176 3177 uint64_t 3178 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 3179 { 3180 return bdev->internal.io_time; 3181 } 3182 3183 static void 3184 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 3185 { 3186 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3187 3188 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 3189 3190 if (bdev->internal.measured_queue_depth) { 3191 bdev->internal.io_time += bdev->internal.period; 3192 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 3193 } 3194 } 3195 3196 static void 3197 _calculate_measured_qd(struct spdk_io_channel_iter *i) 3198 { 3199 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3200 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 3201 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 3202 3203 bdev->internal.temporary_queue_depth += ch->io_outstanding; 3204 spdk_for_each_channel_continue(i, 0); 3205 } 3206 3207 static int 3208 bdev_calculate_measured_queue_depth(void *ctx) 3209 { 3210 struct spdk_bdev *bdev = ctx; 3211 bdev->internal.temporary_queue_depth = 0; 3212 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 3213 _calculate_measured_qd_cpl); 3214 return SPDK_POLLER_BUSY; 3215 } 3216 3217 void 3218 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 3219 { 3220 bdev->internal.period = period; 3221 3222 if (bdev->internal.qd_poller != NULL) { 3223 spdk_poller_unregister(&bdev->internal.qd_poller); 3224 bdev->internal.measured_queue_depth = UINT64_MAX; 3225 } 3226 3227 if (period != 0) { 3228 bdev->internal.qd_poller = SPDK_POLLER_REGISTER(bdev_calculate_measured_queue_depth, bdev, 3229 period); 3230 } 3231 } 3232 3233 static void 3234 _resize_notify(void *arg) 3235 { 3236 struct spdk_bdev_desc *desc = arg; 3237 3238 pthread_mutex_lock(&desc->mutex); 3239 desc->refs--; 3240 if (!desc->closed) { 3241 pthread_mutex_unlock(&desc->mutex); 3242 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 3243 desc->bdev, 3244 desc->callback.ctx); 3245 return; 3246 } else if (0 == desc->refs) { 3247 /* This descriptor was closed after this resize_notify message was sent. 3248 * spdk_bdev_close() could not free the descriptor since this message was 3249 * in flight, so we free it now using bdev_desc_free(). 
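* This mirrors the refcount protocol used by the timeout poller: the sender
* (spdk_bdev_notify_blockcnt_change() below) bumps desc->refs under desc->mutex
* before spdk_thread_send_msg(), and the receiver drops it here, freeing the
* descriptor only once it is both closed and unreferenced.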
3250 */ 3251 pthread_mutex_unlock(&desc->mutex); 3252 bdev_desc_free(desc); 3253 return; 3254 } 3255 pthread_mutex_unlock(&desc->mutex); 3256 } 3257 3258 int 3259 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 3260 { 3261 struct spdk_bdev_desc *desc; 3262 int ret; 3263 3264 pthread_mutex_lock(&bdev->internal.mutex); 3265 3266 /* bdev has open descriptors */ 3267 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 3268 bdev->blockcnt > size) { 3269 ret = -EBUSY; 3270 } else { 3271 bdev->blockcnt = size; 3272 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 3273 pthread_mutex_lock(&desc->mutex); 3274 if (desc->callback.open_with_ext && !desc->closed) { 3275 desc->refs++; 3276 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 3277 } 3278 pthread_mutex_unlock(&desc->mutex); 3279 } 3280 ret = 0; 3281 } 3282 3283 pthread_mutex_unlock(&bdev->internal.mutex); 3284 3285 return ret; 3286 } 3287 3288 /* 3289 * Convert I/O offset and length from bytes to blocks. 3290 * 3291 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 3292 */ 3293 static uint64_t 3294 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 3295 uint64_t num_bytes, uint64_t *num_blocks) 3296 { 3297 uint32_t block_size = bdev->blocklen; 3298 uint8_t shift_cnt; 3299 3300 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 3301 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 3302 shift_cnt = spdk_u32log2(block_size); 3303 *offset_blocks = offset_bytes >> shift_cnt; 3304 *num_blocks = num_bytes >> shift_cnt; 3305 return (offset_bytes - (*offset_blocks << shift_cnt)) | 3306 (num_bytes - (*num_blocks << shift_cnt)); 3307 } else { 3308 *offset_blocks = offset_bytes / block_size; 3309 *num_blocks = num_bytes / block_size; 3310 return (offset_bytes % block_size) | (num_bytes % block_size); 3311 } 3312 } 3313 3314 static bool 3315 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 3316 { 3317 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 3318 * has been an overflow and hence the offset has been wrapped around */ 3319 if (offset_blocks + num_blocks < offset_blocks) { 3320 return false; 3321 } 3322 3323 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 3324 if (offset_blocks + num_blocks > bdev->blockcnt) { 3325 return false; 3326 } 3327 3328 return true; 3329 } 3330 3331 static bool 3332 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 3333 { 3334 return _is_buf_allocated(iovs) == (md_buf != NULL); 3335 } 3336 3337 static int 3338 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 3339 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3340 spdk_bdev_io_completion_cb cb, void *cb_arg) 3341 { 3342 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3343 struct spdk_bdev_io *bdev_io; 3344 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3345 3346 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3347 return -EINVAL; 3348 } 3349 3350 bdev_io = bdev_channel_get_io(channel); 3351 if (!bdev_io) { 3352 return -ENOMEM; 3353 } 3354 3355 bdev_io->internal.ch = channel; 3356 bdev_io->internal.desc = desc; 3357 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3358 bdev_io->u.bdev.iovs = &bdev_io->iov; 3359 bdev_io->u.bdev.iovs[0].iov_base = buf; 3360 bdev_io->u.bdev.iovs[0].iov_len = num_blocks 
* bdev->blocklen; 3361 bdev_io->u.bdev.iovcnt = 1; 3362 bdev_io->u.bdev.md_buf = md_buf; 3363 bdev_io->u.bdev.num_blocks = num_blocks; 3364 bdev_io->u.bdev.offset_blocks = offset_blocks; 3365 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3366 3367 bdev_io_submit(bdev_io); 3368 return 0; 3369 } 3370 3371 int 3372 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3373 void *buf, uint64_t offset, uint64_t nbytes, 3374 spdk_bdev_io_completion_cb cb, void *cb_arg) 3375 { 3376 uint64_t offset_blocks, num_blocks; 3377 3378 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3379 nbytes, &num_blocks) != 0) { 3380 return -EINVAL; 3381 } 3382 3383 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3384 } 3385 3386 int 3387 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3388 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3389 spdk_bdev_io_completion_cb cb, void *cb_arg) 3390 { 3391 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 3392 } 3393 3394 int 3395 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3396 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3397 spdk_bdev_io_completion_cb cb, void *cb_arg) 3398 { 3399 struct iovec iov = { 3400 .iov_base = buf, 3401 }; 3402 3403 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3404 return -EINVAL; 3405 } 3406 3407 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3408 return -EINVAL; 3409 } 3410 3411 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3412 cb, cb_arg); 3413 } 3414 3415 int 3416 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3417 struct iovec *iov, int iovcnt, 3418 uint64_t offset, uint64_t nbytes, 3419 spdk_bdev_io_completion_cb cb, void *cb_arg) 3420 { 3421 uint64_t offset_blocks, num_blocks; 3422 3423 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3424 nbytes, &num_blocks) != 0) { 3425 return -EINVAL; 3426 } 3427 3428 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3429 } 3430 3431 static int 3432 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3433 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3434 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3435 { 3436 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3437 struct spdk_bdev_io *bdev_io; 3438 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3439 3440 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3441 return -EINVAL; 3442 } 3443 3444 bdev_io = bdev_channel_get_io(channel); 3445 if (!bdev_io) { 3446 return -ENOMEM; 3447 } 3448 3449 bdev_io->internal.ch = channel; 3450 bdev_io->internal.desc = desc; 3451 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3452 bdev_io->u.bdev.iovs = iov; 3453 bdev_io->u.bdev.iovcnt = iovcnt; 3454 bdev_io->u.bdev.md_buf = md_buf; 3455 bdev_io->u.bdev.num_blocks = num_blocks; 3456 bdev_io->u.bdev.offset_blocks = offset_blocks; 3457 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3458 3459 bdev_io_submit(bdev_io); 3460 return 0; 3461 } 3462 3463 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3464 struct iovec *iov, int iovcnt, 3465 uint64_t offset_blocks, uint64_t num_blocks, 3466 spdk_bdev_io_completion_cb cb, void *cb_arg) 3467 { 3468 return 
bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3469 num_blocks, cb, cb_arg); 3470 } 3471 3472 int 3473 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3474 struct iovec *iov, int iovcnt, void *md_buf, 3475 uint64_t offset_blocks, uint64_t num_blocks, 3476 spdk_bdev_io_completion_cb cb, void *cb_arg) 3477 { 3478 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3479 return -EINVAL; 3480 } 3481 3482 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3483 return -EINVAL; 3484 } 3485 3486 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3487 num_blocks, cb, cb_arg); 3488 } 3489 3490 static int 3491 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3492 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3493 spdk_bdev_io_completion_cb cb, void *cb_arg) 3494 { 3495 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3496 struct spdk_bdev_io *bdev_io; 3497 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3498 3499 if (!desc->write) { 3500 return -EBADF; 3501 } 3502 3503 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3504 return -EINVAL; 3505 } 3506 3507 bdev_io = bdev_channel_get_io(channel); 3508 if (!bdev_io) { 3509 return -ENOMEM; 3510 } 3511 3512 bdev_io->internal.ch = channel; 3513 bdev_io->internal.desc = desc; 3514 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3515 bdev_io->u.bdev.iovs = &bdev_io->iov; 3516 bdev_io->u.bdev.iovs[0].iov_base = buf; 3517 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3518 bdev_io->u.bdev.iovcnt = 1; 3519 bdev_io->u.bdev.md_buf = md_buf; 3520 bdev_io->u.bdev.num_blocks = num_blocks; 3521 bdev_io->u.bdev.offset_blocks = offset_blocks; 3522 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3523 3524 bdev_io_submit(bdev_io); 3525 return 0; 3526 } 3527 3528 int 3529 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3530 void *buf, uint64_t offset, uint64_t nbytes, 3531 spdk_bdev_io_completion_cb cb, void *cb_arg) 3532 { 3533 uint64_t offset_blocks, num_blocks; 3534 3535 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3536 nbytes, &num_blocks) != 0) { 3537 return -EINVAL; 3538 } 3539 3540 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3541 } 3542 3543 int 3544 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3545 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3546 spdk_bdev_io_completion_cb cb, void *cb_arg) 3547 { 3548 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3549 cb, cb_arg); 3550 } 3551 3552 int 3553 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3554 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3555 spdk_bdev_io_completion_cb cb, void *cb_arg) 3556 { 3557 struct iovec iov = { 3558 .iov_base = buf, 3559 }; 3560 3561 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3562 return -EINVAL; 3563 } 3564 3565 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3566 return -EINVAL; 3567 } 3568 3569 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3570 cb, cb_arg); 3571 } 3572 3573 static int 3574 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3575 struct iovec *iov, int iovcnt, void *md_buf, 3576 uint64_t offset_blocks, uint64_t num_blocks, 3577 spdk_bdev_io_completion_cb cb, 
void *cb_arg) 3578 { 3579 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3580 struct spdk_bdev_io *bdev_io; 3581 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3582 3583 if (!desc->write) { 3584 return -EBADF; 3585 } 3586 3587 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3588 return -EINVAL; 3589 } 3590 3591 bdev_io = bdev_channel_get_io(channel); 3592 if (!bdev_io) { 3593 return -ENOMEM; 3594 } 3595 3596 bdev_io->internal.ch = channel; 3597 bdev_io->internal.desc = desc; 3598 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3599 bdev_io->u.bdev.iovs = iov; 3600 bdev_io->u.bdev.iovcnt = iovcnt; 3601 bdev_io->u.bdev.md_buf = md_buf; 3602 bdev_io->u.bdev.num_blocks = num_blocks; 3603 bdev_io->u.bdev.offset_blocks = offset_blocks; 3604 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3605 3606 bdev_io_submit(bdev_io); 3607 return 0; 3608 } 3609 3610 int 3611 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3612 struct iovec *iov, int iovcnt, 3613 uint64_t offset, uint64_t len, 3614 spdk_bdev_io_completion_cb cb, void *cb_arg) 3615 { 3616 uint64_t offset_blocks, num_blocks; 3617 3618 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3619 len, &num_blocks) != 0) { 3620 return -EINVAL; 3621 } 3622 3623 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3624 } 3625 3626 int 3627 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3628 struct iovec *iov, int iovcnt, 3629 uint64_t offset_blocks, uint64_t num_blocks, 3630 spdk_bdev_io_completion_cb cb, void *cb_arg) 3631 { 3632 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3633 num_blocks, cb, cb_arg); 3634 } 3635 3636 int 3637 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3638 struct iovec *iov, int iovcnt, void *md_buf, 3639 uint64_t offset_blocks, uint64_t num_blocks, 3640 spdk_bdev_io_completion_cb cb, void *cb_arg) 3641 { 3642 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3643 return -EINVAL; 3644 } 3645 3646 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3647 return -EINVAL; 3648 } 3649 3650 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3651 num_blocks, cb, cb_arg); 3652 } 3653 3654 static void 3655 bdev_compare_do_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3656 { 3657 struct spdk_bdev_io *parent_io = cb_arg; 3658 uint8_t *read_buf = bdev_io->u.bdev.iovs[0].iov_base; 3659 int i, rc = 0; 3660 3661 if (!success) { 3662 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3663 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3664 spdk_bdev_free_io(bdev_io); 3665 return; 3666 } 3667 3668 for (i = 0; i < parent_io->u.bdev.iovcnt; i++) { 3669 rc = memcmp(read_buf, 3670 parent_io->u.bdev.iovs[i].iov_base, 3671 parent_io->u.bdev.iovs[i].iov_len); 3672 if (rc) { 3673 break; 3674 } 3675 read_buf += parent_io->u.bdev.iovs[i].iov_len; 3676 } 3677 3678 spdk_bdev_free_io(bdev_io); 3679 3680 if (rc == 0) { 3681 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3682 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3683 } else { 3684 parent_io->internal.status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 3685 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3686 } 3687 } 3688 3689 static void 3690 bdev_compare_do_read(void *_bdev_io) 3691 { 3692 struct spdk_bdev_io *bdev_io = _bdev_io; 
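/* COMPARE emulation for bdev modules without native support (see the callers
 * further below): issue a plain read of the same blocks (the NULL buffer argument
 * lets the read path supply one) and let bdev_compare_do_read_done() above memcmp
 * the data against the caller's iovs, completing with MISCOMPARE on mismatch.
 */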
3693 int rc; 3694 3695 rc = spdk_bdev_read_blocks(bdev_io->internal.desc, 3696 spdk_io_channel_from_ctx(bdev_io->internal.ch), NULL, 3697 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3698 bdev_compare_do_read_done, bdev_io); 3699 3700 if (rc == -ENOMEM) { 3701 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_do_read); 3702 } else if (rc != 0) { 3703 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3704 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3705 } 3706 } 3707 3708 static int 3709 bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3710 struct iovec *iov, int iovcnt, void *md_buf, 3711 uint64_t offset_blocks, uint64_t num_blocks, 3712 spdk_bdev_io_completion_cb cb, void *cb_arg) 3713 { 3714 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3715 struct spdk_bdev_io *bdev_io; 3716 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3717 3718 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3719 return -EINVAL; 3720 } 3721 3722 bdev_io = bdev_channel_get_io(channel); 3723 if (!bdev_io) { 3724 return -ENOMEM; 3725 } 3726 3727 bdev_io->internal.ch = channel; 3728 bdev_io->internal.desc = desc; 3729 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3730 bdev_io->u.bdev.iovs = iov; 3731 bdev_io->u.bdev.iovcnt = iovcnt; 3732 bdev_io->u.bdev.md_buf = md_buf; 3733 bdev_io->u.bdev.num_blocks = num_blocks; 3734 bdev_io->u.bdev.offset_blocks = offset_blocks; 3735 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3736 3737 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3738 bdev_io_submit(bdev_io); 3739 return 0; 3740 } 3741 3742 bdev_compare_do_read(bdev_io); 3743 3744 return 0; 3745 } 3746 3747 int 3748 spdk_bdev_comparev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3749 struct iovec *iov, int iovcnt, 3750 uint64_t offset_blocks, uint64_t num_blocks, 3751 spdk_bdev_io_completion_cb cb, void *cb_arg) 3752 { 3753 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3754 num_blocks, cb, cb_arg); 3755 } 3756 3757 int 3758 spdk_bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3759 struct iovec *iov, int iovcnt, void *md_buf, 3760 uint64_t offset_blocks, uint64_t num_blocks, 3761 spdk_bdev_io_completion_cb cb, void *cb_arg) 3762 { 3763 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3764 return -EINVAL; 3765 } 3766 3767 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3768 return -EINVAL; 3769 } 3770 3771 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3772 num_blocks, cb, cb_arg); 3773 } 3774 3775 static int 3776 bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3777 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3778 spdk_bdev_io_completion_cb cb, void *cb_arg) 3779 { 3780 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3781 struct spdk_bdev_io *bdev_io; 3782 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3783 3784 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3785 return -EINVAL; 3786 } 3787 3788 bdev_io = bdev_channel_get_io(channel); 3789 if (!bdev_io) { 3790 return -ENOMEM; 3791 } 3792 3793 bdev_io->internal.ch = channel; 3794 bdev_io->internal.desc = desc; 3795 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3796 bdev_io->u.bdev.iovs = &bdev_io->iov; 3797 bdev_io->u.bdev.iovs[0].iov_base = buf; 3798 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3799 
bdev_io->u.bdev.iovcnt = 1; 3800 bdev_io->u.bdev.md_buf = md_buf; 3801 bdev_io->u.bdev.num_blocks = num_blocks; 3802 bdev_io->u.bdev.offset_blocks = offset_blocks; 3803 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3804 3805 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3806 bdev_io_submit(bdev_io); 3807 return 0; 3808 } 3809 3810 bdev_compare_do_read(bdev_io); 3811 3812 return 0; 3813 } 3814 3815 int 3816 spdk_bdev_compare_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3817 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3818 spdk_bdev_io_completion_cb cb, void *cb_arg) 3819 { 3820 return bdev_compare_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3821 cb, cb_arg); 3822 } 3823 3824 int 3825 spdk_bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3826 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3827 spdk_bdev_io_completion_cb cb, void *cb_arg) 3828 { 3829 struct iovec iov = { 3830 .iov_base = buf, 3831 }; 3832 3833 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3834 return -EINVAL; 3835 } 3836 3837 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3838 return -EINVAL; 3839 } 3840 3841 return bdev_compare_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3842 cb, cb_arg); 3843 } 3844 3845 static void 3846 bdev_comparev_and_writev_blocks_unlocked(void *ctx, int unlock_status) 3847 { 3848 struct spdk_bdev_io *bdev_io = ctx; 3849 3850 if (unlock_status) { 3851 SPDK_ERRLOG("LBA range unlock failed\n"); 3852 } 3853 3854 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS ? true : 3855 false, bdev_io->internal.caller_ctx); 3856 } 3857 3858 static void 3859 bdev_comparev_and_writev_blocks_unlock(struct spdk_bdev_io *bdev_io, int status) 3860 { 3861 bdev_io->internal.status = status; 3862 3863 bdev_unlock_lba_range(bdev_io->internal.desc, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3864 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3865 bdev_comparev_and_writev_blocks_unlocked, bdev_io); 3866 } 3867 3868 static void 3869 bdev_compare_and_write_do_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3870 { 3871 struct spdk_bdev_io *parent_io = cb_arg; 3872 3873 if (!success) { 3874 SPDK_ERRLOG("Compare and write operation failed\n"); 3875 } 3876 3877 spdk_bdev_free_io(bdev_io); 3878 3879 bdev_comparev_and_writev_blocks_unlock(parent_io, 3880 success ? 
SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED); 3881 } 3882 3883 static void 3884 bdev_compare_and_write_do_write(void *_bdev_io) 3885 { 3886 struct spdk_bdev_io *bdev_io = _bdev_io; 3887 int rc; 3888 3889 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 3890 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3891 bdev_io->u.bdev.fused_iovs, bdev_io->u.bdev.fused_iovcnt, 3892 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3893 bdev_compare_and_write_do_write_done, bdev_io); 3894 3895 3896 if (rc == -ENOMEM) { 3897 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_write); 3898 } else if (rc != 0) { 3899 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 3900 } 3901 } 3902 3903 static void 3904 bdev_compare_and_write_do_compare_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3905 { 3906 struct spdk_bdev_io *parent_io = cb_arg; 3907 3908 spdk_bdev_free_io(bdev_io); 3909 3910 if (!success) { 3911 bdev_comparev_and_writev_blocks_unlock(parent_io, SPDK_BDEV_IO_STATUS_MISCOMPARE); 3912 return; 3913 } 3914 3915 bdev_compare_and_write_do_write(parent_io); 3916 } 3917 3918 static void 3919 bdev_compare_and_write_do_compare(void *_bdev_io) 3920 { 3921 struct spdk_bdev_io *bdev_io = _bdev_io; 3922 int rc; 3923 3924 rc = spdk_bdev_comparev_blocks(bdev_io->internal.desc, 3925 spdk_io_channel_from_ctx(bdev_io->internal.ch), bdev_io->u.bdev.iovs, 3926 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3927 bdev_compare_and_write_do_compare_done, bdev_io); 3928 3929 if (rc == -ENOMEM) { 3930 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_compare); 3931 } else if (rc != 0) { 3932 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED); 3933 } 3934 } 3935 3936 static void 3937 bdev_comparev_and_writev_blocks_locked(void *ctx, int status) 3938 { 3939 struct spdk_bdev_io *bdev_io = ctx; 3940 3941 if (status) { 3942 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED; 3943 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3944 return; 3945 } 3946 3947 bdev_compare_and_write_do_compare(bdev_io); 3948 } 3949 3950 int 3951 spdk_bdev_comparev_and_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3952 struct iovec *compare_iov, int compare_iovcnt, 3953 struct iovec *write_iov, int write_iovcnt, 3954 uint64_t offset_blocks, uint64_t num_blocks, 3955 spdk_bdev_io_completion_cb cb, void *cb_arg) 3956 { 3957 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3958 struct spdk_bdev_io *bdev_io; 3959 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3960 3961 if (!desc->write) { 3962 return -EBADF; 3963 } 3964 3965 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3966 return -EINVAL; 3967 } 3968 3969 if (num_blocks > bdev->acwu) { 3970 return -EINVAL; 3971 } 3972 3973 bdev_io = bdev_channel_get_io(channel); 3974 if (!bdev_io) { 3975 return -ENOMEM; 3976 } 3977 3978 bdev_io->internal.ch = channel; 3979 bdev_io->internal.desc = desc; 3980 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE; 3981 bdev_io->u.bdev.iovs = compare_iov; 3982 bdev_io->u.bdev.iovcnt = compare_iovcnt; 3983 bdev_io->u.bdev.fused_iovs = write_iov; 3984 bdev_io->u.bdev.fused_iovcnt = write_iovcnt; 3985 bdev_io->u.bdev.md_buf = NULL; 3986 bdev_io->u.bdev.num_blocks = num_blocks; 3987 bdev_io->u.bdev.offset_blocks = offset_blocks; 3988 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3989 3990 if (bdev_io_type_supported(bdev, 
SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE)) { 3991 bdev_io_submit(bdev_io); 3992 return 0; 3993 } 3994 3995 return bdev_lock_lba_range(desc, ch, offset_blocks, num_blocks, 3996 bdev_comparev_and_writev_blocks_locked, bdev_io); 3997 } 3998 3999 static void 4000 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 4001 { 4002 if (!success) { 4003 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4004 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4005 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 4006 return; 4007 } 4008 4009 if (bdev_io->u.bdev.zcopy.populate) { 4010 /* Read the real data into the buffer */ 4011 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 4012 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4013 bdev_io_submit(bdev_io); 4014 return; 4015 } 4016 4017 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4018 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4019 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 4020 } 4021 4022 int 4023 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4024 uint64_t offset_blocks, uint64_t num_blocks, 4025 bool populate, 4026 spdk_bdev_io_completion_cb cb, void *cb_arg) 4027 { 4028 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4029 struct spdk_bdev_io *bdev_io; 4030 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4031 4032 if (!desc->write) { 4033 return -EBADF; 4034 } 4035 4036 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4037 return -EINVAL; 4038 } 4039 4040 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4041 return -ENOTSUP; 4042 } 4043 4044 bdev_io = bdev_channel_get_io(channel); 4045 if (!bdev_io) { 4046 return -ENOMEM; 4047 } 4048 4049 bdev_io->internal.ch = channel; 4050 bdev_io->internal.desc = desc; 4051 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4052 bdev_io->u.bdev.num_blocks = num_blocks; 4053 bdev_io->u.bdev.offset_blocks = offset_blocks; 4054 bdev_io->u.bdev.iovs = NULL; 4055 bdev_io->u.bdev.iovcnt = 0; 4056 bdev_io->u.bdev.md_buf = NULL; 4057 bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0; 4058 bdev_io->u.bdev.zcopy.commit = 0; 4059 bdev_io->u.bdev.zcopy.start = 1; 4060 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4061 4062 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4063 bdev_io_submit(bdev_io); 4064 } else { 4065 /* Emulate zcopy by allocating a buffer */ 4066 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 4067 bdev_io->u.bdev.num_blocks * bdev->blocklen); 4068 } 4069 4070 return 0; 4071 } 4072 4073 int 4074 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 4075 spdk_bdev_io_completion_cb cb, void *cb_arg) 4076 { 4077 struct spdk_bdev *bdev = bdev_io->bdev; 4078 4079 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 4080 /* This can happen if the zcopy was emulated in start */ 4081 if (bdev_io->u.bdev.zcopy.start != 1) { 4082 return -EINVAL; 4083 } 4084 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4085 } 4086 4087 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 4088 return -EINVAL; 4089 } 4090 4091 bdev_io->u.bdev.zcopy.commit = commit ? 
1 : 0; 4092 bdev_io->u.bdev.zcopy.start = 0; 4093 bdev_io->internal.caller_ctx = cb_arg; 4094 bdev_io->internal.cb = cb; 4095 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4096 4097 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4098 bdev_io_submit(bdev_io); 4099 return 0; 4100 } 4101 4102 if (!bdev_io->u.bdev.zcopy.commit) { 4103 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4104 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4105 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 4106 return 0; 4107 } 4108 4109 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 4110 bdev_io_submit(bdev_io); 4111 4112 return 0; 4113 } 4114 4115 int 4116 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4117 uint64_t offset, uint64_t len, 4118 spdk_bdev_io_completion_cb cb, void *cb_arg) 4119 { 4120 uint64_t offset_blocks, num_blocks; 4121 4122 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4123 len, &num_blocks) != 0) { 4124 return -EINVAL; 4125 } 4126 4127 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4128 } 4129 4130 int 4131 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4132 uint64_t offset_blocks, uint64_t num_blocks, 4133 spdk_bdev_io_completion_cb cb, void *cb_arg) 4134 { 4135 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4136 struct spdk_bdev_io *bdev_io; 4137 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4138 4139 if (!desc->write) { 4140 return -EBADF; 4141 } 4142 4143 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4144 return -EINVAL; 4145 } 4146 4147 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 4148 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 4149 return -ENOTSUP; 4150 } 4151 4152 bdev_io = bdev_channel_get_io(channel); 4153 4154 if (!bdev_io) { 4155 return -ENOMEM; 4156 } 4157 4158 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 4159 bdev_io->internal.ch = channel; 4160 bdev_io->internal.desc = desc; 4161 bdev_io->u.bdev.offset_blocks = offset_blocks; 4162 bdev_io->u.bdev.num_blocks = num_blocks; 4163 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4164 4165 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 4166 bdev_io_submit(bdev_io); 4167 return 0; 4168 } 4169 4170 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 4171 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 4172 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 4173 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 4174 bdev_write_zero_buffer_next(bdev_io); 4175 4176 return 0; 4177 } 4178 4179 int 4180 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4181 uint64_t offset, uint64_t nbytes, 4182 spdk_bdev_io_completion_cb cb, void *cb_arg) 4183 { 4184 uint64_t offset_blocks, num_blocks; 4185 4186 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4187 nbytes, &num_blocks) != 0) { 4188 return -EINVAL; 4189 } 4190 4191 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4192 } 4193 4194 int 4195 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4196 uint64_t offset_blocks, uint64_t num_blocks, 4197 spdk_bdev_io_completion_cb cb, void *cb_arg) 4198 { 4199 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4200 struct spdk_bdev_io *bdev_io; 4201 struct 
spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4202 4203 if (!desc->write) { 4204 return -EBADF; 4205 } 4206 4207 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4208 return -EINVAL; 4209 } 4210 4211 if (num_blocks == 0) { 4212 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 4213 return -EINVAL; 4214 } 4215 4216 bdev_io = bdev_channel_get_io(channel); 4217 if (!bdev_io) { 4218 return -ENOMEM; 4219 } 4220 4221 bdev_io->internal.ch = channel; 4222 bdev_io->internal.desc = desc; 4223 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 4224 4225 bdev_io->u.bdev.iovs = &bdev_io->iov; 4226 bdev_io->u.bdev.iovs[0].iov_base = NULL; 4227 bdev_io->u.bdev.iovs[0].iov_len = 0; 4228 bdev_io->u.bdev.iovcnt = 1; 4229 4230 bdev_io->u.bdev.offset_blocks = offset_blocks; 4231 bdev_io->u.bdev.num_blocks = num_blocks; 4232 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4233 4234 bdev_io_submit(bdev_io); 4235 return 0; 4236 } 4237 4238 int 4239 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4240 uint64_t offset, uint64_t length, 4241 spdk_bdev_io_completion_cb cb, void *cb_arg) 4242 { 4243 uint64_t offset_blocks, num_blocks; 4244 4245 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4246 length, &num_blocks) != 0) { 4247 return -EINVAL; 4248 } 4249 4250 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4251 } 4252 4253 int 4254 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4255 uint64_t offset_blocks, uint64_t num_blocks, 4256 spdk_bdev_io_completion_cb cb, void *cb_arg) 4257 { 4258 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4259 struct spdk_bdev_io *bdev_io; 4260 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4261 4262 if (!desc->write) { 4263 return -EBADF; 4264 } 4265 4266 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4267 return -EINVAL; 4268 } 4269 4270 bdev_io = bdev_channel_get_io(channel); 4271 if (!bdev_io) { 4272 return -ENOMEM; 4273 } 4274 4275 bdev_io->internal.ch = channel; 4276 bdev_io->internal.desc = desc; 4277 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 4278 bdev_io->u.bdev.iovs = NULL; 4279 bdev_io->u.bdev.iovcnt = 0; 4280 bdev_io->u.bdev.offset_blocks = offset_blocks; 4281 bdev_io->u.bdev.num_blocks = num_blocks; 4282 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4283 4284 bdev_io_submit(bdev_io); 4285 return 0; 4286 } 4287 4288 static void 4289 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 4290 { 4291 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 4292 struct spdk_bdev_io *bdev_io; 4293 4294 bdev_io = TAILQ_FIRST(&ch->queued_resets); 4295 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 4296 bdev_io_submit_reset(bdev_io); 4297 } 4298 4299 static void 4300 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 4301 { 4302 struct spdk_io_channel *ch; 4303 struct spdk_bdev_channel *channel; 4304 struct spdk_bdev_mgmt_channel *mgmt_channel; 4305 struct spdk_bdev_shared_resource *shared_resource; 4306 bdev_io_tailq_t tmp_queued; 4307 4308 TAILQ_INIT(&tmp_queued); 4309 4310 ch = spdk_io_channel_iter_get_channel(i); 4311 channel = spdk_io_channel_get_ctx(ch); 4312 shared_resource = channel->shared_resource; 4313 mgmt_channel = shared_resource->mgmt_ch; 4314 4315 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 4316 4317 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 4318 /* The QoS object is always valid and readable while 4319 * the channel flag is set, so the lock here should not 4320 * be 
necessary. We're not in the fast path though, so 4321 * just take it anyway. */ 4322 pthread_mutex_lock(&channel->bdev->internal.mutex); 4323 if (channel->bdev->internal.qos->ch == channel) { 4324 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 4325 } 4326 pthread_mutex_unlock(&channel->bdev->internal.mutex); 4327 } 4328 4329 bdev_abort_all_queued_io(&shared_resource->nomem_io, channel); 4330 bdev_abort_all_buf_io(&mgmt_channel->need_buf_small, channel); 4331 bdev_abort_all_buf_io(&mgmt_channel->need_buf_large, channel); 4332 bdev_abort_all_queued_io(&tmp_queued, channel); 4333 4334 spdk_for_each_channel_continue(i, 0); 4335 } 4336 4337 static void 4338 bdev_start_reset(void *ctx) 4339 { 4340 struct spdk_bdev_channel *ch = ctx; 4341 4342 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 4343 ch, bdev_reset_dev); 4344 } 4345 4346 static void 4347 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 4348 { 4349 struct spdk_bdev *bdev = ch->bdev; 4350 4351 assert(!TAILQ_EMPTY(&ch->queued_resets)); 4352 4353 pthread_mutex_lock(&bdev->internal.mutex); 4354 if (bdev->internal.reset_in_progress == NULL) { 4355 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 4356 /* 4357 * Take a channel reference for the target bdev for the life of this 4358 * reset. This guards against the channel getting destroyed while 4359 * spdk_for_each_channel() calls related to this reset IO are in 4360 * progress. We will release the reference when this reset is 4361 * completed. 4362 */ 4363 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 4364 bdev_start_reset(ch); 4365 } 4366 pthread_mutex_unlock(&bdev->internal.mutex); 4367 } 4368 4369 int 4370 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4371 spdk_bdev_io_completion_cb cb, void *cb_arg) 4372 { 4373 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4374 struct spdk_bdev_io *bdev_io; 4375 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4376 4377 bdev_io = bdev_channel_get_io(channel); 4378 if (!bdev_io) { 4379 return -ENOMEM; 4380 } 4381 4382 bdev_io->internal.ch = channel; 4383 bdev_io->internal.desc = desc; 4384 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4385 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 4386 bdev_io->u.reset.ch_ref = NULL; 4387 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4388 4389 pthread_mutex_lock(&bdev->internal.mutex); 4390 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 4391 pthread_mutex_unlock(&bdev->internal.mutex); 4392 4393 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 4394 internal.ch_link); 4395 4396 bdev_channel_start_reset(channel); 4397 4398 return 0; 4399 } 4400 4401 void 4402 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4403 struct spdk_bdev_io_stat *stat) 4404 { 4405 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4406 4407 *stat = channel->stat; 4408 } 4409 4410 static void 4411 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 4412 { 4413 void *io_device = spdk_io_channel_iter_get_io_device(i); 4414 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4415 4416 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 4417 bdev_iostat_ctx->cb_arg, 0); 4418 free(bdev_iostat_ctx); 4419 } 4420 4421 static void 4422 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 4423 { 4424 struct 
spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4425 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4426 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4427 4428 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 4429 spdk_for_each_channel_continue(i, 0); 4430 } 4431 4432 void 4433 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 4434 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 4435 { 4436 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 4437 4438 assert(bdev != NULL); 4439 assert(stat != NULL); 4440 assert(cb != NULL); 4441 4442 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 4443 if (bdev_iostat_ctx == NULL) { 4444 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 4445 cb(bdev, stat, cb_arg, -ENOMEM); 4446 return; 4447 } 4448 4449 bdev_iostat_ctx->stat = stat; 4450 bdev_iostat_ctx->cb = cb; 4451 bdev_iostat_ctx->cb_arg = cb_arg; 4452 4453 /* Start with the statistics from previously deleted channels. */ 4454 pthread_mutex_lock(&bdev->internal.mutex); 4455 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 4456 pthread_mutex_unlock(&bdev->internal.mutex); 4457 4458 /* Then iterate and add the statistics from each existing channel. */ 4459 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4460 bdev_get_each_channel_stat, 4461 bdev_iostat_ctx, 4462 bdev_get_device_stat_done); 4463 } 4464 4465 int 4466 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4467 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4468 spdk_bdev_io_completion_cb cb, void *cb_arg) 4469 { 4470 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4471 struct spdk_bdev_io *bdev_io; 4472 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4473 4474 if (!desc->write) { 4475 return -EBADF; 4476 } 4477 4478 bdev_io = bdev_channel_get_io(channel); 4479 if (!bdev_io) { 4480 return -ENOMEM; 4481 } 4482 4483 bdev_io->internal.ch = channel; 4484 bdev_io->internal.desc = desc; 4485 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 4486 bdev_io->u.nvme_passthru.cmd = *cmd; 4487 bdev_io->u.nvme_passthru.buf = buf; 4488 bdev_io->u.nvme_passthru.nbytes = nbytes; 4489 bdev_io->u.nvme_passthru.md_buf = NULL; 4490 bdev_io->u.nvme_passthru.md_len = 0; 4491 4492 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4493 4494 bdev_io_submit(bdev_io); 4495 return 0; 4496 } 4497 4498 int 4499 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4500 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4501 spdk_bdev_io_completion_cb cb, void *cb_arg) 4502 { 4503 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4504 struct spdk_bdev_io *bdev_io; 4505 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4506 4507 if (!desc->write) { 4508 /* 4509 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4510 * to easily determine if the command is a read or write, but for now just 4511 * do not allow io_passthru with a read-only descriptor. 
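 * A caller that needs passthru therefore has to hold a writable descriptor.
 * Illustrative sketch only (the bdev name and callbacks below are hypothetical):
 *
 *     rc = spdk_bdev_open_ext("Nvme0n1", true, my_event_cb, NULL, &desc);
 *     ...
 *     rc = spdk_bdev_nvme_io_passthru(desc, ch, &cmd, buf, nbytes, my_io_done_cb, ctx);
 *
 * Opening with write == false makes this function return -EBADF regardless of
 * the NVMe opcode carried in cmd.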
4512 */ 4513 return -EBADF; 4514 } 4515 4516 bdev_io = bdev_channel_get_io(channel); 4517 if (!bdev_io) { 4518 return -ENOMEM; 4519 } 4520 4521 bdev_io->internal.ch = channel; 4522 bdev_io->internal.desc = desc; 4523 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 4524 bdev_io->u.nvme_passthru.cmd = *cmd; 4525 bdev_io->u.nvme_passthru.buf = buf; 4526 bdev_io->u.nvme_passthru.nbytes = nbytes; 4527 bdev_io->u.nvme_passthru.md_buf = NULL; 4528 bdev_io->u.nvme_passthru.md_len = 0; 4529 4530 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4531 4532 bdev_io_submit(bdev_io); 4533 return 0; 4534 } 4535 4536 int 4537 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4538 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 4539 spdk_bdev_io_completion_cb cb, void *cb_arg) 4540 { 4541 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4542 struct spdk_bdev_io *bdev_io; 4543 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4544 4545 if (!desc->write) { 4546 /* 4547 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4548 * to easily determine if the command is a read or write, but for now just 4549 * do not allow io_passthru with a read-only descriptor. 4550 */ 4551 return -EBADF; 4552 } 4553 4554 bdev_io = bdev_channel_get_io(channel); 4555 if (!bdev_io) { 4556 return -ENOMEM; 4557 } 4558 4559 bdev_io->internal.ch = channel; 4560 bdev_io->internal.desc = desc; 4561 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 4562 bdev_io->u.nvme_passthru.cmd = *cmd; 4563 bdev_io->u.nvme_passthru.buf = buf; 4564 bdev_io->u.nvme_passthru.nbytes = nbytes; 4565 bdev_io->u.nvme_passthru.md_buf = md_buf; 4566 bdev_io->u.nvme_passthru.md_len = md_len; 4567 4568 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4569 4570 bdev_io_submit(bdev_io); 4571 return 0; 4572 } 4573 4574 static void bdev_abort_retry(void *ctx); 4575 static void bdev_abort(struct spdk_bdev_io *parent_io); 4576 4577 static void 4578 bdev_abort_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4579 { 4580 struct spdk_bdev_channel *channel = bdev_io->internal.ch; 4581 struct spdk_bdev_io *parent_io = cb_arg; 4582 struct spdk_bdev_io *bio_to_abort, *tmp_io; 4583 4584 bio_to_abort = bdev_io->u.abort.bio_to_abort; 4585 4586 spdk_bdev_free_io(bdev_io); 4587 4588 if (!success) { 4589 /* Check if the target I/O completed in the meantime. */ 4590 TAILQ_FOREACH(tmp_io, &channel->io_submitted, internal.ch_link) { 4591 if (tmp_io == bio_to_abort) { 4592 break; 4593 } 4594 } 4595 4596 /* If the target I/O still exists, set the parent to failed. */ 4597 if (tmp_io != NULL) { 4598 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4599 } 4600 } 4601 4602 parent_io->u.bdev.split_outstanding--; 4603 if (parent_io->u.bdev.split_outstanding == 0) { 4604 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4605 bdev_abort_retry(parent_io); 4606 } else { 4607 bdev_io_complete(parent_io); 4608 } 4609 } 4610 } 4611 4612 static int 4613 bdev_abort_io(struct spdk_bdev_desc *desc, struct spdk_bdev_channel *channel, 4614 struct spdk_bdev_io *bio_to_abort, 4615 spdk_bdev_io_completion_cb cb, void *cb_arg) 4616 { 4617 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4618 struct spdk_bdev_io *bdev_io; 4619 4620 if (bio_to_abort->type == SPDK_BDEV_IO_TYPE_ABORT || 4621 bio_to_abort->type == SPDK_BDEV_IO_TYPE_RESET) { 4622 /* TODO: Abort reset or abort request. 
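 * Until that is implemented, -ENOTSUP is returned here and _bdev_abort() turns
 * it into SPDK_BDEV_IO_STATUS_FAILED on the parent abort request.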
*/ 4623 return -ENOTSUP; 4624 } 4625 4626 bdev_io = bdev_channel_get_io(channel); 4627 if (bdev_io == NULL) { 4628 return -ENOMEM; 4629 } 4630 4631 bdev_io->internal.ch = channel; 4632 bdev_io->internal.desc = desc; 4633 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4634 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4635 4636 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bio_to_abort)) { 4637 bdev_io->u.bdev.abort.bio_cb_arg = bio_to_abort; 4638 4639 /* Parent abort request is not submitted directly, but to manage its 4640 * execution add it to the submitted list here. 4641 */ 4642 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4643 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4644 4645 bdev_abort(bdev_io); 4646 4647 return 0; 4648 } 4649 4650 bdev_io->u.abort.bio_to_abort = bio_to_abort; 4651 4652 /* Submit the abort request to the underlying bdev module. */ 4653 bdev_io_submit(bdev_io); 4654 4655 return 0; 4656 } 4657 4658 static uint32_t 4659 _bdev_abort(struct spdk_bdev_io *parent_io) 4660 { 4661 struct spdk_bdev_desc *desc = parent_io->internal.desc; 4662 struct spdk_bdev_channel *channel = parent_io->internal.ch; 4663 void *bio_cb_arg; 4664 struct spdk_bdev_io *bio_to_abort; 4665 uint32_t matched_ios; 4666 int rc; 4667 4668 bio_cb_arg = parent_io->u.bdev.abort.bio_cb_arg; 4669 4670 /* matched_ios is returned and will be kept by the caller. 4671 * 4672 * This funcion will be used for two cases, 1) the same cb_arg is used for 4673 * multiple I/Os, 2) a single large I/O is split into smaller ones. 4674 * Incrementing split_outstanding directly here may confuse readers especially 4675 * for the 1st case. 4676 * 4677 * Completion of I/O abort is processed after stack unwinding. Hence this trick 4678 * works as expected. 4679 */ 4680 matched_ios = 0; 4681 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4682 4683 TAILQ_FOREACH(bio_to_abort, &channel->io_submitted, internal.ch_link) { 4684 if (bio_to_abort->internal.caller_ctx != bio_cb_arg) { 4685 continue; 4686 } 4687 4688 if (bio_to_abort->internal.submit_tsc > parent_io->internal.submit_tsc) { 4689 /* Any I/O which was submitted after this abort command should be excluded. */ 4690 continue; 4691 } 4692 4693 rc = bdev_abort_io(desc, channel, bio_to_abort, bdev_abort_io_done, parent_io); 4694 if (rc != 0) { 4695 if (rc == -ENOMEM) { 4696 parent_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4697 } else { 4698 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4699 } 4700 break; 4701 } 4702 matched_ios++; 4703 } 4704 4705 return matched_ios; 4706 } 4707 4708 static void 4709 bdev_abort_retry(void *ctx) 4710 { 4711 struct spdk_bdev_io *parent_io = ctx; 4712 uint32_t matched_ios; 4713 4714 matched_ios = _bdev_abort(parent_io); 4715 4716 if (matched_ios == 0) { 4717 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4718 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4719 } else { 4720 /* For retry, the case that no target I/O was found is success 4721 * because it means target I/Os completed in the meantime. 4722 */ 4723 bdev_io_complete(parent_io); 4724 } 4725 return; 4726 } 4727 4728 /* Use split_outstanding to manage the progress of aborting I/Os. 
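 * Each child abort decrements split_outstanding in bdev_abort_io_done(); once it
 * reaches zero the parent is either completed or, on NOMEM, retried again.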
*/ 4729 parent_io->u.bdev.split_outstanding = matched_ios; 4730 } 4731 4732 static void 4733 bdev_abort(struct spdk_bdev_io *parent_io) 4734 { 4735 uint32_t matched_ios; 4736 4737 matched_ios = _bdev_abort(parent_io); 4738 4739 if (matched_ios == 0) { 4740 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4741 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4742 } else { 4743 /* The case the no target I/O was found is failure. */ 4744 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4745 bdev_io_complete(parent_io); 4746 } 4747 return; 4748 } 4749 4750 /* Use split_outstanding to manage the progress of aborting I/Os. */ 4751 parent_io->u.bdev.split_outstanding = matched_ios; 4752 } 4753 4754 int 4755 spdk_bdev_abort(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4756 void *bio_cb_arg, 4757 spdk_bdev_io_completion_cb cb, void *cb_arg) 4758 { 4759 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4760 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4761 struct spdk_bdev_io *bdev_io; 4762 4763 if (bio_cb_arg == NULL) { 4764 return -EINVAL; 4765 } 4766 4767 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ABORT)) { 4768 return -ENOTSUP; 4769 } 4770 4771 bdev_io = bdev_channel_get_io(channel); 4772 if (bdev_io == NULL) { 4773 return -ENOMEM; 4774 } 4775 4776 bdev_io->internal.ch = channel; 4777 bdev_io->internal.desc = desc; 4778 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4779 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4780 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4781 4782 bdev_io->u.bdev.abort.bio_cb_arg = bio_cb_arg; 4783 4784 /* Parent abort request is not submitted directly, but to manage its execution, 4785 * add it to the submitted list here. 4786 */ 4787 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4788 4789 bdev_abort(bdev_io); 4790 4791 return 0; 4792 } 4793 4794 int 4795 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4796 struct spdk_bdev_io_wait_entry *entry) 4797 { 4798 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4799 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 4800 4801 if (bdev != entry->bdev) { 4802 SPDK_ERRLOG("bdevs do not match\n"); 4803 return -EINVAL; 4804 } 4805 4806 if (mgmt_ch->per_thread_cache_count > 0) { 4807 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 4808 return -EINVAL; 4809 } 4810 4811 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 4812 return 0; 4813 } 4814 4815 static void 4816 bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 4817 { 4818 struct spdk_bdev *bdev = bdev_ch->bdev; 4819 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4820 struct spdk_bdev_io *bdev_io; 4821 4822 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 4823 /* 4824 * Allow some more I/O to complete before retrying the nomem_io queue. 4825 * Some drivers (such as nvme) cannot immediately take a new I/O in 4826 * the context of a completion, because the resources for the I/O are 4827 * not released until control returns to the bdev poller. Also, we 4828 * may require several small I/O to complete before a larger I/O 4829 * (that requires splitting) can be submitted. 
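 * Note that this nomem_io queue is internal to the bdev layer and is separate
 * from the -ENOMEM that the submission functions return when no spdk_bdev_io
 * can be allocated from the channel. For that case a caller can park itself on
 * the management channel with spdk_bdev_queue_io_wait(). A minimal caller-side
 * sketch, with hypothetical names and no error handling:
 *
 *     rc = spdk_bdev_read_blocks(desc, ch, buf, offset, num_blocks, read_done, ctx);
 *     if (rc == -ENOMEM) {
 *         ctx->wait_entry.bdev = bdev;
 *         ctx->wait_entry.cb_fn = resubmit_read;
 *         ctx->wait_entry.cb_arg = ctx;
 *         spdk_bdev_queue_io_wait(bdev, ch, &ctx->wait_entry);
 *     }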
4830 */ 4831 return; 4832 } 4833 4834 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 4835 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 4836 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 4837 bdev_io->internal.ch->io_outstanding++; 4838 shared_resource->io_outstanding++; 4839 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4840 bdev_io->internal.error.nvme.cdw0 = 0; 4841 bdev_io->num_retries++; 4842 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 4843 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4844 break; 4845 } 4846 } 4847 } 4848 4849 static inline void 4850 bdev_io_complete(void *ctx) 4851 { 4852 struct spdk_bdev_io *bdev_io = ctx; 4853 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4854 uint64_t tsc, tsc_diff; 4855 4856 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 4857 /* 4858 * Send the completion to the thread that originally submitted the I/O, 4859 * which may not be the current thread in the case of QoS. 4860 */ 4861 if (bdev_io->internal.io_submit_ch) { 4862 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4863 bdev_io->internal.io_submit_ch = NULL; 4864 } 4865 4866 /* 4867 * Defer completion to avoid potential infinite recursion if the 4868 * user's completion callback issues a new I/O. 4869 */ 4870 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4871 bdev_io_complete, bdev_io); 4872 return; 4873 } 4874 4875 tsc = spdk_get_ticks(); 4876 tsc_diff = tsc - bdev_io->internal.submit_tsc; 4877 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 4878 4879 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link); 4880 4881 if (bdev_io->internal.ch->histogram) { 4882 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 4883 } 4884 4885 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4886 switch (bdev_io->type) { 4887 case SPDK_BDEV_IO_TYPE_READ: 4888 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4889 bdev_io->internal.ch->stat.num_read_ops++; 4890 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4891 break; 4892 case SPDK_BDEV_IO_TYPE_WRITE: 4893 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4894 bdev_io->internal.ch->stat.num_write_ops++; 4895 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4896 break; 4897 case SPDK_BDEV_IO_TYPE_UNMAP: 4898 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4899 bdev_io->internal.ch->stat.num_unmap_ops++; 4900 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 4901 break; 4902 case SPDK_BDEV_IO_TYPE_ZCOPY: 4903 /* Track the data in the start phase only */ 4904 if (bdev_io->u.bdev.zcopy.start) { 4905 if (bdev_io->u.bdev.zcopy.populate) { 4906 bdev_io->internal.ch->stat.bytes_read += 4907 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4908 bdev_io->internal.ch->stat.num_read_ops++; 4909 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4910 } else { 4911 bdev_io->internal.ch->stat.bytes_written += 4912 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4913 bdev_io->internal.ch->stat.num_write_ops++; 4914 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4915 } 4916 } 4917 break; 4918 default: 4919 break; 4920 } 4921 } 4922 4923 #ifdef SPDK_CONFIG_VTUNE 4924 uint64_t now_tsc = spdk_get_ticks(); 4925 if (now_tsc > 
(bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 4926 uint64_t data[5]; 4927 4928 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 4929 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 4930 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 4931 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 4932 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 4933 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 4934 4935 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 4936 __itt_metadata_u64, 5, data); 4937 4938 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 4939 bdev_io->internal.ch->start_tsc = now_tsc; 4940 } 4941 #endif 4942 4943 assert(bdev_io->internal.cb != NULL); 4944 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 4945 4946 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 4947 bdev_io->internal.caller_ctx); 4948 } 4949 4950 static void 4951 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 4952 { 4953 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4954 4955 if (bdev_io->u.reset.ch_ref != NULL) { 4956 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 4957 bdev_io->u.reset.ch_ref = NULL; 4958 } 4959 4960 bdev_io_complete(bdev_io); 4961 } 4962 4963 static void 4964 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 4965 { 4966 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4967 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4968 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4969 struct spdk_bdev_io *queued_reset; 4970 4971 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 4972 while (!TAILQ_EMPTY(&ch->queued_resets)) { 4973 queued_reset = TAILQ_FIRST(&ch->queued_resets); 4974 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 4975 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 4976 } 4977 4978 spdk_for_each_channel_continue(i, 0); 4979 } 4980 4981 void 4982 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 4983 { 4984 struct spdk_bdev *bdev = bdev_io->bdev; 4985 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4986 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4987 4988 bdev_io->internal.status = status; 4989 4990 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 4991 bool unlock_channels = false; 4992 4993 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 4994 SPDK_ERRLOG("NOMEM returned for reset\n"); 4995 } 4996 pthread_mutex_lock(&bdev->internal.mutex); 4997 if (bdev_io == bdev->internal.reset_in_progress) { 4998 bdev->internal.reset_in_progress = NULL; 4999 unlock_channels = true; 5000 } 5001 pthread_mutex_unlock(&bdev->internal.mutex); 5002 5003 if (unlock_channels) { 5004 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 5005 bdev_io, bdev_reset_complete); 5006 return; 5007 } 5008 } else { 5009 _bdev_io_unset_bounce_buf(bdev_io); 5010 5011 assert(bdev_ch->io_outstanding > 0); 5012 assert(shared_resource->io_outstanding > 0); 5013 bdev_ch->io_outstanding--; 5014 shared_resource->io_outstanding--; 5015 5016 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 5017 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 5018 
/* 5019 * Wait for some of the outstanding I/O to complete before we 5020 * retry any of the nomem_io. Normally we will wait for 5021 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 5022 * depth channels we will instead wait for half to complete. 5023 */ 5024 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 5025 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 5026 return; 5027 } 5028 5029 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 5030 bdev_ch_retry_io(bdev_ch); 5031 } 5032 } 5033 5034 bdev_io_complete(bdev_io); 5035 } 5036 5037 void 5038 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 5039 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 5040 { 5041 if (sc == SPDK_SCSI_STATUS_GOOD) { 5042 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5043 } else { 5044 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 5045 bdev_io->internal.error.scsi.sc = sc; 5046 bdev_io->internal.error.scsi.sk = sk; 5047 bdev_io->internal.error.scsi.asc = asc; 5048 bdev_io->internal.error.scsi.ascq = ascq; 5049 } 5050 5051 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5052 } 5053 5054 void 5055 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 5056 int *sc, int *sk, int *asc, int *ascq) 5057 { 5058 assert(sc != NULL); 5059 assert(sk != NULL); 5060 assert(asc != NULL); 5061 assert(ascq != NULL); 5062 5063 switch (bdev_io->internal.status) { 5064 case SPDK_BDEV_IO_STATUS_SUCCESS: 5065 *sc = SPDK_SCSI_STATUS_GOOD; 5066 *sk = SPDK_SCSI_SENSE_NO_SENSE; 5067 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5068 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5069 break; 5070 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 5071 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 5072 break; 5073 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 5074 *sc = bdev_io->internal.error.scsi.sc; 5075 *sk = bdev_io->internal.error.scsi.sk; 5076 *asc = bdev_io->internal.error.scsi.asc; 5077 *ascq = bdev_io->internal.error.scsi.ascq; 5078 break; 5079 default: 5080 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 5081 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 5082 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5083 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5084 break; 5085 } 5086 } 5087 5088 void 5089 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 5090 { 5091 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 5092 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5093 } else { 5094 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 5095 } 5096 5097 bdev_io->internal.error.nvme.cdw0 = cdw0; 5098 bdev_io->internal.error.nvme.sct = sct; 5099 bdev_io->internal.error.nvme.sc = sc; 5100 5101 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5102 } 5103 5104 void 5105 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 5106 { 5107 assert(sct != NULL); 5108 assert(sc != NULL); 5109 assert(cdw0 != NULL); 5110 5111 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5112 *sct = bdev_io->internal.error.nvme.sct; 5113 *sc = bdev_io->internal.error.nvme.sc; 5114 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5115 *sct = SPDK_NVME_SCT_GENERIC; 5116 *sc = SPDK_NVME_SC_SUCCESS; 5117 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_ABORTED) { 5118 *sct = SPDK_NVME_SCT_GENERIC; 5119 *sc = SPDK_NVME_SC_ABORTED_BY_REQUEST; 5120 } else { 5121 *sct 
= SPDK_NVME_SCT_GENERIC; 5122 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5123 } 5124 5125 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5126 } 5127 5128 void 5129 spdk_bdev_io_get_nvme_fused_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, 5130 int *first_sct, int *first_sc, int *second_sct, int *second_sc) 5131 { 5132 assert(first_sct != NULL); 5133 assert(first_sc != NULL); 5134 assert(second_sct != NULL); 5135 assert(second_sc != NULL); 5136 assert(cdw0 != NULL); 5137 5138 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5139 if (bdev_io->internal.error.nvme.sct == SPDK_NVME_SCT_MEDIA_ERROR && 5140 bdev_io->internal.error.nvme.sc == SPDK_NVME_SC_COMPARE_FAILURE) { 5141 *first_sct = bdev_io->internal.error.nvme.sct; 5142 *first_sc = bdev_io->internal.error.nvme.sc; 5143 *second_sct = SPDK_NVME_SCT_GENERIC; 5144 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5145 } else { 5146 *first_sct = SPDK_NVME_SCT_GENERIC; 5147 *first_sc = SPDK_NVME_SC_SUCCESS; 5148 *second_sct = bdev_io->internal.error.nvme.sct; 5149 *second_sc = bdev_io->internal.error.nvme.sc; 5150 } 5151 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5152 *first_sct = SPDK_NVME_SCT_GENERIC; 5153 *first_sc = SPDK_NVME_SC_SUCCESS; 5154 *second_sct = SPDK_NVME_SCT_GENERIC; 5155 *second_sc = SPDK_NVME_SC_SUCCESS; 5156 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED) { 5157 *first_sct = SPDK_NVME_SCT_GENERIC; 5158 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5159 *second_sct = SPDK_NVME_SCT_GENERIC; 5160 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5161 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_MISCOMPARE) { 5162 *first_sct = SPDK_NVME_SCT_MEDIA_ERROR; 5163 *first_sc = SPDK_NVME_SC_COMPARE_FAILURE; 5164 *second_sct = SPDK_NVME_SCT_GENERIC; 5165 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5166 } else { 5167 *first_sct = SPDK_NVME_SCT_GENERIC; 5168 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5169 *second_sct = SPDK_NVME_SCT_GENERIC; 5170 *second_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5171 } 5172 5173 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5174 } 5175 5176 struct spdk_thread * 5177 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 5178 { 5179 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 5180 } 5181 5182 struct spdk_io_channel * 5183 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 5184 { 5185 return bdev_io->internal.ch->channel; 5186 } 5187 5188 static int 5189 bdev_init(struct spdk_bdev *bdev) 5190 { 5191 char *bdev_name; 5192 5193 assert(bdev->module != NULL); 5194 5195 if (!bdev->name) { 5196 SPDK_ERRLOG("Bdev name is NULL\n"); 5197 return -EINVAL; 5198 } 5199 5200 if (!strlen(bdev->name)) { 5201 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 5202 return -EINVAL; 5203 } 5204 5205 if (spdk_bdev_get_by_name(bdev->name)) { 5206 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 5207 return -EEXIST; 5208 } 5209 5210 /* Users often register their own I/O devices using the bdev name. In 5211 * order to avoid conflicts, prepend bdev_. 
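 * For example, a bdev registered under the (hypothetical) name "Malloc0" has its
 * I/O device registered under the name "bdev_Malloc0" rather than plain "Malloc0".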
*/ 5212 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 5213 if (!bdev_name) { 5214 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 5215 return -ENOMEM; 5216 } 5217 5218 bdev->internal.status = SPDK_BDEV_STATUS_READY; 5219 bdev->internal.measured_queue_depth = UINT64_MAX; 5220 bdev->internal.claim_module = NULL; 5221 bdev->internal.qd_poller = NULL; 5222 bdev->internal.qos = NULL; 5223 5224 /* If the user didn't specify a uuid, generate one. */ 5225 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 5226 spdk_uuid_generate(&bdev->uuid); 5227 } 5228 5229 if (spdk_bdev_get_buf_align(bdev) > 1) { 5230 if (bdev->split_on_optimal_io_boundary) { 5231 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 5232 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 5233 } else { 5234 bdev->split_on_optimal_io_boundary = true; 5235 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 5236 } 5237 } 5238 5239 /* If the user didn't specify a write unit size, set it to one. */ 5240 if (bdev->write_unit_size == 0) { 5241 bdev->write_unit_size = 1; 5242 } 5243 5244 /* Set ACWU value to 1 if bdev module did not set it (does not support it natively) */ 5245 if (bdev->acwu == 0) { 5246 bdev->acwu = 1; 5247 } 5248 5249 TAILQ_INIT(&bdev->internal.open_descs); 5250 TAILQ_INIT(&bdev->internal.locked_ranges); 5251 TAILQ_INIT(&bdev->internal.pending_locked_ranges); 5252 5253 TAILQ_INIT(&bdev->aliases); 5254 5255 bdev->internal.reset_in_progress = NULL; 5256 5257 spdk_io_device_register(__bdev_to_io_dev(bdev), 5258 bdev_channel_create, bdev_channel_destroy, 5259 sizeof(struct spdk_bdev_channel), 5260 bdev_name); 5261 5262 free(bdev_name); 5263 5264 pthread_mutex_init(&bdev->internal.mutex, NULL); 5265 return 0; 5266 } 5267 5268 static void 5269 bdev_destroy_cb(void *io_device) 5270 { 5271 int rc; 5272 struct spdk_bdev *bdev; 5273 spdk_bdev_unregister_cb cb_fn; 5274 void *cb_arg; 5275 5276 bdev = __bdev_from_io_dev(io_device); 5277 cb_fn = bdev->internal.unregister_cb; 5278 cb_arg = bdev->internal.unregister_ctx; 5279 5280 rc = bdev->fn_table->destruct(bdev->ctxt); 5281 if (rc < 0) { 5282 SPDK_ERRLOG("destruct failed\n"); 5283 } 5284 if (rc <= 0 && cb_fn != NULL) { 5285 cb_fn(cb_arg, rc); 5286 } 5287 } 5288 5289 5290 static void 5291 bdev_fini(struct spdk_bdev *bdev) 5292 { 5293 pthread_mutex_destroy(&bdev->internal.mutex); 5294 5295 free(bdev->internal.qos); 5296 5297 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 5298 } 5299 5300 static void 5301 bdev_start(struct spdk_bdev *bdev) 5302 { 5303 SPDK_DEBUGLOG(bdev, "Inserting bdev %s into list\n", bdev->name); 5304 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 5305 5306 /* Examine configuration before initializing I/O */ 5307 bdev_examine(bdev); 5308 } 5309 5310 int 5311 spdk_bdev_register(struct spdk_bdev *bdev) 5312 { 5313 int rc = bdev_init(bdev); 5314 5315 if (rc == 0) { 5316 bdev_start(bdev); 5317 } 5318 5319 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 5320 return rc; 5321 } 5322 5323 int 5324 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 5325 { 5326 SPDK_ERRLOG("This function is deprecated. 
Use spdk_bdev_register() instead.\n"); 5327 return spdk_bdev_register(vbdev); 5328 } 5329 5330 void 5331 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 5332 { 5333 if (bdev->internal.unregister_cb != NULL) { 5334 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 5335 } 5336 } 5337 5338 static void 5339 _remove_notify(void *arg) 5340 { 5341 struct spdk_bdev_desc *desc = arg; 5342 5343 pthread_mutex_lock(&desc->mutex); 5344 desc->refs--; 5345 5346 if (!desc->closed) { 5347 pthread_mutex_unlock(&desc->mutex); 5348 if (desc->callback.open_with_ext) { 5349 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 5350 } else { 5351 desc->callback.remove_fn(desc->callback.ctx); 5352 } 5353 return; 5354 } else if (0 == desc->refs) { 5355 /* This descriptor was closed after this remove_notify message was sent. 5356 * spdk_bdev_close() could not free the descriptor since this message was 5357 * in flight, so we free it now using bdev_desc_free(). 5358 */ 5359 pthread_mutex_unlock(&desc->mutex); 5360 bdev_desc_free(desc); 5361 return; 5362 } 5363 pthread_mutex_unlock(&desc->mutex); 5364 } 5365 5366 /* Must be called while holding bdev->internal.mutex. 5367 * returns: 0 - bdev removed and ready to be destructed. 5368 * -EBUSY - bdev can't be destructed yet. */ 5369 static int 5370 bdev_unregister_unsafe(struct spdk_bdev *bdev) 5371 { 5372 struct spdk_bdev_desc *desc, *tmp; 5373 int rc = 0; 5374 5375 /* Notify each descriptor about hotremoval */ 5376 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 5377 rc = -EBUSY; 5378 pthread_mutex_lock(&desc->mutex); 5379 /* 5380 * Defer invocation of the event_cb to a separate message that will 5381 * run later on its thread. This ensures this context unwinds and 5382 * we don't recursively unregister this bdev again if the event_cb 5383 * immediately closes its descriptor. 5384 */ 5385 desc->refs++; 5386 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 5387 pthread_mutex_unlock(&desc->mutex); 5388 } 5389 5390 /* If there are no descriptors, proceed removing the bdev */ 5391 if (rc == 0) { 5392 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 5393 SPDK_DEBUGLOG(bdev, "Removing bdev %s from list done\n", bdev->name); 5394 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 5395 } 5396 5397 return rc; 5398 } 5399 5400 void 5401 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 5402 { 5403 struct spdk_thread *thread; 5404 int rc; 5405 5406 SPDK_DEBUGLOG(bdev, "Removing bdev %s from list\n", bdev->name); 5407 5408 thread = spdk_get_thread(); 5409 if (!thread) { 5410 /* The user called this from a non-SPDK thread. */ 5411 if (cb_fn != NULL) { 5412 cb_fn(cb_arg, -ENOTSUP); 5413 } 5414 return; 5415 } 5416 5417 pthread_mutex_lock(&g_bdev_mgr.mutex); 5418 pthread_mutex_lock(&bdev->internal.mutex); 5419 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5420 pthread_mutex_unlock(&bdev->internal.mutex); 5421 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5422 if (cb_fn) { 5423 cb_fn(cb_arg, -EBUSY); 5424 } 5425 return; 5426 } 5427 5428 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 5429 bdev->internal.unregister_cb = cb_fn; 5430 bdev->internal.unregister_ctx = cb_arg; 5431 5432 /* Call under lock. 
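 * bdev_unregister_unsafe() returns 0 when no descriptors are open, in which case
 * the bdev is finalized right below; otherwise it returns -EBUSY and the
 * unregistration is finished by the last spdk_bdev_close().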
*/ 5433 rc = bdev_unregister_unsafe(bdev); 5434 pthread_mutex_unlock(&bdev->internal.mutex); 5435 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5436 5437 if (rc == 0) { 5438 bdev_fini(bdev); 5439 } 5440 } 5441 5442 static void 5443 bdev_dummy_event_cb(void *remove_ctx) 5444 { 5445 SPDK_DEBUGLOG(bdev, "Bdev remove event received with no remove callback specified"); 5446 } 5447 5448 static int 5449 bdev_start_qos(struct spdk_bdev *bdev) 5450 { 5451 struct set_qos_limit_ctx *ctx; 5452 5453 /* Enable QoS */ 5454 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 5455 ctx = calloc(1, sizeof(*ctx)); 5456 if (ctx == NULL) { 5457 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 5458 return -ENOMEM; 5459 } 5460 ctx->bdev = bdev; 5461 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5462 bdev_enable_qos_msg, ctx, 5463 bdev_enable_qos_done); 5464 } 5465 5466 return 0; 5467 } 5468 5469 static int 5470 bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 5471 { 5472 struct spdk_thread *thread; 5473 int rc = 0; 5474 5475 thread = spdk_get_thread(); 5476 if (!thread) { 5477 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 5478 return -ENOTSUP; 5479 } 5480 5481 SPDK_DEBUGLOG(bdev, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5482 spdk_get_thread()); 5483 5484 desc->bdev = bdev; 5485 desc->thread = thread; 5486 desc->write = write; 5487 5488 pthread_mutex_lock(&bdev->internal.mutex); 5489 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5490 pthread_mutex_unlock(&bdev->internal.mutex); 5491 return -ENODEV; 5492 } 5493 5494 if (write && bdev->internal.claim_module) { 5495 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 5496 bdev->name, bdev->internal.claim_module->name); 5497 pthread_mutex_unlock(&bdev->internal.mutex); 5498 return -EPERM; 5499 } 5500 5501 rc = bdev_start_qos(bdev); 5502 if (rc != 0) { 5503 SPDK_ERRLOG("Failed to start QoS on bdev %s\n", bdev->name); 5504 pthread_mutex_unlock(&bdev->internal.mutex); 5505 return rc; 5506 } 5507 5508 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 5509 5510 pthread_mutex_unlock(&bdev->internal.mutex); 5511 5512 return 0; 5513 } 5514 5515 int 5516 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 5517 void *remove_ctx, struct spdk_bdev_desc **_desc) 5518 { 5519 struct spdk_bdev_desc *desc; 5520 int rc; 5521 5522 desc = calloc(1, sizeof(*desc)); 5523 if (desc == NULL) { 5524 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5525 return -ENOMEM; 5526 } 5527 5528 if (remove_cb == NULL) { 5529 remove_cb = bdev_dummy_event_cb; 5530 } 5531 5532 TAILQ_INIT(&desc->pending_media_events); 5533 TAILQ_INIT(&desc->free_media_events); 5534 5535 desc->callback.open_with_ext = false; 5536 desc->callback.remove_fn = remove_cb; 5537 desc->callback.ctx = remove_ctx; 5538 pthread_mutex_init(&desc->mutex, NULL); 5539 5540 pthread_mutex_lock(&g_bdev_mgr.mutex); 5541 5542 rc = bdev_open(bdev, write, desc); 5543 if (rc != 0) { 5544 bdev_desc_free(desc); 5545 desc = NULL; 5546 } 5547 5548 *_desc = desc; 5549 5550 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5551 5552 return rc; 5553 } 5554 5555 int 5556 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 5557 void *event_ctx, struct spdk_bdev_desc **_desc) 5558 { 5559 struct spdk_bdev_desc *desc; 5560 struct spdk_bdev *bdev; 5561 unsigned int event_id; 5562 int rc; 5563 5564 if (event_cb == NULL) { 5565 SPDK_ERRLOG("Missing event callback function\n"); 
5566 return -EINVAL; 5567 } 5568 5569 pthread_mutex_lock(&g_bdev_mgr.mutex); 5570 5571 bdev = spdk_bdev_get_by_name(bdev_name); 5572 5573 if (bdev == NULL) { 5574 SPDK_NOTICELOG("Currently unable to find bdev with name: %s\n", bdev_name); 5575 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5576 return -ENODEV; 5577 } 5578 5579 desc = calloc(1, sizeof(*desc)); 5580 if (desc == NULL) { 5581 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5582 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5583 return -ENOMEM; 5584 } 5585 5586 TAILQ_INIT(&desc->pending_media_events); 5587 TAILQ_INIT(&desc->free_media_events); 5588 5589 desc->callback.open_with_ext = true; 5590 desc->callback.event_fn = event_cb; 5591 desc->callback.ctx = event_ctx; 5592 pthread_mutex_init(&desc->mutex, NULL); 5593 5594 if (bdev->media_events) { 5595 desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE, 5596 sizeof(*desc->media_events_buffer)); 5597 if (desc->media_events_buffer == NULL) { 5598 SPDK_ERRLOG("Failed to initialize media event pool\n"); 5599 bdev_desc_free(desc); 5600 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5601 return -ENOMEM; 5602 } 5603 5604 for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) { 5605 TAILQ_INSERT_TAIL(&desc->free_media_events, 5606 &desc->media_events_buffer[event_id], tailq); 5607 } 5608 } 5609 5610 rc = bdev_open(bdev, write, desc); 5611 if (rc != 0) { 5612 bdev_desc_free(desc); 5613 desc = NULL; 5614 } 5615 5616 *_desc = desc; 5617 5618 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5619 5620 return rc; 5621 } 5622 5623 void 5624 spdk_bdev_close(struct spdk_bdev_desc *desc) 5625 { 5626 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 5627 int rc; 5628 5629 SPDK_DEBUGLOG(bdev, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5630 spdk_get_thread()); 5631 5632 assert(desc->thread == spdk_get_thread()); 5633 5634 spdk_poller_unregister(&desc->io_timeout_poller); 5635 5636 pthread_mutex_lock(&bdev->internal.mutex); 5637 pthread_mutex_lock(&desc->mutex); 5638 5639 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 5640 5641 desc->closed = true; 5642 5643 if (0 == desc->refs) { 5644 pthread_mutex_unlock(&desc->mutex); 5645 bdev_desc_free(desc); 5646 } else { 5647 pthread_mutex_unlock(&desc->mutex); 5648 } 5649 5650 /* If no more descriptors, kill QoS channel */ 5651 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5652 SPDK_DEBUGLOG(bdev, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 5653 bdev->name, spdk_get_thread()); 5654 5655 if (bdev_qos_destroy(bdev)) { 5656 /* There isn't anything we can do to recover here. Just let the 5657 * old QoS poller keep running. The QoS handling won't change 5658 * cores when the user allocates a new channel, but it won't break. */ 5659 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 5660 } 5661 } 5662 5663 spdk_bdev_set_qd_sampling_period(bdev, 0); 5664 5665 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5666 rc = bdev_unregister_unsafe(bdev); 5667 pthread_mutex_unlock(&bdev->internal.mutex); 5668 5669 if (rc == 0) { 5670 bdev_fini(bdev); 5671 } 5672 } else { 5673 pthread_mutex_unlock(&bdev->internal.mutex); 5674 } 5675 } 5676 5677 int 5678 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 5679 struct spdk_bdev_module *module) 5680 { 5681 if (bdev->internal.claim_module != NULL) { 5682 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 5683 bdev->internal.claim_module->name); 5684 return -EPERM; 5685 } 5686 5687 if (desc && !desc->write) { 5688 desc->write = true; 5689 } 5690 5691 bdev->internal.claim_module = module; 5692 return 0; 5693 } 5694 5695 void 5696 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 5697 { 5698 assert(bdev->internal.claim_module != NULL); 5699 bdev->internal.claim_module = NULL; 5700 } 5701 5702 struct spdk_bdev * 5703 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 5704 { 5705 assert(desc != NULL); 5706 return desc->bdev; 5707 } 5708 5709 void 5710 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 5711 { 5712 struct iovec *iovs; 5713 int iovcnt; 5714 5715 if (bdev_io == NULL) { 5716 return; 5717 } 5718 5719 switch (bdev_io->type) { 5720 case SPDK_BDEV_IO_TYPE_READ: 5721 case SPDK_BDEV_IO_TYPE_WRITE: 5722 case SPDK_BDEV_IO_TYPE_ZCOPY: 5723 iovs = bdev_io->u.bdev.iovs; 5724 iovcnt = bdev_io->u.bdev.iovcnt; 5725 break; 5726 default: 5727 iovs = NULL; 5728 iovcnt = 0; 5729 break; 5730 } 5731 5732 if (iovp) { 5733 *iovp = iovs; 5734 } 5735 if (iovcntp) { 5736 *iovcntp = iovcnt; 5737 } 5738 } 5739 5740 void * 5741 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 5742 { 5743 if (bdev_io == NULL) { 5744 return NULL; 5745 } 5746 5747 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 5748 return NULL; 5749 } 5750 5751 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 5752 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 5753 return bdev_io->u.bdev.md_buf; 5754 } 5755 5756 return NULL; 5757 } 5758 5759 void * 5760 spdk_bdev_io_get_cb_arg(struct spdk_bdev_io *bdev_io) 5761 { 5762 if (bdev_io == NULL) { 5763 assert(false); 5764 return NULL; 5765 } 5766 5767 return bdev_io->internal.caller_ctx; 5768 } 5769 5770 void 5771 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 5772 { 5773 5774 if (spdk_bdev_module_list_find(bdev_module->name)) { 5775 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 5776 assert(false); 5777 } 5778 5779 /* 5780 * Modules with examine callbacks must be initialized first, so they are 5781 * ready to handle examine callbacks from later modules that will 5782 * register physical bdevs. 
5783 */ 5784 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 5785 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5786 } else { 5787 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5788 } 5789 } 5790 5791 struct spdk_bdev_module * 5792 spdk_bdev_module_list_find(const char *name) 5793 { 5794 struct spdk_bdev_module *bdev_module; 5795 5796 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5797 if (strcmp(name, bdev_module->name) == 0) { 5798 break; 5799 } 5800 } 5801 5802 return bdev_module; 5803 } 5804 5805 static void 5806 bdev_write_zero_buffer_next(void *_bdev_io) 5807 { 5808 struct spdk_bdev_io *bdev_io = _bdev_io; 5809 uint64_t num_bytes, num_blocks; 5810 void *md_buf = NULL; 5811 int rc; 5812 5813 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 5814 bdev_io->u.bdev.split_remaining_num_blocks, 5815 ZERO_BUFFER_SIZE); 5816 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 5817 5818 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 5819 md_buf = (char *)g_bdev_mgr.zero_buffer + 5820 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 5821 } 5822 5823 rc = bdev_write_blocks_with_md(bdev_io->internal.desc, 5824 spdk_io_channel_from_ctx(bdev_io->internal.ch), 5825 g_bdev_mgr.zero_buffer, md_buf, 5826 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 5827 bdev_write_zero_buffer_done, bdev_io); 5828 if (rc == 0) { 5829 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 5830 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 5831 } else if (rc == -ENOMEM) { 5832 bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next); 5833 } else { 5834 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5835 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 5836 } 5837 } 5838 5839 static void 5840 bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 5841 { 5842 struct spdk_bdev_io *parent_io = cb_arg; 5843 5844 spdk_bdev_free_io(bdev_io); 5845 5846 if (!success) { 5847 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5848 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 5849 return; 5850 } 5851 5852 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 5853 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5854 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 5855 return; 5856 } 5857 5858 bdev_write_zero_buffer_next(parent_io); 5859 } 5860 5861 static void 5862 bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 5863 { 5864 pthread_mutex_lock(&ctx->bdev->internal.mutex); 5865 ctx->bdev->internal.qos_mod_in_progress = false; 5866 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 5867 5868 if (ctx->cb_fn) { 5869 ctx->cb_fn(ctx->cb_arg, status); 5870 } 5871 free(ctx); 5872 } 5873 5874 static void 5875 bdev_disable_qos_done(void *cb_arg) 5876 { 5877 struct set_qos_limit_ctx *ctx = cb_arg; 5878 struct spdk_bdev *bdev = ctx->bdev; 5879 struct spdk_bdev_io *bdev_io; 5880 struct spdk_bdev_qos *qos; 5881 5882 pthread_mutex_lock(&bdev->internal.mutex); 5883 qos = bdev->internal.qos; 5884 bdev->internal.qos = NULL; 5885 pthread_mutex_unlock(&bdev->internal.mutex); 5886 5887 while (!TAILQ_EMPTY(&qos->queued)) { 5888 /* Send queued I/O back to their original thread for resubmission. 
*/ 5889 bdev_io = TAILQ_FIRST(&qos->queued); 5890 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 5891 5892 if (bdev_io->internal.io_submit_ch) { 5893 /* 5894 * Channel was changed when sending it to the QoS thread - change it back 5895 * before sending it back to the original thread. 5896 */ 5897 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 5898 bdev_io->internal.io_submit_ch = NULL; 5899 } 5900 5901 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 5902 _bdev_io_submit, bdev_io); 5903 } 5904 5905 if (qos->thread != NULL) { 5906 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 5907 spdk_poller_unregister(&qos->poller); 5908 } 5909 5910 free(qos); 5911 5912 bdev_set_qos_limit_done(ctx, 0); 5913 } 5914 5915 static void 5916 bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 5917 { 5918 void *io_device = spdk_io_channel_iter_get_io_device(i); 5919 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5920 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5921 struct spdk_thread *thread; 5922 5923 pthread_mutex_lock(&bdev->internal.mutex); 5924 thread = bdev->internal.qos->thread; 5925 pthread_mutex_unlock(&bdev->internal.mutex); 5926 5927 if (thread != NULL) { 5928 spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx); 5929 } else { 5930 bdev_disable_qos_done(ctx); 5931 } 5932 } 5933 5934 static void 5935 bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 5936 { 5937 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5938 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5939 5940 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 5941 5942 spdk_for_each_channel_continue(i, 0); 5943 } 5944 5945 static void 5946 bdev_update_qos_rate_limit_msg(void *cb_arg) 5947 { 5948 struct set_qos_limit_ctx *ctx = cb_arg; 5949 struct spdk_bdev *bdev = ctx->bdev; 5950 5951 pthread_mutex_lock(&bdev->internal.mutex); 5952 bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 5953 pthread_mutex_unlock(&bdev->internal.mutex); 5954 5955 bdev_set_qos_limit_done(ctx, 0); 5956 } 5957 5958 static void 5959 bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 5960 { 5961 void *io_device = spdk_io_channel_iter_get_io_device(i); 5962 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5963 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5964 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5965 5966 pthread_mutex_lock(&bdev->internal.mutex); 5967 bdev_enable_qos(bdev, bdev_ch); 5968 pthread_mutex_unlock(&bdev->internal.mutex); 5969 spdk_for_each_channel_continue(i, 0); 5970 } 5971 5972 static void 5973 bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 5974 { 5975 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5976 5977 bdev_set_qos_limit_done(ctx, status); 5978 } 5979 5980 static void 5981 bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 5982 { 5983 int i; 5984 5985 assert(bdev->internal.qos != NULL); 5986 5987 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5988 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5989 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5990 5991 if (limits[i] == 0) { 5992 bdev->internal.qos->rate_limits[i].limit = 5993 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 5994 } 5995 } 5996 } 5997 } 5998 5999 void 6000 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 6001 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 6002 { 6003 struct set_qos_limit_ctx *ctx; 6004 uint32_t 
limit_set_complement; 6005 uint64_t min_limit_per_sec; 6006 int i; 6007 bool disable_rate_limit = true; 6008 6009 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6010 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6011 continue; 6012 } 6013 6014 if (limits[i] > 0) { 6015 disable_rate_limit = false; 6016 } 6017 6018 if (bdev_qos_is_iops_rate_limit(i) == true) { 6019 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 6020 } else { 6021 /* Change from megabyte to byte rate limit */ 6022 limits[i] = limits[i] * 1024 * 1024; 6023 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 6024 } 6025 6026 limit_set_complement = limits[i] % min_limit_per_sec; 6027 if (limit_set_complement) { 6028 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 6029 limits[i], min_limit_per_sec); 6030 limits[i] += min_limit_per_sec - limit_set_complement; 6031 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 6032 } 6033 } 6034 6035 ctx = calloc(1, sizeof(*ctx)); 6036 if (ctx == NULL) { 6037 cb_fn(cb_arg, -ENOMEM); 6038 return; 6039 } 6040 6041 ctx->cb_fn = cb_fn; 6042 ctx->cb_arg = cb_arg; 6043 ctx->bdev = bdev; 6044 6045 pthread_mutex_lock(&bdev->internal.mutex); 6046 if (bdev->internal.qos_mod_in_progress) { 6047 pthread_mutex_unlock(&bdev->internal.mutex); 6048 free(ctx); 6049 cb_fn(cb_arg, -EAGAIN); 6050 return; 6051 } 6052 bdev->internal.qos_mod_in_progress = true; 6053 6054 if (disable_rate_limit == true && bdev->internal.qos) { 6055 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6056 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 6057 (bdev->internal.qos->rate_limits[i].limit > 0 && 6058 bdev->internal.qos->rate_limits[i].limit != 6059 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 6060 disable_rate_limit = false; 6061 break; 6062 } 6063 } 6064 } 6065 6066 if (disable_rate_limit == false) { 6067 if (bdev->internal.qos == NULL) { 6068 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 6069 if (!bdev->internal.qos) { 6070 pthread_mutex_unlock(&bdev->internal.mutex); 6071 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 6072 bdev_set_qos_limit_done(ctx, -ENOMEM); 6073 return; 6074 } 6075 } 6076 6077 if (bdev->internal.qos->thread == NULL) { 6078 /* Enabling */ 6079 bdev_set_qos_rate_limits(bdev, limits); 6080 6081 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6082 bdev_enable_qos_msg, ctx, 6083 bdev_enable_qos_done); 6084 } else { 6085 /* Updating */ 6086 bdev_set_qos_rate_limits(bdev, limits); 6087 6088 spdk_thread_send_msg(bdev->internal.qos->thread, 6089 bdev_update_qos_rate_limit_msg, ctx); 6090 } 6091 } else { 6092 if (bdev->internal.qos != NULL) { 6093 bdev_set_qos_rate_limits(bdev, limits); 6094 6095 /* Disabling */ 6096 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6097 bdev_disable_qos_msg, ctx, 6098 bdev_disable_qos_msg_done); 6099 } else { 6100 pthread_mutex_unlock(&bdev->internal.mutex); 6101 bdev_set_qos_limit_done(ctx, 0); 6102 return; 6103 } 6104 } 6105 6106 pthread_mutex_unlock(&bdev->internal.mutex); 6107 } 6108 6109 struct spdk_bdev_histogram_ctx { 6110 spdk_bdev_histogram_status_cb cb_fn; 6111 void *cb_arg; 6112 struct spdk_bdev *bdev; 6113 int status; 6114 }; 6115 6116 static void 6117 bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 6118 { 6119 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6120 6121 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6122 ctx->bdev->internal.histogram_in_progress = false; 6123 
pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6124 ctx->cb_fn(ctx->cb_arg, ctx->status); 6125 free(ctx); 6126 } 6127 6128 static void 6129 bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 6130 { 6131 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6132 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6133 6134 if (ch->histogram != NULL) { 6135 spdk_histogram_data_free(ch->histogram); 6136 ch->histogram = NULL; 6137 } 6138 spdk_for_each_channel_continue(i, 0); 6139 } 6140 6141 static void 6142 bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 6143 { 6144 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6145 6146 if (status != 0) { 6147 ctx->status = status; 6148 ctx->bdev->internal.histogram_enabled = false; 6149 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx, 6150 bdev_histogram_disable_channel_cb); 6151 } else { 6152 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6153 ctx->bdev->internal.histogram_in_progress = false; 6154 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6155 ctx->cb_fn(ctx->cb_arg, ctx->status); 6156 free(ctx); 6157 } 6158 } 6159 6160 static void 6161 bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 6162 { 6163 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6164 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6165 int status = 0; 6166 6167 if (ch->histogram == NULL) { 6168 ch->histogram = spdk_histogram_data_alloc(); 6169 if (ch->histogram == NULL) { 6170 status = -ENOMEM; 6171 } 6172 } 6173 6174 spdk_for_each_channel_continue(i, status); 6175 } 6176 6177 void 6178 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 6179 void *cb_arg, bool enable) 6180 { 6181 struct spdk_bdev_histogram_ctx *ctx; 6182 6183 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 6184 if (ctx == NULL) { 6185 cb_fn(cb_arg, -ENOMEM); 6186 return; 6187 } 6188 6189 ctx->bdev = bdev; 6190 ctx->status = 0; 6191 ctx->cb_fn = cb_fn; 6192 ctx->cb_arg = cb_arg; 6193 6194 pthread_mutex_lock(&bdev->internal.mutex); 6195 if (bdev->internal.histogram_in_progress) { 6196 pthread_mutex_unlock(&bdev->internal.mutex); 6197 free(ctx); 6198 cb_fn(cb_arg, -EAGAIN); 6199 return; 6200 } 6201 6202 bdev->internal.histogram_in_progress = true; 6203 pthread_mutex_unlock(&bdev->internal.mutex); 6204 6205 bdev->internal.histogram_enabled = enable; 6206 6207 if (enable) { 6208 /* Allocate histogram for each channel */ 6209 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx, 6210 bdev_histogram_enable_channel_cb); 6211 } else { 6212 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx, 6213 bdev_histogram_disable_channel_cb); 6214 } 6215 } 6216 6217 struct spdk_bdev_histogram_data_ctx { 6218 spdk_bdev_histogram_data_cb cb_fn; 6219 void *cb_arg; 6220 struct spdk_bdev *bdev; 6221 /** merged histogram data from all channels */ 6222 struct spdk_histogram_data *histogram; 6223 }; 6224 6225 static void 6226 bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 6227 { 6228 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6229 6230 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 6231 free(ctx); 6232 } 6233 6234 static void 6235 bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 6236 { 6237 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6238 struct spdk_bdev_channel *ch = 
spdk_io_channel_get_ctx(_ch); 6239 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6240 int status = 0; 6241 6242 if (ch->histogram == NULL) { 6243 status = -EFAULT; 6244 } else { 6245 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 6246 } 6247 6248 spdk_for_each_channel_continue(i, status); 6249 } 6250 6251 void 6252 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 6253 spdk_bdev_histogram_data_cb cb_fn, 6254 void *cb_arg) 6255 { 6256 struct spdk_bdev_histogram_data_ctx *ctx; 6257 6258 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 6259 if (ctx == NULL) { 6260 cb_fn(cb_arg, -ENOMEM, NULL); 6261 return; 6262 } 6263 6264 ctx->bdev = bdev; 6265 ctx->cb_fn = cb_fn; 6266 ctx->cb_arg = cb_arg; 6267 6268 ctx->histogram = histogram; 6269 6270 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx, 6271 bdev_histogram_get_channel_cb); 6272 } 6273 6274 size_t 6275 spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events, 6276 size_t max_events) 6277 { 6278 struct media_event_entry *entry; 6279 size_t num_events = 0; 6280 6281 for (; num_events < max_events; ++num_events) { 6282 entry = TAILQ_FIRST(&desc->pending_media_events); 6283 if (entry == NULL) { 6284 break; 6285 } 6286 6287 events[num_events] = entry->event; 6288 TAILQ_REMOVE(&desc->pending_media_events, entry, tailq); 6289 TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq); 6290 } 6291 6292 return num_events; 6293 } 6294 6295 int 6296 spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events, 6297 size_t num_events) 6298 { 6299 struct spdk_bdev_desc *desc; 6300 struct media_event_entry *entry; 6301 size_t event_id; 6302 int rc = 0; 6303 6304 assert(bdev->media_events); 6305 6306 pthread_mutex_lock(&bdev->internal.mutex); 6307 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6308 if (desc->write) { 6309 break; 6310 } 6311 } 6312 6313 if (desc == NULL || desc->media_events_buffer == NULL) { 6314 rc = -ENODEV; 6315 goto out; 6316 } 6317 6318 for (event_id = 0; event_id < num_events; ++event_id) { 6319 entry = TAILQ_FIRST(&desc->free_media_events); 6320 if (entry == NULL) { 6321 break; 6322 } 6323 6324 TAILQ_REMOVE(&desc->free_media_events, entry, tailq); 6325 TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq); 6326 entry->event = events[event_id]; 6327 } 6328 6329 rc = event_id; 6330 out: 6331 pthread_mutex_unlock(&bdev->internal.mutex); 6332 return rc; 6333 } 6334 6335 void 6336 spdk_bdev_notify_media_management(struct spdk_bdev *bdev) 6337 { 6338 struct spdk_bdev_desc *desc; 6339 6340 pthread_mutex_lock(&bdev->internal.mutex); 6341 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6342 if (!TAILQ_EMPTY(&desc->pending_media_events)) { 6343 desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev, 6344 desc->callback.ctx); 6345 } 6346 } 6347 pthread_mutex_unlock(&bdev->internal.mutex); 6348 } 6349 6350 struct locked_lba_range_ctx { 6351 struct lba_range range; 6352 struct spdk_bdev *bdev; 6353 struct lba_range *current_range; 6354 struct lba_range *owner_range; 6355 struct spdk_poller *poller; 6356 lock_range_cb cb_fn; 6357 void *cb_arg; 6358 }; 6359 6360 static void 6361 bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status) 6362 { 6363 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6364 6365 ctx->cb_fn(ctx->cb_arg, -ENOMEM); 6366 free(ctx); 6367 } 6368 6369 static void 6370 
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i); 6371 6372 static void 6373 bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6374 { 6375 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6376 struct spdk_bdev *bdev = ctx->bdev; 6377 6378 if (status == -ENOMEM) { 6379 /* One of the channels could not allocate a range object. 6380 * So we have to go back and clean up any ranges that were 6381 * allocated successfully before we return error status to 6382 * the caller. We can reuse the unlock function to do that 6383 * clean up. 6384 */ 6385 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6386 bdev_unlock_lba_range_get_channel, ctx, 6387 bdev_lock_error_cleanup_cb); 6388 return; 6389 } 6390 6391 /* All channels have locked this range and no I/O overlapping the range 6392 * are outstanding! Set the owner_ch for the range object for the 6393 * locking channel, so that this channel will know that it is allowed 6394 * to write to this range. 6395 */ 6396 ctx->owner_range->owner_ch = ctx->range.owner_ch; 6397 ctx->cb_fn(ctx->cb_arg, status); 6398 6399 /* Don't free the ctx here. Its range is in the bdev's global list of 6400 * locked ranges still, and will be removed and freed when this range 6401 * is later unlocked. 6402 */ 6403 } 6404 6405 static int 6406 bdev_lock_lba_range_check_io(void *_i) 6407 { 6408 struct spdk_io_channel_iter *i = _i; 6409 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6410 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6411 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6412 struct lba_range *range = ctx->current_range; 6413 struct spdk_bdev_io *bdev_io; 6414 6415 spdk_poller_unregister(&ctx->poller); 6416 6417 /* The range is now in the locked_ranges, so no new IO can be submitted to this 6418 * range. But we need to wait until any outstanding IO overlapping with this range 6419 * are completed. 6420 */ 6421 TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) { 6422 if (bdev_io_range_is_locked(bdev_io, range)) { 6423 ctx->poller = SPDK_POLLER_REGISTER(bdev_lock_lba_range_check_io, i, 100); 6424 return SPDK_POLLER_BUSY; 6425 } 6426 } 6427 6428 spdk_for_each_channel_continue(i, 0); 6429 return SPDK_POLLER_BUSY; 6430 } 6431 6432 static void 6433 bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6434 { 6435 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6436 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6437 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6438 struct lba_range *range; 6439 6440 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6441 if (range->length == ctx->range.length && 6442 range->offset == ctx->range.offset && 6443 range->locked_ctx == ctx->range.locked_ctx) { 6444 /* This range already exists on this channel, so don't add 6445 * it again. This can happen when a new channel is created 6446 * while the for_each_channel operation is in progress. 6447 * Do not check for outstanding I/O in that case, since the 6448 * range was locked before any I/O could be submitted to the 6449 * new channel. 
6450 */ 6451 spdk_for_each_channel_continue(i, 0); 6452 return; 6453 } 6454 } 6455 6456 range = calloc(1, sizeof(*range)); 6457 if (range == NULL) { 6458 spdk_for_each_channel_continue(i, -ENOMEM); 6459 return; 6460 } 6461 6462 range->length = ctx->range.length; 6463 range->offset = ctx->range.offset; 6464 range->locked_ctx = ctx->range.locked_ctx; 6465 ctx->current_range = range; 6466 if (ctx->range.owner_ch == ch) { 6467 /* This is the range object for the channel that will hold 6468 * the lock. Store it in the ctx object so that we can easily 6469 * set its owner_ch after the lock is finally acquired. 6470 */ 6471 ctx->owner_range = range; 6472 } 6473 TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq); 6474 bdev_lock_lba_range_check_io(i); 6475 } 6476 6477 static void 6478 bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx) 6479 { 6480 assert(spdk_get_thread() == ctx->range.owner_ch->channel->thread); 6481 6482 /* We will add a copy of this range to each channel now. */ 6483 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx, 6484 bdev_lock_lba_range_cb); 6485 } 6486 6487 static bool 6488 bdev_lba_range_overlaps_tailq(struct lba_range *range, lba_range_tailq_t *tailq) 6489 { 6490 struct lba_range *r; 6491 6492 TAILQ_FOREACH(r, tailq, tailq) { 6493 if (bdev_lba_range_overlapped(range, r)) { 6494 return true; 6495 } 6496 } 6497 return false; 6498 } 6499 6500 static int 6501 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6502 uint64_t offset, uint64_t length, 6503 lock_range_cb cb_fn, void *cb_arg) 6504 { 6505 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6506 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6507 struct locked_lba_range_ctx *ctx; 6508 6509 if (cb_arg == NULL) { 6510 SPDK_ERRLOG("cb_arg must not be NULL\n"); 6511 return -EINVAL; 6512 } 6513 6514 ctx = calloc(1, sizeof(*ctx)); 6515 if (ctx == NULL) { 6516 return -ENOMEM; 6517 } 6518 6519 ctx->range.offset = offset; 6520 ctx->range.length = length; 6521 ctx->range.owner_ch = ch; 6522 ctx->range.locked_ctx = cb_arg; 6523 ctx->bdev = bdev; 6524 ctx->cb_fn = cb_fn; 6525 ctx->cb_arg = cb_arg; 6526 6527 pthread_mutex_lock(&bdev->internal.mutex); 6528 if (bdev_lba_range_overlaps_tailq(&ctx->range, &bdev->internal.locked_ranges)) { 6529 /* There is an active lock overlapping with this range. 6530 * Put it on the pending list until this range no 6531 * longer overlaps with another. 6532 */ 6533 TAILQ_INSERT_TAIL(&bdev->internal.pending_locked_ranges, &ctx->range, tailq); 6534 } else { 6535 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, &ctx->range, tailq); 6536 bdev_lock_lba_range_ctx(bdev, ctx); 6537 } 6538 pthread_mutex_unlock(&bdev->internal.mutex); 6539 return 0; 6540 } 6541 6542 static void 6543 bdev_lock_lba_range_ctx_msg(void *_ctx) 6544 { 6545 struct locked_lba_range_ctx *ctx = _ctx; 6546 6547 bdev_lock_lba_range_ctx(ctx->bdev, ctx); 6548 } 6549 6550 static void 6551 bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6552 { 6553 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6554 struct locked_lba_range_ctx *pending_ctx; 6555 struct spdk_bdev_channel *ch = ctx->range.owner_ch; 6556 struct spdk_bdev *bdev = ch->bdev; 6557 struct lba_range *range, *tmp; 6558 6559 pthread_mutex_lock(&bdev->internal.mutex); 6560 /* Check if there are any pending locked ranges that overlap with this range 6561 * that was just unlocked. 
If there are, check that the pending range doesn't overlap with any 6562 * other locked ranges before calling bdev_lock_lba_range_ctx which will start 6563 * the lock process. 6564 */ 6565 TAILQ_FOREACH_SAFE(range, &bdev->internal.pending_locked_ranges, tailq, tmp) { 6566 if (bdev_lba_range_overlapped(range, &ctx->range) && 6567 !bdev_lba_range_overlaps_tailq(range, &bdev->internal.locked_ranges)) { 6568 TAILQ_REMOVE(&bdev->internal.pending_locked_ranges, range, tailq); 6569 pending_ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6570 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, range, tailq); 6571 spdk_thread_send_msg(pending_ctx->range.owner_ch->channel->thread, 6572 bdev_lock_lba_range_ctx_msg, pending_ctx); 6573 } 6574 } 6575 pthread_mutex_unlock(&bdev->internal.mutex); 6576 6577 ctx->cb_fn(ctx->cb_arg, status); 6578 free(ctx); 6579 } 6580 6581 static void 6582 bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6583 { 6584 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6585 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6586 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6587 TAILQ_HEAD(, spdk_bdev_io) io_locked; 6588 struct spdk_bdev_io *bdev_io; 6589 struct lba_range *range; 6590 6591 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6592 if (ctx->range.offset == range->offset && 6593 ctx->range.length == range->length && 6594 ctx->range.locked_ctx == range->locked_ctx) { 6595 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 6596 free(range); 6597 break; 6598 } 6599 } 6600 6601 /* Note: we should almost always be able to assert that the range specified 6602 * was found. But there are some very rare corner cases where a new channel 6603 * gets created simultaneously with a range unlock, where this function 6604 * would execute on that new channel and wouldn't have the range. 6605 * We also use this to clean up range allocations when a later allocation 6606 * fails in the locking path. 6607 * So we can't actually assert() here. 6608 */ 6609 6610 /* Swap the locked IO into a temporary list, and then try to submit them again. 6611 * We could hyper-optimize this to only resubmit locked I/O that overlap 6612 * with the range that was just unlocked, but this isn't a performance path so 6613 * we go for simplicity here. 6614 */ 6615 TAILQ_INIT(&io_locked); 6616 TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link); 6617 while (!TAILQ_EMPTY(&io_locked)) { 6618 bdev_io = TAILQ_FIRST(&io_locked); 6619 TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link); 6620 bdev_io_submit(bdev_io); 6621 } 6622 6623 spdk_for_each_channel_continue(i, 0); 6624 } 6625 6626 static int 6627 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6628 uint64_t offset, uint64_t length, 6629 lock_range_cb cb_fn, void *cb_arg) 6630 { 6631 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6632 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6633 struct locked_lba_range_ctx *ctx; 6634 struct lba_range *range; 6635 bool range_found = false; 6636 6637 /* Let's make sure the specified channel actually has a lock on 6638 * the specified range. Note that the range must match exactly.
6639 */ 6640 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6641 if (range->offset == offset && range->length == length && 6642 range->owner_ch == ch && range->locked_ctx == cb_arg) { 6643 range_found = true; 6644 break; 6645 } 6646 } 6647 6648 if (!range_found) { 6649 return -EINVAL; 6650 } 6651 6652 pthread_mutex_lock(&bdev->internal.mutex); 6653 /* We confirmed that this channel has locked the specified range. To 6654 * start the unlock process, we find the range in the bdev's locked_ranges 6655 * and remove it. This ensures new channels don't inherit the locked range. 6656 * Then we will send a message to each channel (including the one specified 6657 * here) to remove the range from its per-channel list. 6658 */ 6659 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 6660 if (range->offset == offset && range->length == length && 6661 range->locked_ctx == cb_arg) { 6662 break; 6663 } 6664 } 6665 if (range == NULL) { 6666 assert(false); 6667 pthread_mutex_unlock(&bdev->internal.mutex); 6668 return -EINVAL; 6669 } 6670 TAILQ_REMOVE(&bdev->internal.locked_ranges, range, tailq); 6671 ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6672 pthread_mutex_unlock(&bdev->internal.mutex); 6673 6674 ctx->cb_fn = cb_fn; 6675 ctx->cb_arg = cb_arg; 6676 6677 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx, 6678 bdev_unlock_lba_range_cb); 6679 return 0; 6680 } 6681 6682 SPDK_LOG_REGISTER_COMPONENT(bdev) 6683 6684 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 6685 { 6686 spdk_trace_register_owner(OWNER_BDEV, 'b'); 6687 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 6688 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 6689 OBJECT_BDEV_IO, 1, 0, "type: "); 6690 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 6691 OBJECT_BDEV_IO, 0, 0, ""); 6692 } 6693
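/*
 * Editor's illustrative sketch: the example below shows how a consumer might use the
 * extended open/close path implemented above (spdk_bdev_open_ext() / spdk_bdev_close()).
 * It is wrapped in #if 0 so it is not compiled into the library. The context struct and
 * the example_* names are hypothetical; the sketch assumes spdk_bdev_get_io_channel()
 * and SPDK_BDEV_EVENT_REMOVE from the public bdev header, and, like the open/close
 * functions above, it must run on an SPDK thread.
 */
#if 0
struct example_bdev_ctx {
	struct spdk_bdev_desc	*desc;
	struct spdk_io_channel	*ch;
};

/* Event callback required by spdk_bdev_open_ext(); invoked on the opening thread. */
static void
example_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct example_bdev_ctx *ctx = event_ctx;

	if (type == SPDK_BDEV_EVENT_REMOVE) {
		/* Release the channel and descriptor so bdev unregistration can finish. */
		if (ctx->ch != NULL) {
			spdk_put_io_channel(ctx->ch);
			ctx->ch = NULL;
		}
		spdk_bdev_close(ctx->desc);
		ctx->desc = NULL;
	}
}

/* Open a bdev by name for read/write and grab a per-thread I/O channel. */
static int
example_bdev_open(const char *name, struct example_bdev_ctx *ctx)
{
	int rc;

	rc = spdk_bdev_open_ext(name, true, example_bdev_event_cb, ctx, &ctx->desc);
	if (rc != 0) {
		SPDK_ERRLOG("Could not open bdev %s: %d\n", name, rc);
		return rc;
	}

	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
	if (ctx->ch == NULL) {
		spdk_bdev_close(ctx->desc);
		ctx->desc = NULL;
		return -ENOMEM;
	}

	return 0;
}
#endif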