/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/notify.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"
#include "spdk/string.h"

#include "bdev_internal.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024 - 1)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define SPDK_BDEV_AUTO_EXAMINE			true
#define BUF_SMALL_POOL_SIZE			8191
#define BUF_LARGE_POOL_SIZE			1023
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		1000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX
#define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC	1000

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_rpc_type[] = {"rw_ios_per_sec",
				     "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
				    };

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

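	/* Zeroed scratch buffer of ZERO_BUFFER_SIZE bytes; used by
	 * bdev_write_zero_buffer_next() when write_zeroes must be emulated with
	 * regular writes.
	 */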
	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

	pthread_mutex_t mutex;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
	.mutex = PTHREAD_MUTEX_INITIALIZER,
};

typedef void (*lock_range_cb)(void *ctx, int status);

struct lba_range {
	uint64_t offset;
	uint64_t length;
	void *locked_ctx;
	struct spdk_bdev_channel *owner_ch;
	TAILQ_ENTRY(lba_range) tailq;
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
	.bdev_auto_examine = SPDK_BDEV_AUTO_EXAMINE,
};

static spdk_bdev_init_cb g_init_cb_fn = NULL;
static void *g_init_cb_arg = NULL;

static spdk_bdev_fini_cb g_fini_cb_fn = NULL;
static void *g_fini_cb_arg = NULL;
static struct spdk_thread *g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 * For remaining bytes, allowed to run negative if an I/O is submitted when
	 * some bytes are remaining, but the I/O is bigger than that amount. The
	 * excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;

	/** Function to check whether to queue the IO. */
	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);

	/** Function to update for the submitted IO. */
	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
};

struct spdk_bdev_qos {
	/** Rate limits for this bdev, one entry per limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t per_thread_cache_count;
	uint32_t bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * queue their I/O that awaits retry here, which makes it possible to retry sending
 * I/O to one bdev after I/O from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel *shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev *bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel *channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to the underlying dev module through this channel
	 * and waiting for completion.
	 */
	uint64_t io_outstanding;

	/*
	 * List of all submitted I/Os including I/O that are generated via splitting.
	 */
	bdev_io_tailq_t io_submitted;

	/*
	 * List of spdk_bdev_io that are currently queued because they write to a locked
	 * LBA range.
	 */
	bdev_io_tailq_t io_locked;

	uint32_t flags;

	struct spdk_histogram_data *histogram;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t start_tsc;
	uint64_t interval_tsc;
	__itt_string_handle *handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

	bdev_io_tailq_t queued_resets;

	lba_range_tailq_t locked_ranges;
};

struct media_event_entry {
	struct spdk_bdev_media_event event;
	TAILQ_ENTRY(media_event_entry) tailq;
};

#define MEDIA_EVENT_POOL_SIZE 64

struct spdk_bdev_desc {
	struct spdk_bdev *bdev;
	struct spdk_thread *thread;
	struct {
		bool open_with_ext;
		union {
			spdk_bdev_remove_cb_t remove_fn;
			spdk_bdev_event_cb_t event_fn;
		};
		void *ctx;
	} callback;
	bool closed;
	bool write;
	pthread_mutex_t mutex;
	uint32_t refs;
	TAILQ_HEAD(, media_event_entry) pending_media_events;
	TAILQ_HEAD(, media_event_entry) free_media_events;
	struct media_event_entry *media_events_buffer;
	TAILQ_ENTRY(spdk_bdev_desc) link;

	uint64_t timeout_in_sec;
	spdk_bdev_io_timeout_cb cb_fn;
	void *cb_arg;
	struct spdk_poller *io_timeout_poller;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
static void bdev_write_zero_buffer_next(void *_bdev_io);

static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
static void bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);

static int
bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			  struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
			  uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg);
static int
bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   struct iovec *iov, int iovcnt, void *md_buf,
			   uint64_t offset_blocks, uint64_t num_blocks,
			   spdk_bdev_io_completion_cb cb, void *cb_arg);

static int
bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		    uint64_t offset, uint64_t length,
		    lock_range_cb cb_fn, void *cb_arg);

static int
bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		      uint64_t offset, uint64_t length,
		      lock_range_cb cb_fn, void *cb_arg);

static inline void bdev_io_complete(void *ctx);

static bool bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort);
static bool bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization. A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
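	/* For example, with 4 threads and the default bdev_io_cache_size of 256,
	 * the pool must hold at least 256 * (4 + 1) = 1280 bdev_ios.
	 */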
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

struct spdk_bdev_examine_item {
	char *name;
	TAILQ_ENTRY(spdk_bdev_examine_item) link;
};

TAILQ_HEAD(spdk_bdev_examine_allowlist, spdk_bdev_examine_item);

struct spdk_bdev_examine_allowlist g_bdev_examine_allowlist = TAILQ_HEAD_INITIALIZER(
			g_bdev_examine_allowlist);

static inline bool
bdev_examine_allowlist_check(const char *name)
{
	struct spdk_bdev_examine_item *item;
	TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) {
		if (strcmp(name, item->name) == 0) {
			return true;
		}
	}
	return false;
}

static inline void
bdev_examine_allowlist_free(void)
{
	struct spdk_bdev_examine_item *item;
	while (!TAILQ_EMPTY(&g_bdev_examine_allowlist)) {
		item = TAILQ_FIRST(&g_bdev_examine_allowlist);
		TAILQ_REMOVE(&g_bdev_examine_allowlist, item, link);
		free(item->name);
		free(item);
	}
}

static inline bool
bdev_in_examine_allowlist(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *tmp;
	if (bdev_examine_allowlist_check(bdev->name)) {
		return true;
	}
	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (bdev_examine_allowlist_check(tmp->alias)) {
			return true;
		}
	}
	return false;
}

static inline bool
bdev_ok_to_examine(struct spdk_bdev *bdev)
{
	if (g_bdev_opts.bdev_auto_examine) {
		return true;
	} else {
		return bdev_in_examine_allowlist(bdev);
	}
}

static void
bdev_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module *module;
	uint32_t action;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine_config && bdev_ok_to_examine(bdev)) {
			action = module->internal.action_in_progress;
			module->internal.action_in_progress++;
			module->examine_config(bdev);
			if (action != module->internal.action_in_progress) {
				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
					    module->name);
			}
		}
	}

	if (bdev->internal.claim_module && bdev_ok_to_examine(bdev)) {
		if (bdev->internal.claim_module->examine_disk) {
			bdev->internal.claim_module->internal.action_in_progress++;
			bdev->internal.claim_module->examine_disk(bdev);
		}
		return;
	}

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine_disk && bdev_ok_to_examine(bdev)) {
			module->internal.action_in_progress++;
			module->examine_disk(bdev);
		}
	}
}

int
spdk_bdev_examine(const char *name)
{
	struct spdk_bdev *bdev;
	struct spdk_bdev_examine_item *item;

	if (g_bdev_opts.bdev_auto_examine) {
		SPDK_ERRLOG("Manual examine is not allowed if auto examine is enabled");
		return -EINVAL;
	}

	if (bdev_examine_allowlist_check(name)) {
		SPDK_ERRLOG("Duplicate bdev name for manual examine: %s\n", name);
		return -EEXIST;
	}

	item = calloc(1, sizeof(*item));
	if (!item) {
		return -ENOMEM;
	}
	item->name = strdup(name);
	if (!item->name) {
		free(item);
		return -ENOMEM;
	}
	TAILQ_INSERT_TAIL(&g_bdev_examine_allowlist, item, link);

	bdev = spdk_bdev_get_by_name(name);
	if (bdev) {
		bdev_examine(bdev);
	}
	return 0;
}

static inline void
bdev_examine_allowlist_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_examine_item *item;
	TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_examine");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "name", item->name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return iovs[0].iov_base != NULL;
}

static bool
_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
{
	int i;
	uintptr_t iov_base;

	if (spdk_likely(alignment == 1)) {
		return true;
	}

	for (i = 0; i < iovcnt; i++) {
		iov_base = (uintptr_t)iovs[i].iov_base;
		if ((iov_base & (alignment - 1)) != 0) {
			return false;
		}
	}

	return true;
}

static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is write path, copy data from original buffer to bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
_bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	/* save original md_buf */
	bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf;
	/* set bounce md_buf */
	bdev_io->u.bdev.md_buf = md_buf;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		memcpy(md_buf, bdev_io->internal.orig_md_buf, len);
	}
}

static void
bdev_io_get_buf_complete(struct spdk_bdev_io *bdev_io, void *buf, bool status)
{
	struct spdk_io_channel *ch = spdk_bdev_io_get_io_channel(bdev_io);

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io->internal.get_aux_buf_cb(ch, bdev_io, buf);
		bdev_io->internal.get_aux_buf_cb = NULL;
	} else {
		assert(bdev_io->internal.get_buf_cb != NULL);
		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(ch, bdev_io, status);
		bdev_io->internal.get_buf_cb = NULL;
	}
}

static void
_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	bool buf_allocated;
	uint64_t md_len, alignment;
	void *aligned_buf;

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io_get_buf_complete(bdev_io, buf, true);
		return;
	}

	alignment = spdk_bdev_get_buf_align(bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
	aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

	if (buf_allocated) {
		_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
	} else {
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
	}

	if (spdk_bdev_is_md_separate(bdev)) {
		aligned_buf = (char *)aligned_buf + len;
		md_len = bdev_io->u.bdev.num_blocks * bdev->md_len;

		assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);

		if (bdev_io->u.bdev.md_buf != NULL) {
			_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
		} else {
			spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
		}
	}
	bdev_io_get_buf_complete(bdev_io, buf, true);
}

static void
_bdev_io_put_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t buf_len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t md_len, alignment;

	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
	alignment = spdk_bdev_get_buf_align(bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		_bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
	}
}

static void
bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	assert(bdev_io->internal.buf != NULL);
	_bdev_io_put_buf(bdev_io, bdev_io->internal.buf, bdev_io->internal.buf_len);
	bdev_io->internal.buf = NULL;
}

void
spdk_bdev_io_put_aux_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(buf != NULL);
	_bdev_io_put_buf(bdev_io, buf, len);
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
		assert(bdev_io->internal.orig_md_buf == NULL);
		return;
	}

	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs,
				  bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base,
				  bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bouncing buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;

	/* do the same for metadata buffer */
	if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
		assert(spdk_bdev_is_md_separate(bdev_io->bdev));

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
		    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
			       bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
		}

		bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
		bdev_io->internal.orig_md_buf = NULL;
	}

	/* We want to free the bounce buffer here since we know we're done with it (as opposed
	 * to waiting for the conditional free of internal.buf in spdk_bdev_free_io()).
	 */
	bdev_io_put_buf(bdev_io);
}

static void
bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment, md_len;
	void *buf;

	alignment = spdk_bdev_get_buf_align(bdev);
	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;

	if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
			    len + alignment);
		bdev_io_get_buf_complete(bdev_io, NULL, false);
		return;
	}

	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;

	if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);
	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		_bdev_io_set_buf(bdev_io, buf, len);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	uint64_t alignment;

	assert(cb != NULL);
	bdev_io->internal.get_buf_cb = cb;

	alignment = spdk_bdev_get_buf_align(bdev);

	if (_is_buf_allocated(bdev_io->u.bdev.iovs) &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
		return;
	}

	bdev_io_get_buf(bdev_io, len);
}

void
spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(cb != NULL);
	assert(bdev_io->internal.get_aux_buf_cb == NULL);
	bdev_io->internal.get_aux_buf_cb = cb;
	bdev_io_get_buf(bdev_io, len);
}

static int
bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

static void
bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_qos_limit");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

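/*
 * Illustrative example of the RPC entry emitted by bdev_qos_config_json()
 * above (bdev name and limit value are arbitrary):
 *
 *	{
 *	  "method": "bdev_set_qos_limit",
 *	  "params": { "name": "Malloc0", "rw_ios_per_sec": 20000 }
 *	}
 */
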
void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_options");
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_named_bool(w, "bdev_auto_examine", g_bdev_opts.bdev_auto_examine);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	bdev_examine_allowlist_config_json(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	pthread_mutex_lock(&g_bdev_mgr.mutex);

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		bdev_qos_config_json(bdev, w);
	}

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	spdk_json_write_array_end(w);
}

static int
bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	bdev_init_complete(0);
}

static void
bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	bdev_init_complete(-1);
}

static int
bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to prevent other modules from completing modules_init.
			 * Send a message to defer application shutdown until resources are cleaned up. */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_env_get_core_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create,
				bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		return;
	}

	bdev_module_action_complete();
}

static void
bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (g_bdev_mgr.bdev_io_pool) {
		if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
			SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
				    g_bdev_opts.bdev_io_pool_size);
		}

		spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	}

	if (g_bdev_mgr.buf_small_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
			SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
				    BUF_SMALL_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	}

	if (g_bdev_mgr.buf_large_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
			SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
				    BUF_LARGE_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	}

	spdk_free(g_bdev_mgr.zero_buffer);

	bdev_examine_allowlist_free();

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
	pthread_mutex_destroy(&g_bdev_mgr.mutex);
}

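/*
 * Module teardown below runs on g_fini_thread and walks the module list in
 * reverse registration order; a module with async_fini pauses the walk until
 * it calls spdk_bdev_module_finish_done().
 */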
static void
bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* FIXME: Handling initialization failures is broken now,
	 * so we won't even try cleaning up after successfully
	 * initialized modules. If module_init_complete is false,
	 * just call bdev_mgr_unregister_cb.
	 */
	if (!g_bdev_mgr.module_init_complete) {
		bdev_mgr_unregister_cb(NULL);
		return;
	}

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL);
	} else {
		bdev_module_finish_iter(NULL);
	}
}

static void
bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(bdev, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in the reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(bdev, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(bdev, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim underlying bdev properly, we may face the
	 * case of bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister last bdev on the list then.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	bdev_finish_unregister_bdevs_iter(NULL, 0);
}

struct spdk_bdev_io *
bdev_channel_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf != NULL) {
		bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static bool
bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		if (bdev_io->u.bdev.zcopy.start) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static bool
bdev_is_read_io(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* Bit 1 (0x2) set for read operation */
		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
			return true;
		} else {
			return false;
		}
	case SPDK_BDEV_IO_TYPE_READ:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Populate to read from disk */
		if (bdev_io->u.bdev.zcopy.populate) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static uint64_t
bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Track the data in the start phase only */
		if (bdev_io->u.bdev.zcopy.start) {
			return bdev_io->u.bdev.num_blocks * bdev->blocklen;
		} else {
			return 0;
		}
	default:
		return 0;
	}
}

static bool
bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
		return true;
	} else {
		return false;
	}
}

static bool
bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static bool
bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static void
bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice--;
}

static void
bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io);
}

static void
bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_set_ops(struct spdk_bdev_qos *qos)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].queue_io = NULL;
			qos->rate_limits[i].update_quota = NULL;
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_r_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_w_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota;
			break;
		default:
			break;
		}
	}
}

static void
_bdev_io_complete_in_submit(struct spdk_bdev_channel *bdev_ch,
			    struct spdk_bdev_io *bdev_io,
			    enum spdk_bdev_io_status status)
{
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	bdev_io->internal.in_submit_request = true;
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	spdk_bdev_io_complete(bdev_io, status);
	bdev_io->internal.in_submit_request = false;
}

static inline void
bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT)) {
		struct spdk_bdev_mgmt_channel *mgmt_channel = shared_resource->mgmt_ch;
		struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort;

		if (bdev_abort_queued_io(&shared_resource->nomem_io, bio_to_abort) ||
		    bdev_abort_buf_io(&mgmt_channel->need_buf_small, bio_to_abort) ||
		    bdev_abort_buf_io(&mgmt_channel->need_buf_large, bio_to_abort)) {
			_bdev_io_complete_in_submit(bdev_ch, bdev_io,
						    SPDK_BDEV_IO_STATUS_SUCCESS);
			return;
		}
	}

	if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
		bdev_ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev_io->internal.in_submit_request = true;
		bdev->fn_table->submit_request(ch, bdev_io);
		bdev_io->internal.in_submit_request = false;
	} else {
		TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
	}
}

static int
bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL;
	int i, submitted_ios = 0;

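	/* Walk the queued I/O in order: stop at the first I/O that any active
	 * limit wants to queue for this timeslice; otherwise charge every active
	 * limit and submit the I/O.
	 */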
	TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
		if (bdev_qos_io_to_limit(bdev_io) == true) {
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].queue_io) {
					continue;
				}

				if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
								 bdev_io) == true) {
					return submitted_ios;
				}
			}
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].update_quota) {
					continue;
				}

				qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
			}
		}

		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		bdev_io_do_submit(ch, bdev_io);
		submitted_ios++;
	}

	return submitted_ios;
}

static void
bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

static bool
bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O. Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O. Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}

static bool
bdev_io_should_split(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
	/* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */
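	/* Worked example: with optimal_io_boundary = 8 and an I/O covering blocks
	 * 6..9, start_stripe = 6 >> 3 = 0 and end_stripe = 9 >> 3 = 1, so the I/O
	 * crosses a boundary and must be split.
	 */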
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}

static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}

static void
bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_bdev_io_split(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t parent_offset, current_offset, remaining;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes;
	struct iovec *parent_iov, *iov;
	uint64_t parent_iov_offset, iov_len;
	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
	void *md_buf = NULL;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	parent_offset = bdev_io->u.bdev.offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	parent_iov_offset = (current_offset - parent_offset) * blocklen;
	parent_iovcnt = bdev_io->u.bdev.iovcnt;

	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
		if (parent_iov_offset < parent_iov->iov_len) {
			break;
		}
		parent_iov_offset -= parent_iov->iov_len;
	}

	child_iovcnt = 0;
	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
		to_next_boundary = spdk_min(remaining, to_next_boundary);
		to_next_boundary_bytes = to_next_boundary * blocklen;
		iov = &bdev_io->child_iov[child_iovcnt];
		iovcnt = 0;

		if (bdev_io->u.bdev.md_buf) {
			md_buf = (char *)bdev_io->u.bdev.md_buf +
				 (current_offset - parent_offset) * spdk_bdev_get_md_size(bdev_io->bdev);
		}

		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
			to_next_boundary_bytes -= iov_len;

			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;

			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
				parent_iov_offset += iov_len;
			} else {
				parent_iovpos++;
				parent_iov_offset = 0;
			}
			child_iovcnt++;
			iovcnt++;
		}

		if (to_next_boundary_bytes > 0) {
			/* We had to stop this child I/O early because we ran out of
			 * child_iov space. Ensure the iovs to be aligned with block
			 * size and then adjust to_next_boundary before starting the
			 * child I/O.
			 */
1961 */ 1962 assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV); 1963 to_last_block_bytes = to_next_boundary_bytes % blocklen; 1964 if (to_last_block_bytes != 0) { 1965 uint32_t child_iovpos = child_iovcnt - 1; 1966 /* don't decrease child_iovcnt so the loop will naturally end */ 1967 1968 to_last_block_bytes = blocklen - to_last_block_bytes; 1969 to_next_boundary_bytes += to_last_block_bytes; 1970 while (to_last_block_bytes > 0 && iovcnt > 0) { 1971 iov_len = spdk_min(to_last_block_bytes, 1972 bdev_io->child_iov[child_iovpos].iov_len); 1973 bdev_io->child_iov[child_iovpos].iov_len -= iov_len; 1974 if (bdev_io->child_iov[child_iovpos].iov_len == 0) { 1975 child_iovpos--; 1976 if (--iovcnt == 0) { 1977 return; 1978 } 1979 } 1980 to_last_block_bytes -= iov_len; 1981 } 1982 1983 assert(to_last_block_bytes == 0); 1984 } 1985 to_next_boundary -= to_next_boundary_bytes / blocklen; 1986 } 1987 1988 bdev_io->u.bdev.split_outstanding++; 1989 1990 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1991 rc = bdev_readv_blocks_with_md(bdev_io->internal.desc, 1992 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1993 iov, iovcnt, md_buf, current_offset, 1994 to_next_boundary, 1995 bdev_io_split_done, bdev_io); 1996 } else { 1997 rc = bdev_writev_blocks_with_md(bdev_io->internal.desc, 1998 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1999 iov, iovcnt, md_buf, current_offset, 2000 to_next_boundary, 2001 bdev_io_split_done, bdev_io); 2002 } 2003 2004 if (rc == 0) { 2005 current_offset += to_next_boundary; 2006 remaining -= to_next_boundary; 2007 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 2008 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 2009 } else { 2010 bdev_io->u.bdev.split_outstanding--; 2011 if (rc == -ENOMEM) { 2012 if (bdev_io->u.bdev.split_outstanding == 0) { 2013 /* No I/O is outstanding. Hence we should wait here. */ 2014 bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split); 2015 } 2016 } else { 2017 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2018 if (bdev_io->u.bdev.split_outstanding == 0) { 2019 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 2020 (uintptr_t)bdev_io, 0); 2021 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 2022 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 2023 } 2024 } 2025 2026 return; 2027 } 2028 } 2029 } 2030 2031 static void 2032 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 2033 { 2034 struct spdk_bdev_io *parent_io = cb_arg; 2035 2036 spdk_bdev_free_io(bdev_io); 2037 2038 if (!success) { 2039 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2040 /* If any child I/O failed, stop further splitting process. */ 2041 parent_io->u.bdev.split_current_offset_blocks += parent_io->u.bdev.split_remaining_num_blocks; 2042 parent_io->u.bdev.split_remaining_num_blocks = 0; 2043 } 2044 parent_io->u.bdev.split_outstanding--; 2045 if (parent_io->u.bdev.split_outstanding != 0) { 2046 return; 2047 } 2048 2049 /* 2050 * Parent I/O finishes when all blocks are consumed. 
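 * That is, once the final outstanding child completes and
 * split_remaining_num_blocks has reached zero, the parent's completion
 * callback below is invoked with the accumulated status.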
 */
	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		assert(parent_io->internal.cb != bdev_io_split_done);
		spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0,
				      (uintptr_t)parent_io, 0);
		TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link);
		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
				       parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process. This function will complete the parent I/O if the
	 * splitting is done.
	 */
	_bdev_io_split(parent_io);
}

static void
bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success);

static void
bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	assert(bdev_io_type_can_split(bdev_io->type));

	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
	bdev_io->u.bdev.split_outstanding = 0;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (_is_buf_allocated(bdev_io->u.bdev.iovs)) {
		_bdev_io_split(bdev_io);
	} else {
		assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ);
		spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
	}
}

static void
bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	_bdev_io_split(bdev_io);
}

/* Explicitly mark this inline, since it's used as a function pointer and otherwise won't
 * be inlined, at least on some compilers.
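 * (The address is taken when submission is deferred to the QoS thread via
 * spdk_thread_send_msg().)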
2104 */ 2105 static inline void 2106 _bdev_io_submit(void *ctx) 2107 { 2108 struct spdk_bdev_io *bdev_io = ctx; 2109 struct spdk_bdev *bdev = bdev_io->bdev; 2110 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2111 uint64_t tsc; 2112 2113 tsc = spdk_get_ticks(); 2114 bdev_io->internal.submit_tsc = tsc; 2115 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 2116 2117 if (spdk_likely(bdev_ch->flags == 0)) { 2118 bdev_io_do_submit(bdev_ch, bdev_io); 2119 return; 2120 } 2121 2122 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 2123 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2124 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 2125 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) && 2126 bdev_abort_queued_io(&bdev->internal.qos->queued, bdev_io->u.abort.bio_to_abort)) { 2127 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 2128 } else { 2129 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 2130 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 2131 } 2132 } else { 2133 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 2134 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2135 } 2136 } 2137 2138 bool 2139 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2); 2140 2141 bool 2142 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2) 2143 { 2144 if (range1->length == 0 || range2->length == 0) { 2145 return false; 2146 } 2147 2148 if (range1->offset + range1->length <= range2->offset) { 2149 return false; 2150 } 2151 2152 if (range2->offset + range2->length <= range1->offset) { 2153 return false; 2154 } 2155 2156 return true; 2157 } 2158 2159 static bool 2160 bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range) 2161 { 2162 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2163 struct lba_range r; 2164 2165 switch (bdev_io->type) { 2166 case SPDK_BDEV_IO_TYPE_NVME_IO: 2167 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 2168 /* Don't try to decode the NVMe command - just assume worst-case and that 2169 * it overlaps a locked range. 2170 */ 2171 return true; 2172 case SPDK_BDEV_IO_TYPE_WRITE: 2173 case SPDK_BDEV_IO_TYPE_UNMAP: 2174 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2175 case SPDK_BDEV_IO_TYPE_ZCOPY: 2176 r.offset = bdev_io->u.bdev.offset_blocks; 2177 r.length = bdev_io->u.bdev.num_blocks; 2178 if (!bdev_lba_range_overlapped(range, &r)) { 2179 /* This I/O doesn't overlap the specified LBA range. */ 2180 return false; 2181 } else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) { 2182 /* This I/O overlaps, but the I/O is on the same channel that locked this 2183 * range, and the caller_ctx is the same as the locked_ctx. This means 2184 * that this I/O is associated with the lock, and is allowed to execute. 
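 * Any other write-type I/O that overlaps a locked range is parked on the
 * channel's io_locked list by bdev_io_submit() until the range is unlocked.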
2185 */ 2186 return false; 2187 } else { 2188 return true; 2189 } 2190 default: 2191 return false; 2192 } 2193 } 2194 2195 void 2196 bdev_io_submit(struct spdk_bdev_io *bdev_io) 2197 { 2198 struct spdk_bdev *bdev = bdev_io->bdev; 2199 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 2200 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2201 2202 assert(thread != NULL); 2203 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2204 2205 if (!TAILQ_EMPTY(&ch->locked_ranges)) { 2206 struct lba_range *range; 2207 2208 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 2209 if (bdev_io_range_is_locked(bdev_io, range)) { 2210 TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link); 2211 return; 2212 } 2213 } 2214 } 2215 2216 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 2217 2218 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bdev_io)) { 2219 bdev_io->internal.submit_tsc = spdk_get_ticks(); 2220 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 2221 (uintptr_t)bdev_io, bdev_io->type); 2222 bdev_io_split(NULL, bdev_io); 2223 return; 2224 } 2225 2226 if (ch->flags & BDEV_CH_QOS_ENABLED) { 2227 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 2228 _bdev_io_submit(bdev_io); 2229 } else { 2230 bdev_io->internal.io_submit_ch = ch; 2231 bdev_io->internal.ch = bdev->internal.qos->ch; 2232 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 2233 } 2234 } else { 2235 _bdev_io_submit(bdev_io); 2236 } 2237 } 2238 2239 static void 2240 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 2241 { 2242 struct spdk_bdev *bdev = bdev_io->bdev; 2243 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2244 struct spdk_io_channel *ch = bdev_ch->channel; 2245 2246 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2247 2248 bdev_io->internal.in_submit_request = true; 2249 bdev->fn_table->submit_request(ch, bdev_io); 2250 bdev_io->internal.in_submit_request = false; 2251 } 2252 2253 void 2254 bdev_io_init(struct spdk_bdev_io *bdev_io, 2255 struct spdk_bdev *bdev, void *cb_arg, 2256 spdk_bdev_io_completion_cb cb) 2257 { 2258 bdev_io->bdev = bdev; 2259 bdev_io->internal.caller_ctx = cb_arg; 2260 bdev_io->internal.cb = cb; 2261 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2262 bdev_io->internal.in_submit_request = false; 2263 bdev_io->internal.buf = NULL; 2264 bdev_io->internal.io_submit_ch = NULL; 2265 bdev_io->internal.orig_iovs = NULL; 2266 bdev_io->internal.orig_iovcnt = 0; 2267 bdev_io->internal.orig_md_buf = NULL; 2268 bdev_io->internal.error.nvme.cdw0 = 0; 2269 bdev_io->num_retries = 0; 2270 bdev_io->internal.get_buf_cb = NULL; 2271 bdev_io->internal.get_aux_buf_cb = NULL; 2272 } 2273 2274 static bool 2275 bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2276 { 2277 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 2278 } 2279 2280 bool 2281 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2282 { 2283 bool supported; 2284 2285 supported = bdev_io_type_supported(bdev, io_type); 2286 2287 if (!supported) { 2288 switch (io_type) { 2289 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2290 /* The bdev layer will emulate write zeroes as long as write is supported. 
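 * (The emulation is expected to generate regular WRITE I/O of zeroed data on
 * the caller's behalf.)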
*/ 2291 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2292 break; 2293 case SPDK_BDEV_IO_TYPE_ZCOPY: 2294 /* Zero copy can be emulated with regular read and write */ 2295 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 2296 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2297 break; 2298 default: 2299 break; 2300 } 2301 } 2302 2303 return supported; 2304 } 2305 2306 int 2307 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 2308 { 2309 if (bdev->fn_table->dump_info_json) { 2310 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 2311 } 2312 2313 return 0; 2314 } 2315 2316 static void 2317 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2318 { 2319 uint32_t max_per_timeslice = 0; 2320 int i; 2321 2322 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2323 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2324 qos->rate_limits[i].max_per_timeslice = 0; 2325 continue; 2326 } 2327 2328 max_per_timeslice = qos->rate_limits[i].limit * 2329 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2330 2331 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2332 qos->rate_limits[i].min_per_timeslice); 2333 2334 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2335 } 2336 2337 bdev_qos_set_ops(qos); 2338 } 2339 2340 static int 2341 bdev_channel_poll_qos(void *arg) 2342 { 2343 struct spdk_bdev_qos *qos = arg; 2344 uint64_t now = spdk_get_ticks(); 2345 int i; 2346 2347 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2348 /* We received our callback earlier than expected - return 2349 * immediately and wait to do accounting until at least one 2350 * timeslice has actually expired. This should never happen 2351 * with a well-behaved timer implementation. 2352 */ 2353 return SPDK_POLLER_IDLE; 2354 } 2355 2356 /* Reset for next round of rate limiting */ 2357 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2358 /* We may have allowed the IOs or bytes to slightly overrun in the last 2359 * timeslice. remaining_this_timeslice is signed, so if it's negative 2360 * here, we'll account for the overrun so that the next timeslice will 2361 * be appropriately reduced. 
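 * Illustrative numbers (assuming a 1 ms timeslice): with a 10 MB/s byte limit,
 * max_per_timeslice is roughly 10 KB. If a 16 KB write was admitted while
 * ~10 KB of budget remained, remaining_this_timeslice ends the slice near
 * -6 KB, so after max_per_timeslice is added back below, the next slice starts
 * with only ~4 KB of budget.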
2362 */ 2363 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2364 qos->rate_limits[i].remaining_this_timeslice = 0; 2365 } 2366 } 2367 2368 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2369 qos->last_timeslice += qos->timeslice_size; 2370 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2371 qos->rate_limits[i].remaining_this_timeslice += 2372 qos->rate_limits[i].max_per_timeslice; 2373 } 2374 } 2375 2376 return bdev_qos_io_submit(qos->ch, qos); 2377 } 2378 2379 static void 2380 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2381 { 2382 struct spdk_bdev_shared_resource *shared_resource; 2383 struct lba_range *range; 2384 2385 while (!TAILQ_EMPTY(&ch->locked_ranges)) { 2386 range = TAILQ_FIRST(&ch->locked_ranges); 2387 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 2388 free(range); 2389 } 2390 2391 spdk_put_io_channel(ch->channel); 2392 2393 shared_resource = ch->shared_resource; 2394 2395 assert(TAILQ_EMPTY(&ch->io_locked)); 2396 assert(TAILQ_EMPTY(&ch->io_submitted)); 2397 assert(ch->io_outstanding == 0); 2398 assert(shared_resource->ref > 0); 2399 shared_resource->ref--; 2400 if (shared_resource->ref == 0) { 2401 assert(shared_resource->io_outstanding == 0); 2402 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2403 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2404 free(shared_resource); 2405 } 2406 } 2407 2408 /* Caller must hold bdev->internal.mutex. */ 2409 static void 2410 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2411 { 2412 struct spdk_bdev_qos *qos = bdev->internal.qos; 2413 int i; 2414 2415 /* Rate limiting on this bdev enabled */ 2416 if (qos) { 2417 if (qos->ch == NULL) { 2418 struct spdk_io_channel *io_ch; 2419 2420 SPDK_DEBUGLOG(bdev, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2421 bdev->name, spdk_get_thread()); 2422 2423 /* No qos channel has been selected, so set one up */ 2424 2425 /* Take another reference to ch */ 2426 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2427 assert(io_ch != NULL); 2428 qos->ch = ch; 2429 2430 qos->thread = spdk_io_channel_get_thread(io_ch); 2431 2432 TAILQ_INIT(&qos->queued); 2433 2434 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2435 if (bdev_qos_is_iops_rate_limit(i) == true) { 2436 qos->rate_limits[i].min_per_timeslice = 2437 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2438 } else { 2439 qos->rate_limits[i].min_per_timeslice = 2440 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2441 } 2442 2443 if (qos->rate_limits[i].limit == 0) { 2444 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2445 } 2446 } 2447 bdev_qos_update_max_quota_per_timeslice(qos); 2448 qos->timeslice_size = 2449 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2450 qos->last_timeslice = spdk_get_ticks(); 2451 qos->poller = SPDK_POLLER_REGISTER(bdev_channel_poll_qos, 2452 qos, 2453 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2454 } 2455 2456 ch->flags |= BDEV_CH_QOS_ENABLED; 2457 } 2458 } 2459 2460 struct poll_timeout_ctx { 2461 struct spdk_bdev_desc *desc; 2462 uint64_t timeout_in_sec; 2463 spdk_bdev_io_timeout_cb cb_fn; 2464 void *cb_arg; 2465 }; 2466 2467 static void 2468 bdev_desc_free(struct spdk_bdev_desc *desc) 2469 { 2470 pthread_mutex_destroy(&desc->mutex); 2471 free(desc->media_events_buffer); 2472 free(desc); 2473 } 2474 2475 static void 2476 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2477 { 2478 struct poll_timeout_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 2479 struct spdk_bdev_desc *desc = ctx->desc; 2480 2481 free(ctx); 2482 2483 pthread_mutex_lock(&desc->mutex); 2484 desc->refs--; 2485 if (desc->closed == true && desc->refs == 0) { 2486 pthread_mutex_unlock(&desc->mutex); 2487 bdev_desc_free(desc); 2488 return; 2489 } 2490 pthread_mutex_unlock(&desc->mutex); 2491 } 2492 2493 static void 2494 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2495 { 2496 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2497 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2498 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2499 struct spdk_bdev_desc *desc = ctx->desc; 2500 struct spdk_bdev_io *bdev_io; 2501 uint64_t now; 2502 2503 pthread_mutex_lock(&desc->mutex); 2504 if (desc->closed == true) { 2505 pthread_mutex_unlock(&desc->mutex); 2506 spdk_for_each_channel_continue(i, -1); 2507 return; 2508 } 2509 pthread_mutex_unlock(&desc->mutex); 2510 2511 now = spdk_get_ticks(); 2512 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2513 /* Exclude any I/O that are generated via splitting. */ 2514 if (bdev_io->internal.cb == bdev_io_split_done) { 2515 continue; 2516 } 2517 2518 /* Once we find an I/O that has not timed out, we can immediately 2519 * exit the loop. 2520 */ 2521 if (now < (bdev_io->internal.submit_tsc + 2522 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2523 goto end; 2524 } 2525 2526 if (bdev_io->internal.desc == desc) { 2527 ctx->cb_fn(ctx->cb_arg, bdev_io); 2528 } 2529 } 2530 2531 end: 2532 spdk_for_each_channel_continue(i, 0); 2533 } 2534 2535 static int 2536 bdev_poll_timeout_io(void *arg) 2537 { 2538 struct spdk_bdev_desc *desc = arg; 2539 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2540 struct poll_timeout_ctx *ctx; 2541 2542 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2543 if (!ctx) { 2544 SPDK_ERRLOG("failed to allocate memory\n"); 2545 return SPDK_POLLER_BUSY; 2546 } 2547 ctx->desc = desc; 2548 ctx->cb_arg = desc->cb_arg; 2549 ctx->cb_fn = desc->cb_fn; 2550 ctx->timeout_in_sec = desc->timeout_in_sec; 2551 2552 /* Take a ref on the descriptor in case it gets closed while we are checking 2553 * all of the channels. 
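 * The reference is dropped in bdev_channel_poll_timeout_io_done(); if the
 * descriptor was closed in the meantime and this was the last reference, the
 * descriptor is freed there rather than in the close path.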
2554 */ 2555 pthread_mutex_lock(&desc->mutex); 2556 desc->refs++; 2557 pthread_mutex_unlock(&desc->mutex); 2558 2559 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2560 bdev_channel_poll_timeout_io, 2561 ctx, 2562 bdev_channel_poll_timeout_io_done); 2563 2564 return SPDK_POLLER_BUSY; 2565 } 2566 2567 int 2568 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2569 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2570 { 2571 assert(desc->thread == spdk_get_thread()); 2572 2573 spdk_poller_unregister(&desc->io_timeout_poller); 2574 2575 if (timeout_in_sec) { 2576 assert(cb_fn != NULL); 2577 desc->io_timeout_poller = SPDK_POLLER_REGISTER(bdev_poll_timeout_io, 2578 desc, 2579 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2580 1000); 2581 if (desc->io_timeout_poller == NULL) { 2582 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2583 return -1; 2584 } 2585 } 2586 2587 desc->cb_fn = cb_fn; 2588 desc->cb_arg = cb_arg; 2589 desc->timeout_in_sec = timeout_in_sec; 2590 2591 return 0; 2592 } 2593 2594 static int 2595 bdev_channel_create(void *io_device, void *ctx_buf) 2596 { 2597 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2598 struct spdk_bdev_channel *ch = ctx_buf; 2599 struct spdk_io_channel *mgmt_io_ch; 2600 struct spdk_bdev_mgmt_channel *mgmt_ch; 2601 struct spdk_bdev_shared_resource *shared_resource; 2602 struct lba_range *range; 2603 2604 ch->bdev = bdev; 2605 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2606 if (!ch->channel) { 2607 return -1; 2608 } 2609 2610 assert(ch->histogram == NULL); 2611 if (bdev->internal.histogram_enabled) { 2612 ch->histogram = spdk_histogram_data_alloc(); 2613 if (ch->histogram == NULL) { 2614 SPDK_ERRLOG("Could not allocate histogram\n"); 2615 } 2616 } 2617 2618 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2619 if (!mgmt_io_ch) { 2620 spdk_put_io_channel(ch->channel); 2621 return -1; 2622 } 2623 2624 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2625 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2626 if (shared_resource->shared_ch == ch->channel) { 2627 spdk_put_io_channel(mgmt_io_ch); 2628 shared_resource->ref++; 2629 break; 2630 } 2631 } 2632 2633 if (shared_resource == NULL) { 2634 shared_resource = calloc(1, sizeof(*shared_resource)); 2635 if (shared_resource == NULL) { 2636 spdk_put_io_channel(ch->channel); 2637 spdk_put_io_channel(mgmt_io_ch); 2638 return -1; 2639 } 2640 2641 shared_resource->mgmt_ch = mgmt_ch; 2642 shared_resource->io_outstanding = 0; 2643 TAILQ_INIT(&shared_resource->nomem_io); 2644 shared_resource->nomem_threshold = 0; 2645 shared_resource->shared_ch = ch->channel; 2646 shared_resource->ref = 1; 2647 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2648 } 2649 2650 memset(&ch->stat, 0, sizeof(ch->stat)); 2651 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2652 ch->io_outstanding = 0; 2653 TAILQ_INIT(&ch->queued_resets); 2654 TAILQ_INIT(&ch->locked_ranges); 2655 ch->flags = 0; 2656 ch->shared_resource = shared_resource; 2657 2658 TAILQ_INIT(&ch->io_submitted); 2659 TAILQ_INIT(&ch->io_locked); 2660 2661 #ifdef SPDK_CONFIG_VTUNE 2662 { 2663 char *name; 2664 __itt_init_ittlib(NULL, 0); 2665 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2666 if (!name) { 2667 bdev_channel_destroy_resource(ch); 2668 return -1; 2669 } 2670 ch->handle = __itt_string_handle_create(name); 2671 free(name); 2672 ch->start_tsc = spdk_get_ticks(); 2673 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2674 memset(&ch->prev_stat, 0, 
sizeof(ch->prev_stat)); 2675 } 2676 #endif 2677 2678 pthread_mutex_lock(&bdev->internal.mutex); 2679 bdev_enable_qos(bdev, ch); 2680 2681 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 2682 struct lba_range *new_range; 2683 2684 new_range = calloc(1, sizeof(*new_range)); 2685 if (new_range == NULL) { 2686 pthread_mutex_unlock(&bdev->internal.mutex); 2687 bdev_channel_destroy_resource(ch); 2688 return -1; 2689 } 2690 new_range->length = range->length; 2691 new_range->offset = range->offset; 2692 new_range->locked_ctx = range->locked_ctx; 2693 TAILQ_INSERT_TAIL(&ch->locked_ranges, new_range, tailq); 2694 } 2695 2696 pthread_mutex_unlock(&bdev->internal.mutex); 2697 2698 return 0; 2699 } 2700 2701 /* 2702 * Abort I/O that are waiting on a data buffer. These types of I/O are 2703 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2704 */ 2705 static void 2706 bdev_abort_all_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2707 { 2708 bdev_io_stailq_t tmp; 2709 struct spdk_bdev_io *bdev_io; 2710 2711 STAILQ_INIT(&tmp); 2712 2713 while (!STAILQ_EMPTY(queue)) { 2714 bdev_io = STAILQ_FIRST(queue); 2715 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2716 if (bdev_io->internal.ch == ch) { 2717 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2718 } else { 2719 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2720 } 2721 } 2722 2723 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2724 } 2725 2726 /* 2727 * Abort I/O that are queued waiting for submission. These types of I/O are 2728 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2729 */ 2730 static void 2731 bdev_abort_all_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2732 { 2733 struct spdk_bdev_io *bdev_io, *tmp; 2734 2735 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2736 if (bdev_io->internal.ch == ch) { 2737 TAILQ_REMOVE(queue, bdev_io, internal.link); 2738 /* 2739 * spdk_bdev_io_complete() assumes that the completed I/O had 2740 * been submitted to the bdev module. Since in this case it 2741 * hadn't, bump io_outstanding to account for the decrement 2742 * that spdk_bdev_io_complete() will do. 
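 * RESET I/O are skipped below on the assumption that they are not counted in
 * io_outstanding at submission time, so spdk_bdev_io_complete() will not
 * decrement the counters for them.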
2743 */ 2744 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2745 ch->io_outstanding++; 2746 ch->shared_resource->io_outstanding++; 2747 } 2748 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2749 } 2750 } 2751 } 2752 2753 static bool 2754 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2755 { 2756 struct spdk_bdev_io *bdev_io; 2757 2758 TAILQ_FOREACH(bdev_io, queue, internal.link) { 2759 if (bdev_io == bio_to_abort) { 2760 TAILQ_REMOVE(queue, bio_to_abort, internal.link); 2761 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2762 return true; 2763 } 2764 } 2765 2766 return false; 2767 } 2768 2769 static bool 2770 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2771 { 2772 struct spdk_bdev_io *bdev_io; 2773 2774 STAILQ_FOREACH(bdev_io, queue, internal.buf_link) { 2775 if (bdev_io == bio_to_abort) { 2776 STAILQ_REMOVE(queue, bio_to_abort, spdk_bdev_io, internal.buf_link); 2777 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2778 return true; 2779 } 2780 } 2781 2782 return false; 2783 } 2784 2785 static void 2786 bdev_qos_channel_destroy(void *cb_arg) 2787 { 2788 struct spdk_bdev_qos *qos = cb_arg; 2789 2790 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2791 spdk_poller_unregister(&qos->poller); 2792 2793 SPDK_DEBUGLOG(bdev, "Free QoS %p.\n", qos); 2794 2795 free(qos); 2796 } 2797 2798 static int 2799 bdev_qos_destroy(struct spdk_bdev *bdev) 2800 { 2801 int i; 2802 2803 /* 2804 * Cleanly shutting down the QoS poller is tricky, because 2805 * during the asynchronous operation the user could open 2806 * a new descriptor and create a new channel, spawning 2807 * a new QoS poller. 2808 * 2809 * The strategy is to create a new QoS structure here and swap it 2810 * in. The shutdown path then continues to refer to the old one 2811 * until it completes and then releases it. 2812 */ 2813 struct spdk_bdev_qos *new_qos, *old_qos; 2814 2815 old_qos = bdev->internal.qos; 2816 2817 new_qos = calloc(1, sizeof(*new_qos)); 2818 if (!new_qos) { 2819 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2820 return -ENOMEM; 2821 } 2822 2823 /* Copy the old QoS data into the newly allocated structure */ 2824 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2825 2826 /* Zero out the key parts of the QoS structure */ 2827 new_qos->ch = NULL; 2828 new_qos->thread = NULL; 2829 new_qos->poller = NULL; 2830 TAILQ_INIT(&new_qos->queued); 2831 /* 2832 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2833 * It will be used later for the new QoS structure. 2834 */ 2835 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2836 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2837 new_qos->rate_limits[i].min_per_timeslice = 0; 2838 new_qos->rate_limits[i].max_per_timeslice = 0; 2839 } 2840 2841 bdev->internal.qos = new_qos; 2842 2843 if (old_qos->thread == NULL) { 2844 free(old_qos); 2845 } else { 2846 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 2847 } 2848 2849 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2850 * been destroyed yet. The destruction path will end up waiting for the final 2851 * channel to be put before it releases resources. 
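 * The old structure itself is released on its owning thread by
 * bdev_qos_channel_destroy(), which drops the extra channel reference,
 * unregisters the poller and frees the memory.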
*/ 2852 2853 return 0; 2854 } 2855 2856 static void 2857 bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2858 { 2859 total->bytes_read += add->bytes_read; 2860 total->num_read_ops += add->num_read_ops; 2861 total->bytes_written += add->bytes_written; 2862 total->num_write_ops += add->num_write_ops; 2863 total->bytes_unmapped += add->bytes_unmapped; 2864 total->num_unmap_ops += add->num_unmap_ops; 2865 total->read_latency_ticks += add->read_latency_ticks; 2866 total->write_latency_ticks += add->write_latency_ticks; 2867 total->unmap_latency_ticks += add->unmap_latency_ticks; 2868 } 2869 2870 static void 2871 bdev_channel_destroy(void *io_device, void *ctx_buf) 2872 { 2873 struct spdk_bdev_channel *ch = ctx_buf; 2874 struct spdk_bdev_mgmt_channel *mgmt_ch; 2875 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2876 2877 SPDK_DEBUGLOG(bdev, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2878 spdk_get_thread()); 2879 2880 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */ 2881 pthread_mutex_lock(&ch->bdev->internal.mutex); 2882 bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2883 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2884 2885 mgmt_ch = shared_resource->mgmt_ch; 2886 2887 bdev_abort_all_queued_io(&ch->queued_resets, ch); 2888 bdev_abort_all_queued_io(&shared_resource->nomem_io, ch); 2889 bdev_abort_all_buf_io(&mgmt_ch->need_buf_small, ch); 2890 bdev_abort_all_buf_io(&mgmt_ch->need_buf_large, ch); 2891 2892 if (ch->histogram) { 2893 spdk_histogram_data_free(ch->histogram); 2894 } 2895 2896 bdev_channel_destroy_resource(ch); 2897 } 2898 2899 int 2900 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2901 { 2902 struct spdk_bdev_alias *tmp; 2903 2904 if (alias == NULL) { 2905 SPDK_ERRLOG("Empty alias passed\n"); 2906 return -EINVAL; 2907 } 2908 2909 if (spdk_bdev_get_by_name(alias)) { 2910 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2911 return -EEXIST; 2912 } 2913 2914 tmp = calloc(1, sizeof(*tmp)); 2915 if (tmp == NULL) { 2916 SPDK_ERRLOG("Unable to allocate alias\n"); 2917 return -ENOMEM; 2918 } 2919 2920 tmp->alias = strdup(alias); 2921 if (tmp->alias == NULL) { 2922 free(tmp); 2923 SPDK_ERRLOG("Unable to allocate alias\n"); 2924 return -ENOMEM; 2925 } 2926 2927 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2928 2929 return 0; 2930 } 2931 2932 int 2933 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2934 { 2935 struct spdk_bdev_alias *tmp; 2936 2937 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2938 if (strcmp(alias, tmp->alias) == 0) { 2939 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2940 free(tmp->alias); 2941 free(tmp); 2942 return 0; 2943 } 2944 } 2945 2946 SPDK_INFOLOG(bdev, "Alias %s does not exists\n", alias); 2947 2948 return -ENOENT; 2949 } 2950 2951 void 2952 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2953 { 2954 struct spdk_bdev_alias *p, *tmp; 2955 2956 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2957 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2958 free(p->alias); 2959 free(p); 2960 } 2961 } 2962 2963 struct spdk_io_channel * 2964 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2965 { 2966 return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc))); 2967 } 2968 2969 void * 2970 spdk_bdev_get_module_ctx(struct spdk_bdev_desc *desc) 2971 { 2972 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2973 void *ctx = NULL; 2974 2975 if (bdev->fn_table->get_module_ctx) 
{ 2976 ctx = bdev->fn_table->get_module_ctx(bdev->ctxt); 2977 } 2978 2979 return ctx; 2980 } 2981 2982 const char * 2983 spdk_bdev_get_module_name(const struct spdk_bdev *bdev) 2984 { 2985 return bdev->module->name; 2986 } 2987 2988 const char * 2989 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2990 { 2991 return bdev->name; 2992 } 2993 2994 const char * 2995 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2996 { 2997 return bdev->product_name; 2998 } 2999 3000 const struct spdk_bdev_aliases_list * 3001 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 3002 { 3003 return &bdev->aliases; 3004 } 3005 3006 uint32_t 3007 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 3008 { 3009 return bdev->blocklen; 3010 } 3011 3012 uint32_t 3013 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 3014 { 3015 return bdev->write_unit_size; 3016 } 3017 3018 uint64_t 3019 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 3020 { 3021 return bdev->blockcnt; 3022 } 3023 3024 const char * 3025 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 3026 { 3027 return qos_rpc_type[type]; 3028 } 3029 3030 void 3031 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 3032 { 3033 int i; 3034 3035 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 3036 3037 pthread_mutex_lock(&bdev->internal.mutex); 3038 if (bdev->internal.qos) { 3039 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3040 if (bdev->internal.qos->rate_limits[i].limit != 3041 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3042 limits[i] = bdev->internal.qos->rate_limits[i].limit; 3043 if (bdev_qos_is_iops_rate_limit(i) == false) { 3044 /* Change from Byte to Megabyte which is user visible. */ 3045 limits[i] = limits[i] / 1024 / 1024; 3046 } 3047 } 3048 } 3049 } 3050 pthread_mutex_unlock(&bdev->internal.mutex); 3051 } 3052 3053 size_t 3054 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 3055 { 3056 return 1 << bdev->required_alignment; 3057 } 3058 3059 uint32_t 3060 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 3061 { 3062 return bdev->optimal_io_boundary; 3063 } 3064 3065 bool 3066 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 3067 { 3068 return bdev->write_cache; 3069 } 3070 3071 const struct spdk_uuid * 3072 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 3073 { 3074 return &bdev->uuid; 3075 } 3076 3077 uint16_t 3078 spdk_bdev_get_acwu(const struct spdk_bdev *bdev) 3079 { 3080 return bdev->acwu; 3081 } 3082 3083 uint32_t 3084 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 3085 { 3086 return bdev->md_len; 3087 } 3088 3089 bool 3090 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 3091 { 3092 return (bdev->md_len != 0) && bdev->md_interleave; 3093 } 3094 3095 bool 3096 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 3097 { 3098 return (bdev->md_len != 0) && !bdev->md_interleave; 3099 } 3100 3101 bool 3102 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 3103 { 3104 return bdev->zoned; 3105 } 3106 3107 uint32_t 3108 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 3109 { 3110 if (spdk_bdev_is_md_interleaved(bdev)) { 3111 return bdev->blocklen - bdev->md_len; 3112 } else { 3113 return bdev->blocklen; 3114 } 3115 } 3116 3117 static uint32_t 3118 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 3119 { 3120 if (!spdk_bdev_is_md_interleaved(bdev)) { 3121 return bdev->blocklen + bdev->md_len; 3122 } else { 3123 return bdev->blocklen; 3124 } 3125 } 3126 3127 enum spdk_dif_type 
spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 3128 { 3129 if (bdev->md_len != 0) { 3130 return bdev->dif_type; 3131 } else { 3132 return SPDK_DIF_DISABLE; 3133 } 3134 } 3135 3136 bool 3137 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 3138 { 3139 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 3140 return bdev->dif_is_head_of_md; 3141 } else { 3142 return false; 3143 } 3144 } 3145 3146 bool 3147 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 3148 enum spdk_dif_check_type check_type) 3149 { 3150 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 3151 return false; 3152 } 3153 3154 switch (check_type) { 3155 case SPDK_DIF_CHECK_TYPE_REFTAG: 3156 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 3157 case SPDK_DIF_CHECK_TYPE_APPTAG: 3158 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 3159 case SPDK_DIF_CHECK_TYPE_GUARD: 3160 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 3161 default: 3162 return false; 3163 } 3164 } 3165 3166 uint64_t 3167 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 3168 { 3169 return bdev->internal.measured_queue_depth; 3170 } 3171 3172 uint64_t 3173 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 3174 { 3175 return bdev->internal.period; 3176 } 3177 3178 uint64_t 3179 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 3180 { 3181 return bdev->internal.weighted_io_time; 3182 } 3183 3184 uint64_t 3185 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 3186 { 3187 return bdev->internal.io_time; 3188 } 3189 3190 static void 3191 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 3192 { 3193 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3194 3195 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 3196 3197 if (bdev->internal.measured_queue_depth) { 3198 bdev->internal.io_time += bdev->internal.period; 3199 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 3200 } 3201 } 3202 3203 static void 3204 _calculate_measured_qd(struct spdk_io_channel_iter *i) 3205 { 3206 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3207 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 3208 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 3209 3210 bdev->internal.temporary_queue_depth += ch->io_outstanding; 3211 spdk_for_each_channel_continue(i, 0); 3212 } 3213 3214 static int 3215 bdev_calculate_measured_queue_depth(void *ctx) 3216 { 3217 struct spdk_bdev *bdev = ctx; 3218 bdev->internal.temporary_queue_depth = 0; 3219 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 3220 _calculate_measured_qd_cpl); 3221 return SPDK_POLLER_BUSY; 3222 } 3223 3224 void 3225 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 3226 { 3227 bdev->internal.period = period; 3228 3229 if (bdev->internal.qd_poller != NULL) { 3230 spdk_poller_unregister(&bdev->internal.qd_poller); 3231 bdev->internal.measured_queue_depth = UINT64_MAX; 3232 } 3233 3234 if (period != 0) { 3235 bdev->internal.qd_poller = SPDK_POLLER_REGISTER(bdev_calculate_measured_queue_depth, bdev, 3236 period); 3237 } 3238 } 3239 3240 static void 3241 _resize_notify(void *arg) 3242 { 3243 struct spdk_bdev_desc *desc = arg; 3244 3245 pthread_mutex_lock(&desc->mutex); 3246 desc->refs--; 3247 if (!desc->closed) { 3248 pthread_mutex_unlock(&desc->mutex); 3249 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 3250 desc->bdev, 3251 
desc->callback.ctx); 3252 return; 3253 } else if (0 == desc->refs) { 3254 /* This descriptor was closed after this resize_notify message was sent. 3255 * spdk_bdev_close() could not free the descriptor since this message was 3256 * in flight, so we free it now using bdev_desc_free(). 3257 */ 3258 pthread_mutex_unlock(&desc->mutex); 3259 bdev_desc_free(desc); 3260 return; 3261 } 3262 pthread_mutex_unlock(&desc->mutex); 3263 } 3264 3265 int 3266 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 3267 { 3268 struct spdk_bdev_desc *desc; 3269 int ret; 3270 3271 pthread_mutex_lock(&bdev->internal.mutex); 3272 3273 /* bdev has open descriptors */ 3274 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 3275 bdev->blockcnt > size) { 3276 ret = -EBUSY; 3277 } else { 3278 bdev->blockcnt = size; 3279 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 3280 pthread_mutex_lock(&desc->mutex); 3281 if (desc->callback.open_with_ext && !desc->closed) { 3282 desc->refs++; 3283 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 3284 } 3285 pthread_mutex_unlock(&desc->mutex); 3286 } 3287 ret = 0; 3288 } 3289 3290 pthread_mutex_unlock(&bdev->internal.mutex); 3291 3292 return ret; 3293 } 3294 3295 /* 3296 * Convert I/O offset and length from bytes to blocks. 3297 * 3298 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 3299 */ 3300 static uint64_t 3301 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 3302 uint64_t num_bytes, uint64_t *num_blocks) 3303 { 3304 uint32_t block_size = bdev->blocklen; 3305 uint8_t shift_cnt; 3306 3307 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 3308 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 3309 shift_cnt = spdk_u32log2(block_size); 3310 *offset_blocks = offset_bytes >> shift_cnt; 3311 *num_blocks = num_bytes >> shift_cnt; 3312 return (offset_bytes - (*offset_blocks << shift_cnt)) | 3313 (num_bytes - (*num_blocks << shift_cnt)); 3314 } else { 3315 *offset_blocks = offset_bytes / block_size; 3316 *num_blocks = num_bytes / block_size; 3317 return (offset_bytes % block_size) | (num_bytes % block_size); 3318 } 3319 } 3320 3321 static bool 3322 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 3323 { 3324 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 3325 * has been an overflow and hence the offset has been wrapped around */ 3326 if (offset_blocks + num_blocks < offset_blocks) { 3327 return false; 3328 } 3329 3330 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 3331 if (offset_blocks + num_blocks > bdev->blockcnt) { 3332 return false; 3333 } 3334 3335 return true; 3336 } 3337 3338 static bool 3339 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 3340 { 3341 return _is_buf_allocated(iovs) == (md_buf != NULL); 3342 } 3343 3344 static int 3345 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 3346 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3347 spdk_bdev_io_completion_cb cb, void *cb_arg) 3348 { 3349 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3350 struct spdk_bdev_io *bdev_io; 3351 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3352 3353 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3354 return -EINVAL; 3355 } 3356 3357 bdev_io = bdev_channel_get_io(channel); 3358 if (!bdev_io) { 
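		/* Both the per-channel bdev_io cache and the shared pool are exhausted.
		 * Callers typically retry once an in-flight I/O completes, e.g. (sketch
		 * only; "entry" is a caller-owned struct spdk_bdev_io_wait_entry that
		 * must stay valid until the callback fires, and retry_read()/ctx are
		 * placeholders):
		 *
		 *   entry->bdev = spdk_bdev_desc_get_bdev(desc);
		 *   entry->cb_fn = retry_read;
		 *   entry->cb_arg = ctx;
		 *   spdk_bdev_queue_io_wait(entry->bdev, ch, entry);
		 */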
3359 return -ENOMEM; 3360 } 3361 3362 bdev_io->internal.ch = channel; 3363 bdev_io->internal.desc = desc; 3364 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3365 bdev_io->u.bdev.iovs = &bdev_io->iov; 3366 bdev_io->u.bdev.iovs[0].iov_base = buf; 3367 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3368 bdev_io->u.bdev.iovcnt = 1; 3369 bdev_io->u.bdev.md_buf = md_buf; 3370 bdev_io->u.bdev.num_blocks = num_blocks; 3371 bdev_io->u.bdev.offset_blocks = offset_blocks; 3372 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3373 3374 bdev_io_submit(bdev_io); 3375 return 0; 3376 } 3377 3378 int 3379 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3380 void *buf, uint64_t offset, uint64_t nbytes, 3381 spdk_bdev_io_completion_cb cb, void *cb_arg) 3382 { 3383 uint64_t offset_blocks, num_blocks; 3384 3385 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3386 nbytes, &num_blocks) != 0) { 3387 return -EINVAL; 3388 } 3389 3390 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3391 } 3392 3393 int 3394 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3395 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3396 spdk_bdev_io_completion_cb cb, void *cb_arg) 3397 { 3398 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 3399 } 3400 3401 int 3402 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3403 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3404 spdk_bdev_io_completion_cb cb, void *cb_arg) 3405 { 3406 struct iovec iov = { 3407 .iov_base = buf, 3408 }; 3409 3410 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3411 return -EINVAL; 3412 } 3413 3414 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3415 return -EINVAL; 3416 } 3417 3418 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3419 cb, cb_arg); 3420 } 3421 3422 int 3423 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3424 struct iovec *iov, int iovcnt, 3425 uint64_t offset, uint64_t nbytes, 3426 spdk_bdev_io_completion_cb cb, void *cb_arg) 3427 { 3428 uint64_t offset_blocks, num_blocks; 3429 3430 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3431 nbytes, &num_blocks) != 0) { 3432 return -EINVAL; 3433 } 3434 3435 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3436 } 3437 3438 static int 3439 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3440 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3441 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3442 { 3443 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3444 struct spdk_bdev_io *bdev_io; 3445 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3446 3447 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3448 return -EINVAL; 3449 } 3450 3451 bdev_io = bdev_channel_get_io(channel); 3452 if (!bdev_io) { 3453 return -ENOMEM; 3454 } 3455 3456 bdev_io->internal.ch = channel; 3457 bdev_io->internal.desc = desc; 3458 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3459 bdev_io->u.bdev.iovs = iov; 3460 bdev_io->u.bdev.iovcnt = iovcnt; 3461 bdev_io->u.bdev.md_buf = md_buf; 3462 bdev_io->u.bdev.num_blocks = num_blocks; 3463 bdev_io->u.bdev.offset_blocks = offset_blocks; 3464 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3465 3466 bdev_io_submit(bdev_io); 3467 
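	/* The submit path may split this I/O on the optimal boundary, funnel it
	 * through the QoS channel, or park it while an overlapping LBA range is
	 * locked; in every case completion is reported through cb. */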
return 0; 3468 } 3469 3470 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3471 struct iovec *iov, int iovcnt, 3472 uint64_t offset_blocks, uint64_t num_blocks, 3473 spdk_bdev_io_completion_cb cb, void *cb_arg) 3474 { 3475 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3476 num_blocks, cb, cb_arg); 3477 } 3478 3479 int 3480 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3481 struct iovec *iov, int iovcnt, void *md_buf, 3482 uint64_t offset_blocks, uint64_t num_blocks, 3483 spdk_bdev_io_completion_cb cb, void *cb_arg) 3484 { 3485 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3486 return -EINVAL; 3487 } 3488 3489 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3490 return -EINVAL; 3491 } 3492 3493 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3494 num_blocks, cb, cb_arg); 3495 } 3496 3497 static int 3498 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3499 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3500 spdk_bdev_io_completion_cb cb, void *cb_arg) 3501 { 3502 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3503 struct spdk_bdev_io *bdev_io; 3504 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3505 3506 if (!desc->write) { 3507 return -EBADF; 3508 } 3509 3510 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3511 return -EINVAL; 3512 } 3513 3514 bdev_io = bdev_channel_get_io(channel); 3515 if (!bdev_io) { 3516 return -ENOMEM; 3517 } 3518 3519 bdev_io->internal.ch = channel; 3520 bdev_io->internal.desc = desc; 3521 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3522 bdev_io->u.bdev.iovs = &bdev_io->iov; 3523 bdev_io->u.bdev.iovs[0].iov_base = buf; 3524 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3525 bdev_io->u.bdev.iovcnt = 1; 3526 bdev_io->u.bdev.md_buf = md_buf; 3527 bdev_io->u.bdev.num_blocks = num_blocks; 3528 bdev_io->u.bdev.offset_blocks = offset_blocks; 3529 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3530 3531 bdev_io_submit(bdev_io); 3532 return 0; 3533 } 3534 3535 int 3536 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3537 void *buf, uint64_t offset, uint64_t nbytes, 3538 spdk_bdev_io_completion_cb cb, void *cb_arg) 3539 { 3540 uint64_t offset_blocks, num_blocks; 3541 3542 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3543 nbytes, &num_blocks) != 0) { 3544 return -EINVAL; 3545 } 3546 3547 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3548 } 3549 3550 int 3551 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3552 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3553 spdk_bdev_io_completion_cb cb, void *cb_arg) 3554 { 3555 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3556 cb, cb_arg); 3557 } 3558 3559 int 3560 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3561 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3562 spdk_bdev_io_completion_cb cb, void *cb_arg) 3563 { 3564 struct iovec iov = { 3565 .iov_base = buf, 3566 }; 3567 3568 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3569 return -EINVAL; 3570 } 3571 3572 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3573 return -EINVAL; 3574 } 3575 3576 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3577 
cb, cb_arg); 3578 } 3579 3580 static int 3581 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3582 struct iovec *iov, int iovcnt, void *md_buf, 3583 uint64_t offset_blocks, uint64_t num_blocks, 3584 spdk_bdev_io_completion_cb cb, void *cb_arg) 3585 { 3586 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3587 struct spdk_bdev_io *bdev_io; 3588 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3589 3590 if (!desc->write) { 3591 return -EBADF; 3592 } 3593 3594 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3595 return -EINVAL; 3596 } 3597 3598 bdev_io = bdev_channel_get_io(channel); 3599 if (!bdev_io) { 3600 return -ENOMEM; 3601 } 3602 3603 bdev_io->internal.ch = channel; 3604 bdev_io->internal.desc = desc; 3605 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3606 bdev_io->u.bdev.iovs = iov; 3607 bdev_io->u.bdev.iovcnt = iovcnt; 3608 bdev_io->u.bdev.md_buf = md_buf; 3609 bdev_io->u.bdev.num_blocks = num_blocks; 3610 bdev_io->u.bdev.offset_blocks = offset_blocks; 3611 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3612 3613 bdev_io_submit(bdev_io); 3614 return 0; 3615 } 3616 3617 int 3618 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3619 struct iovec *iov, int iovcnt, 3620 uint64_t offset, uint64_t len, 3621 spdk_bdev_io_completion_cb cb, void *cb_arg) 3622 { 3623 uint64_t offset_blocks, num_blocks; 3624 3625 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3626 len, &num_blocks) != 0) { 3627 return -EINVAL; 3628 } 3629 3630 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3631 } 3632 3633 int 3634 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3635 struct iovec *iov, int iovcnt, 3636 uint64_t offset_blocks, uint64_t num_blocks, 3637 spdk_bdev_io_completion_cb cb, void *cb_arg) 3638 { 3639 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3640 num_blocks, cb, cb_arg); 3641 } 3642 3643 int 3644 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3645 struct iovec *iov, int iovcnt, void *md_buf, 3646 uint64_t offset_blocks, uint64_t num_blocks, 3647 spdk_bdev_io_completion_cb cb, void *cb_arg) 3648 { 3649 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3650 return -EINVAL; 3651 } 3652 3653 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3654 return -EINVAL; 3655 } 3656 3657 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3658 num_blocks, cb, cb_arg); 3659 } 3660 3661 static void 3662 bdev_compare_do_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3663 { 3664 struct spdk_bdev_io *parent_io = cb_arg; 3665 uint8_t *read_buf = bdev_io->u.bdev.iovs[0].iov_base; 3666 int i, rc = 0; 3667 3668 if (!success) { 3669 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3670 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3671 spdk_bdev_free_io(bdev_io); 3672 return; 3673 } 3674 3675 for (i = 0; i < parent_io->u.bdev.iovcnt; i++) { 3676 rc = memcmp(read_buf, 3677 parent_io->u.bdev.iovs[i].iov_base, 3678 parent_io->u.bdev.iovs[i].iov_len); 3679 if (rc) { 3680 break; 3681 } 3682 read_buf += parent_io->u.bdev.iovs[i].iov_len; 3683 } 3684 3685 spdk_bdev_free_io(bdev_io); 3686 3687 if (rc == 0) { 3688 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3689 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3690 } else { 3691 
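		/* memcmp() found a difference between the data read from the bdev and
		 * the caller's compare buffer. */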
parent_io->internal.status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 3692 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3693 } 3694 } 3695 3696 static void 3697 bdev_compare_do_read(void *_bdev_io) 3698 { 3699 struct spdk_bdev_io *bdev_io = _bdev_io; 3700 int rc; 3701 3702 rc = spdk_bdev_read_blocks(bdev_io->internal.desc, 3703 spdk_io_channel_from_ctx(bdev_io->internal.ch), NULL, 3704 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3705 bdev_compare_do_read_done, bdev_io); 3706 3707 if (rc == -ENOMEM) { 3708 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_do_read); 3709 } else if (rc != 0) { 3710 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3711 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3712 } 3713 } 3714 3715 static int 3716 bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3717 struct iovec *iov, int iovcnt, void *md_buf, 3718 uint64_t offset_blocks, uint64_t num_blocks, 3719 spdk_bdev_io_completion_cb cb, void *cb_arg) 3720 { 3721 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3722 struct spdk_bdev_io *bdev_io; 3723 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3724 3725 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3726 return -EINVAL; 3727 } 3728 3729 bdev_io = bdev_channel_get_io(channel); 3730 if (!bdev_io) { 3731 return -ENOMEM; 3732 } 3733 3734 bdev_io->internal.ch = channel; 3735 bdev_io->internal.desc = desc; 3736 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3737 bdev_io->u.bdev.iovs = iov; 3738 bdev_io->u.bdev.iovcnt = iovcnt; 3739 bdev_io->u.bdev.md_buf = md_buf; 3740 bdev_io->u.bdev.num_blocks = num_blocks; 3741 bdev_io->u.bdev.offset_blocks = offset_blocks; 3742 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3743 3744 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3745 bdev_io_submit(bdev_io); 3746 return 0; 3747 } 3748 3749 bdev_compare_do_read(bdev_io); 3750 3751 return 0; 3752 } 3753 3754 int 3755 spdk_bdev_comparev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3756 struct iovec *iov, int iovcnt, 3757 uint64_t offset_blocks, uint64_t num_blocks, 3758 spdk_bdev_io_completion_cb cb, void *cb_arg) 3759 { 3760 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3761 num_blocks, cb, cb_arg); 3762 } 3763 3764 int 3765 spdk_bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3766 struct iovec *iov, int iovcnt, void *md_buf, 3767 uint64_t offset_blocks, uint64_t num_blocks, 3768 spdk_bdev_io_completion_cb cb, void *cb_arg) 3769 { 3770 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3771 return -EINVAL; 3772 } 3773 3774 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3775 return -EINVAL; 3776 } 3777 3778 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3779 num_blocks, cb, cb_arg); 3780 } 3781 3782 static int 3783 bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3784 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3785 spdk_bdev_io_completion_cb cb, void *cb_arg) 3786 { 3787 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3788 struct spdk_bdev_io *bdev_io; 3789 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3790 3791 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3792 return -EINVAL; 3793 } 3794 3795 bdev_io = bdev_channel_get_io(channel); 3796 if (!bdev_io) { 3797 return -ENOMEM; 3798 } 3799 3800 
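	/* Build a single-iov COMPARE request. If the module does not support
	 * COMPARE natively, it is emulated below by reading the range and
	 * memcmp()-ing it against the supplied buffer. */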
bdev_io->internal.ch = channel; 3801 bdev_io->internal.desc = desc; 3802 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3803 bdev_io->u.bdev.iovs = &bdev_io->iov; 3804 bdev_io->u.bdev.iovs[0].iov_base = buf; 3805 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3806 bdev_io->u.bdev.iovcnt = 1; 3807 bdev_io->u.bdev.md_buf = md_buf; 3808 bdev_io->u.bdev.num_blocks = num_blocks; 3809 bdev_io->u.bdev.offset_blocks = offset_blocks; 3810 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3811 3812 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3813 bdev_io_submit(bdev_io); 3814 return 0; 3815 } 3816 3817 bdev_compare_do_read(bdev_io); 3818 3819 return 0; 3820 } 3821 3822 int 3823 spdk_bdev_compare_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3824 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3825 spdk_bdev_io_completion_cb cb, void *cb_arg) 3826 { 3827 return bdev_compare_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3828 cb, cb_arg); 3829 } 3830 3831 int 3832 spdk_bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3833 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3834 spdk_bdev_io_completion_cb cb, void *cb_arg) 3835 { 3836 struct iovec iov = { 3837 .iov_base = buf, 3838 }; 3839 3840 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3841 return -EINVAL; 3842 } 3843 3844 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3845 return -EINVAL; 3846 } 3847 3848 return bdev_compare_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3849 cb, cb_arg); 3850 } 3851 3852 static void 3853 bdev_comparev_and_writev_blocks_unlocked(void *ctx, int unlock_status) 3854 { 3855 struct spdk_bdev_io *bdev_io = ctx; 3856 3857 if (unlock_status) { 3858 SPDK_ERRLOG("LBA range unlock failed\n"); 3859 } 3860 3861 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS ? true : 3862 false, bdev_io->internal.caller_ctx); 3863 } 3864 3865 static void 3866 bdev_comparev_and_writev_blocks_unlock(struct spdk_bdev_io *bdev_io, int status) 3867 { 3868 bdev_io->internal.status = status; 3869 3870 bdev_unlock_lba_range(bdev_io->internal.desc, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3871 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3872 bdev_comparev_and_writev_blocks_unlocked, bdev_io); 3873 } 3874 3875 static void 3876 bdev_compare_and_write_do_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3877 { 3878 struct spdk_bdev_io *parent_io = cb_arg; 3879 3880 if (!success) { 3881 SPDK_ERRLOG("Compare and write operation failed\n"); 3882 } 3883 3884 spdk_bdev_free_io(bdev_io); 3885 3886 bdev_comparev_and_writev_blocks_unlock(parent_io, 3887 success ? 
SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED); 3888 } 3889 3890 static void 3891 bdev_compare_and_write_do_write(void *_bdev_io) 3892 { 3893 struct spdk_bdev_io *bdev_io = _bdev_io; 3894 int rc; 3895 3896 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 3897 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3898 bdev_io->u.bdev.fused_iovs, bdev_io->u.bdev.fused_iovcnt, 3899 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3900 bdev_compare_and_write_do_write_done, bdev_io); 3901 3902 3903 if (rc == -ENOMEM) { 3904 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_write); 3905 } else if (rc != 0) { 3906 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 3907 } 3908 } 3909 3910 static void 3911 bdev_compare_and_write_do_compare_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3912 { 3913 struct spdk_bdev_io *parent_io = cb_arg; 3914 3915 spdk_bdev_free_io(bdev_io); 3916 3917 if (!success) { 3918 bdev_comparev_and_writev_blocks_unlock(parent_io, SPDK_BDEV_IO_STATUS_MISCOMPARE); 3919 return; 3920 } 3921 3922 bdev_compare_and_write_do_write(parent_io); 3923 } 3924 3925 static void 3926 bdev_compare_and_write_do_compare(void *_bdev_io) 3927 { 3928 struct spdk_bdev_io *bdev_io = _bdev_io; 3929 int rc; 3930 3931 rc = spdk_bdev_comparev_blocks(bdev_io->internal.desc, 3932 spdk_io_channel_from_ctx(bdev_io->internal.ch), bdev_io->u.bdev.iovs, 3933 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3934 bdev_compare_and_write_do_compare_done, bdev_io); 3935 3936 if (rc == -ENOMEM) { 3937 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_compare); 3938 } else if (rc != 0) { 3939 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED); 3940 } 3941 } 3942 3943 static void 3944 bdev_comparev_and_writev_blocks_locked(void *ctx, int status) 3945 { 3946 struct spdk_bdev_io *bdev_io = ctx; 3947 3948 if (status) { 3949 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED; 3950 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3951 return; 3952 } 3953 3954 bdev_compare_and_write_do_compare(bdev_io); 3955 } 3956 3957 int 3958 spdk_bdev_comparev_and_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3959 struct iovec *compare_iov, int compare_iovcnt, 3960 struct iovec *write_iov, int write_iovcnt, 3961 uint64_t offset_blocks, uint64_t num_blocks, 3962 spdk_bdev_io_completion_cb cb, void *cb_arg) 3963 { 3964 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3965 struct spdk_bdev_io *bdev_io; 3966 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3967 3968 if (!desc->write) { 3969 return -EBADF; 3970 } 3971 3972 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3973 return -EINVAL; 3974 } 3975 3976 if (num_blocks > bdev->acwu) { 3977 return -EINVAL; 3978 } 3979 3980 bdev_io = bdev_channel_get_io(channel); 3981 if (!bdev_io) { 3982 return -ENOMEM; 3983 } 3984 3985 bdev_io->internal.ch = channel; 3986 bdev_io->internal.desc = desc; 3987 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE; 3988 bdev_io->u.bdev.iovs = compare_iov; 3989 bdev_io->u.bdev.iovcnt = compare_iovcnt; 3990 bdev_io->u.bdev.fused_iovs = write_iov; 3991 bdev_io->u.bdev.fused_iovcnt = write_iovcnt; 3992 bdev_io->u.bdev.md_buf = NULL; 3993 bdev_io->u.bdev.num_blocks = num_blocks; 3994 bdev_io->u.bdev.offset_blocks = offset_blocks; 3995 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3996 3997 if (bdev_io_type_supported(bdev, 
SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE)) { 3998 bdev_io_submit(bdev_io); 3999 return 0; 4000 } 4001 4002 return bdev_lock_lba_range(desc, ch, offset_blocks, num_blocks, 4003 bdev_comparev_and_writev_blocks_locked, bdev_io); 4004 } 4005 4006 static void 4007 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 4008 { 4009 if (!success) { 4010 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4011 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4012 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 4013 return; 4014 } 4015 4016 if (bdev_io->u.bdev.zcopy.populate) { 4017 /* Read the real data into the buffer */ 4018 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 4019 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4020 bdev_io_submit(bdev_io); 4021 return; 4022 } 4023 4024 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4025 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4026 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 4027 } 4028 4029 int 4030 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4031 uint64_t offset_blocks, uint64_t num_blocks, 4032 bool populate, 4033 spdk_bdev_io_completion_cb cb, void *cb_arg) 4034 { 4035 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4036 struct spdk_bdev_io *bdev_io; 4037 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4038 4039 if (!desc->write) { 4040 return -EBADF; 4041 } 4042 4043 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4044 return -EINVAL; 4045 } 4046 4047 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4048 return -ENOTSUP; 4049 } 4050 4051 bdev_io = bdev_channel_get_io(channel); 4052 if (!bdev_io) { 4053 return -ENOMEM; 4054 } 4055 4056 bdev_io->internal.ch = channel; 4057 bdev_io->internal.desc = desc; 4058 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4059 bdev_io->u.bdev.num_blocks = num_blocks; 4060 bdev_io->u.bdev.offset_blocks = offset_blocks; 4061 bdev_io->u.bdev.iovs = NULL; 4062 bdev_io->u.bdev.iovcnt = 0; 4063 bdev_io->u.bdev.md_buf = NULL; 4064 bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0; 4065 bdev_io->u.bdev.zcopy.commit = 0; 4066 bdev_io->u.bdev.zcopy.start = 1; 4067 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4068 4069 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4070 bdev_io_submit(bdev_io); 4071 } else { 4072 /* Emulate zcopy by allocating a buffer */ 4073 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 4074 bdev_io->u.bdev.num_blocks * bdev->blocklen); 4075 } 4076 4077 return 0; 4078 } 4079 4080 int 4081 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 4082 spdk_bdev_io_completion_cb cb, void *cb_arg) 4083 { 4084 struct spdk_bdev *bdev = bdev_io->bdev; 4085 4086 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 4087 /* This can happen if the zcopy was emulated in start */ 4088 if (bdev_io->u.bdev.zcopy.start != 1) { 4089 return -EINVAL; 4090 } 4091 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4092 } 4093 4094 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 4095 return -EINVAL; 4096 } 4097 4098 bdev_io->u.bdev.zcopy.commit = commit ? 
1 : 0; 4099 bdev_io->u.bdev.zcopy.start = 0; 4100 bdev_io->internal.caller_ctx = cb_arg; 4101 bdev_io->internal.cb = cb; 4102 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4103 4104 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4105 bdev_io_submit(bdev_io); 4106 return 0; 4107 } 4108 4109 if (!bdev_io->u.bdev.zcopy.commit) { 4110 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4111 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4112 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 4113 return 0; 4114 } 4115 4116 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 4117 bdev_io_submit(bdev_io); 4118 4119 return 0; 4120 } 4121 4122 int 4123 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4124 uint64_t offset, uint64_t len, 4125 spdk_bdev_io_completion_cb cb, void *cb_arg) 4126 { 4127 uint64_t offset_blocks, num_blocks; 4128 4129 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4130 len, &num_blocks) != 0) { 4131 return -EINVAL; 4132 } 4133 4134 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4135 } 4136 4137 int 4138 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4139 uint64_t offset_blocks, uint64_t num_blocks, 4140 spdk_bdev_io_completion_cb cb, void *cb_arg) 4141 { 4142 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4143 struct spdk_bdev_io *bdev_io; 4144 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4145 4146 if (!desc->write) { 4147 return -EBADF; 4148 } 4149 4150 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4151 return -EINVAL; 4152 } 4153 4154 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 4155 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 4156 return -ENOTSUP; 4157 } 4158 4159 bdev_io = bdev_channel_get_io(channel); 4160 4161 if (!bdev_io) { 4162 return -ENOMEM; 4163 } 4164 4165 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 4166 bdev_io->internal.ch = channel; 4167 bdev_io->internal.desc = desc; 4168 bdev_io->u.bdev.offset_blocks = offset_blocks; 4169 bdev_io->u.bdev.num_blocks = num_blocks; 4170 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4171 4172 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 4173 bdev_io_submit(bdev_io); 4174 return 0; 4175 } 4176 4177 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 4178 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 4179 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 4180 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 4181 bdev_write_zero_buffer_next(bdev_io); 4182 4183 return 0; 4184 } 4185 4186 int 4187 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4188 uint64_t offset, uint64_t nbytes, 4189 spdk_bdev_io_completion_cb cb, void *cb_arg) 4190 { 4191 uint64_t offset_blocks, num_blocks; 4192 4193 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4194 nbytes, &num_blocks) != 0) { 4195 return -EINVAL; 4196 } 4197 4198 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4199 } 4200 4201 int 4202 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4203 uint64_t offset_blocks, uint64_t num_blocks, 4204 spdk_bdev_io_completion_cb cb, void *cb_arg) 4205 { 4206 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4207 struct spdk_bdev_io *bdev_io; 4208 struct 
spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4209 4210 if (!desc->write) { 4211 return -EBADF; 4212 } 4213 4214 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4215 return -EINVAL; 4216 } 4217 4218 if (num_blocks == 0) { 4219 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 4220 return -EINVAL; 4221 } 4222 4223 bdev_io = bdev_channel_get_io(channel); 4224 if (!bdev_io) { 4225 return -ENOMEM; 4226 } 4227 4228 bdev_io->internal.ch = channel; 4229 bdev_io->internal.desc = desc; 4230 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 4231 4232 bdev_io->u.bdev.iovs = &bdev_io->iov; 4233 bdev_io->u.bdev.iovs[0].iov_base = NULL; 4234 bdev_io->u.bdev.iovs[0].iov_len = 0; 4235 bdev_io->u.bdev.iovcnt = 1; 4236 4237 bdev_io->u.bdev.offset_blocks = offset_blocks; 4238 bdev_io->u.bdev.num_blocks = num_blocks; 4239 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4240 4241 bdev_io_submit(bdev_io); 4242 return 0; 4243 } 4244 4245 int 4246 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4247 uint64_t offset, uint64_t length, 4248 spdk_bdev_io_completion_cb cb, void *cb_arg) 4249 { 4250 uint64_t offset_blocks, num_blocks; 4251 4252 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4253 length, &num_blocks) != 0) { 4254 return -EINVAL; 4255 } 4256 4257 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4258 } 4259 4260 int 4261 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4262 uint64_t offset_blocks, uint64_t num_blocks, 4263 spdk_bdev_io_completion_cb cb, void *cb_arg) 4264 { 4265 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4266 struct spdk_bdev_io *bdev_io; 4267 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4268 4269 if (!desc->write) { 4270 return -EBADF; 4271 } 4272 4273 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4274 return -EINVAL; 4275 } 4276 4277 bdev_io = bdev_channel_get_io(channel); 4278 if (!bdev_io) { 4279 return -ENOMEM; 4280 } 4281 4282 bdev_io->internal.ch = channel; 4283 bdev_io->internal.desc = desc; 4284 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 4285 bdev_io->u.bdev.iovs = NULL; 4286 bdev_io->u.bdev.iovcnt = 0; 4287 bdev_io->u.bdev.offset_blocks = offset_blocks; 4288 bdev_io->u.bdev.num_blocks = num_blocks; 4289 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4290 4291 bdev_io_submit(bdev_io); 4292 return 0; 4293 } 4294 4295 static void 4296 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 4297 { 4298 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 4299 struct spdk_bdev_io *bdev_io; 4300 4301 bdev_io = TAILQ_FIRST(&ch->queued_resets); 4302 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 4303 bdev_io_submit_reset(bdev_io); 4304 } 4305 4306 static void 4307 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 4308 { 4309 struct spdk_io_channel *ch; 4310 struct spdk_bdev_channel *channel; 4311 struct spdk_bdev_mgmt_channel *mgmt_channel; 4312 struct spdk_bdev_shared_resource *shared_resource; 4313 bdev_io_tailq_t tmp_queued; 4314 4315 TAILQ_INIT(&tmp_queued); 4316 4317 ch = spdk_io_channel_iter_get_channel(i); 4318 channel = spdk_io_channel_get_ctx(ch); 4319 shared_resource = channel->shared_resource; 4320 mgmt_channel = shared_resource->mgmt_ch; 4321 4322 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 4323 4324 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 4325 /* The QoS object is always valid and readable while 4326 * the channel flag is set, so the lock here should not 4327 * be 
necessary. We're not in the fast path though, so 4328 * just take it anyway. */ 4329 pthread_mutex_lock(&channel->bdev->internal.mutex); 4330 if (channel->bdev->internal.qos->ch == channel) { 4331 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 4332 } 4333 pthread_mutex_unlock(&channel->bdev->internal.mutex); 4334 } 4335 4336 bdev_abort_all_queued_io(&shared_resource->nomem_io, channel); 4337 bdev_abort_all_buf_io(&mgmt_channel->need_buf_small, channel); 4338 bdev_abort_all_buf_io(&mgmt_channel->need_buf_large, channel); 4339 bdev_abort_all_queued_io(&tmp_queued, channel); 4340 4341 spdk_for_each_channel_continue(i, 0); 4342 } 4343 4344 static void 4345 bdev_start_reset(void *ctx) 4346 { 4347 struct spdk_bdev_channel *ch = ctx; 4348 4349 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 4350 ch, bdev_reset_dev); 4351 } 4352 4353 static void 4354 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 4355 { 4356 struct spdk_bdev *bdev = ch->bdev; 4357 4358 assert(!TAILQ_EMPTY(&ch->queued_resets)); 4359 4360 pthread_mutex_lock(&bdev->internal.mutex); 4361 if (bdev->internal.reset_in_progress == NULL) { 4362 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 4363 /* 4364 * Take a channel reference for the target bdev for the life of this 4365 * reset. This guards against the channel getting destroyed while 4366 * spdk_for_each_channel() calls related to this reset IO are in 4367 * progress. We will release the reference when this reset is 4368 * completed. 4369 */ 4370 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 4371 bdev_start_reset(ch); 4372 } 4373 pthread_mutex_unlock(&bdev->internal.mutex); 4374 } 4375 4376 int 4377 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4378 spdk_bdev_io_completion_cb cb, void *cb_arg) 4379 { 4380 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4381 struct spdk_bdev_io *bdev_io; 4382 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4383 4384 bdev_io = bdev_channel_get_io(channel); 4385 if (!bdev_io) { 4386 return -ENOMEM; 4387 } 4388 4389 bdev_io->internal.ch = channel; 4390 bdev_io->internal.desc = desc; 4391 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4392 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 4393 bdev_io->u.reset.ch_ref = NULL; 4394 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4395 4396 pthread_mutex_lock(&bdev->internal.mutex); 4397 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 4398 pthread_mutex_unlock(&bdev->internal.mutex); 4399 4400 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 4401 internal.ch_link); 4402 4403 bdev_channel_start_reset(channel); 4404 4405 return 0; 4406 } 4407 4408 void 4409 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4410 struct spdk_bdev_io_stat *stat) 4411 { 4412 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4413 4414 *stat = channel->stat; 4415 } 4416 4417 static void 4418 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 4419 { 4420 void *io_device = spdk_io_channel_iter_get_io_device(i); 4421 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4422 4423 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 4424 bdev_iostat_ctx->cb_arg, 0); 4425 free(bdev_iostat_ctx); 4426 } 4427 4428 static void 4429 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 4430 { 4431 struct 
spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4432 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4433 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4434 4435 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 4436 spdk_for_each_channel_continue(i, 0); 4437 } 4438 4439 void 4440 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 4441 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 4442 { 4443 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 4444 4445 assert(bdev != NULL); 4446 assert(stat != NULL); 4447 assert(cb != NULL); 4448 4449 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 4450 if (bdev_iostat_ctx == NULL) { 4451 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 4452 cb(bdev, stat, cb_arg, -ENOMEM); 4453 return; 4454 } 4455 4456 bdev_iostat_ctx->stat = stat; 4457 bdev_iostat_ctx->cb = cb; 4458 bdev_iostat_ctx->cb_arg = cb_arg; 4459 4460 /* Start with the statistics from previously deleted channels. */ 4461 pthread_mutex_lock(&bdev->internal.mutex); 4462 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 4463 pthread_mutex_unlock(&bdev->internal.mutex); 4464 4465 /* Then iterate and add the statistics from each existing channel. */ 4466 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4467 bdev_get_each_channel_stat, 4468 bdev_iostat_ctx, 4469 bdev_get_device_stat_done); 4470 } 4471 4472 int 4473 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4474 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4475 spdk_bdev_io_completion_cb cb, void *cb_arg) 4476 { 4477 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4478 struct spdk_bdev_io *bdev_io; 4479 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4480 4481 if (!desc->write) { 4482 return -EBADF; 4483 } 4484 4485 bdev_io = bdev_channel_get_io(channel); 4486 if (!bdev_io) { 4487 return -ENOMEM; 4488 } 4489 4490 bdev_io->internal.ch = channel; 4491 bdev_io->internal.desc = desc; 4492 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 4493 bdev_io->u.nvme_passthru.cmd = *cmd; 4494 bdev_io->u.nvme_passthru.buf = buf; 4495 bdev_io->u.nvme_passthru.nbytes = nbytes; 4496 bdev_io->u.nvme_passthru.md_buf = NULL; 4497 bdev_io->u.nvme_passthru.md_len = 0; 4498 4499 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4500 4501 bdev_io_submit(bdev_io); 4502 return 0; 4503 } 4504 4505 int 4506 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4507 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4508 spdk_bdev_io_completion_cb cb, void *cb_arg) 4509 { 4510 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4511 struct spdk_bdev_io *bdev_io; 4512 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4513 4514 if (!desc->write) { 4515 /* 4516 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4517 * to easily determine if the command is a read or write, but for now just 4518 * do not allow io_passthru with a read-only descriptor. 
4519 */ 4520 return -EBADF; 4521 } 4522 4523 bdev_io = bdev_channel_get_io(channel); 4524 if (!bdev_io) { 4525 return -ENOMEM; 4526 } 4527 4528 bdev_io->internal.ch = channel; 4529 bdev_io->internal.desc = desc; 4530 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 4531 bdev_io->u.nvme_passthru.cmd = *cmd; 4532 bdev_io->u.nvme_passthru.buf = buf; 4533 bdev_io->u.nvme_passthru.nbytes = nbytes; 4534 bdev_io->u.nvme_passthru.md_buf = NULL; 4535 bdev_io->u.nvme_passthru.md_len = 0; 4536 4537 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4538 4539 bdev_io_submit(bdev_io); 4540 return 0; 4541 } 4542 4543 int 4544 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4545 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 4546 spdk_bdev_io_completion_cb cb, void *cb_arg) 4547 { 4548 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4549 struct spdk_bdev_io *bdev_io; 4550 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4551 4552 if (!desc->write) { 4553 /* 4554 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4555 * to easily determine if the command is a read or write, but for now just 4556 * do not allow io_passthru with a read-only descriptor. 4557 */ 4558 return -EBADF; 4559 } 4560 4561 bdev_io = bdev_channel_get_io(channel); 4562 if (!bdev_io) { 4563 return -ENOMEM; 4564 } 4565 4566 bdev_io->internal.ch = channel; 4567 bdev_io->internal.desc = desc; 4568 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 4569 bdev_io->u.nvme_passthru.cmd = *cmd; 4570 bdev_io->u.nvme_passthru.buf = buf; 4571 bdev_io->u.nvme_passthru.nbytes = nbytes; 4572 bdev_io->u.nvme_passthru.md_buf = md_buf; 4573 bdev_io->u.nvme_passthru.md_len = md_len; 4574 4575 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4576 4577 bdev_io_submit(bdev_io); 4578 return 0; 4579 } 4580 4581 static void bdev_abort_retry(void *ctx); 4582 static void bdev_abort(struct spdk_bdev_io *parent_io); 4583 4584 static void 4585 bdev_abort_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4586 { 4587 struct spdk_bdev_channel *channel = bdev_io->internal.ch; 4588 struct spdk_bdev_io *parent_io = cb_arg; 4589 struct spdk_bdev_io *bio_to_abort, *tmp_io; 4590 4591 bio_to_abort = bdev_io->u.abort.bio_to_abort; 4592 4593 spdk_bdev_free_io(bdev_io); 4594 4595 if (!success) { 4596 /* Check if the target I/O completed in the meantime. */ 4597 TAILQ_FOREACH(tmp_io, &channel->io_submitted, internal.ch_link) { 4598 if (tmp_io == bio_to_abort) { 4599 break; 4600 } 4601 } 4602 4603 /* If the target I/O still exists, set the parent to failed. */ 4604 if (tmp_io != NULL) { 4605 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4606 } 4607 } 4608 4609 parent_io->u.bdev.split_outstanding--; 4610 if (parent_io->u.bdev.split_outstanding == 0) { 4611 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4612 bdev_abort_retry(parent_io); 4613 } else { 4614 bdev_io_complete(parent_io); 4615 } 4616 } 4617 } 4618 4619 static int 4620 bdev_abort_io(struct spdk_bdev_desc *desc, struct spdk_bdev_channel *channel, 4621 struct spdk_bdev_io *bio_to_abort, 4622 spdk_bdev_io_completion_cb cb, void *cb_arg) 4623 { 4624 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4625 struct spdk_bdev_io *bdev_io; 4626 4627 if (bio_to_abort->type == SPDK_BDEV_IO_TYPE_ABORT || 4628 bio_to_abort->type == SPDK_BDEV_IO_TYPE_RESET) { 4629 /* TODO: Abort reset or abort request. 
		 */
		return -ENOTSUP;
	}

	bdev_io = bdev_channel_get_io(channel);
	if (bdev_io == NULL) {
		return -ENOMEM;
	}

	bdev_io->internal.ch = channel;
	bdev_io->internal.desc = desc;
	bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT;
	bdev_io_init(bdev_io, bdev, cb_arg, cb);

	if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bio_to_abort)) {
		bdev_io->u.bdev.abort.bio_cb_arg = bio_to_abort;

		/* Parent abort request is not submitted directly, but to manage its
		 * execution add it to the submitted list here.
		 */
		bdev_io->internal.submit_tsc = spdk_get_ticks();
		TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link);

		bdev_abort(bdev_io);

		return 0;
	}

	bdev_io->u.abort.bio_to_abort = bio_to_abort;

	/* Submit the abort request to the underlying bdev module. */
	bdev_io_submit(bdev_io);

	return 0;
}

static uint32_t
_bdev_abort(struct spdk_bdev_io *parent_io)
{
	struct spdk_bdev_desc *desc = parent_io->internal.desc;
	struct spdk_bdev_channel *channel = parent_io->internal.ch;
	void *bio_cb_arg;
	struct spdk_bdev_io *bio_to_abort;
	uint32_t matched_ios;
	int rc;

	bio_cb_arg = parent_io->u.bdev.abort.bio_cb_arg;

	/* matched_ios is returned and will be kept by the caller.
	 *
	 * This function is used in two cases: 1) the same cb_arg is used for
	 * multiple I/Os, and 2) a single large I/O is split into smaller ones.
	 * Incrementing split_outstanding directly here may confuse readers,
	 * especially in the first case.
	 *
	 * Completion of I/O abort is processed after stack unwinding. Hence this
	 * trick works as expected.
	 */
	matched_ios = 0;
	parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;

	TAILQ_FOREACH(bio_to_abort, &channel->io_submitted, internal.ch_link) {
		if (bio_to_abort->internal.caller_ctx != bio_cb_arg) {
			continue;
		}

		if (bio_to_abort->internal.submit_tsc > parent_io->internal.submit_tsc) {
			/* Any I/O which was submitted after this abort command should be excluded. */
			continue;
		}

		rc = bdev_abort_io(desc, channel, bio_to_abort, bdev_abort_io_done, parent_io);
		if (rc != 0) {
			if (rc == -ENOMEM) {
				parent_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM;
			} else {
				parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
			}
			break;
		}
		matched_ios++;
	}

	return matched_ios;
}

static void
bdev_abort_retry(void *ctx)
{
	struct spdk_bdev_io *parent_io = ctx;
	uint32_t matched_ios;

	matched_ios = _bdev_abort(parent_io);

	if (matched_ios == 0) {
		if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
			bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry);
		} else {
			/* For retry, the case that no target I/O was found is success
			 * because it means target I/Os completed in the meantime.
			 */
			bdev_io_complete(parent_io);
		}
		return;
	}

	/* Use split_outstanding to manage the progress of aborting I/Os. */
	parent_io->u.bdev.split_outstanding = matched_ios;
}

static void
bdev_abort(struct spdk_bdev_io *parent_io)
{
	uint32_t matched_ios;

	matched_ios = _bdev_abort(parent_io);

	if (matched_ios == 0) {
		if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
			bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry);
		} else {
			/* The case where no target I/O was found is a failure. */
			parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
			bdev_io_complete(parent_io);
		}
		return;
	}

	/* Use split_outstanding to manage the progress of aborting I/Os. */
	parent_io->u.bdev.split_outstanding = matched_ios;
}

int
spdk_bdev_abort(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		void *bio_cb_arg,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc);
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	struct spdk_bdev_io *bdev_io;

	if (bio_cb_arg == NULL) {
		return -EINVAL;
	}

	if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
		return -ENOTSUP;
	}

	bdev_io = bdev_channel_get_io(channel);
	if (bdev_io == NULL) {
		return -ENOMEM;
	}

	bdev_io->internal.ch = channel;
	bdev_io->internal.desc = desc;
	bdev_io->internal.submit_tsc = spdk_get_ticks();
	bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT;
	bdev_io_init(bdev_io, bdev, cb_arg, cb);

	bdev_io->u.bdev.abort.bio_cb_arg = bio_cb_arg;

	/* Parent abort request is not submitted directly, but to manage its execution,
	 * add it to the submitted list here.
	 */
	TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link);

	bdev_abort(bdev_io);

	return 0;
}

int
spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
			struct spdk_bdev_io_wait_entry *entry)
{
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;

	if (bdev != entry->bdev) {
		SPDK_ERRLOG("bdevs do not match\n");
		return -EINVAL;
	}

	if (mgmt_ch->per_thread_cache_count > 0) {
		SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n");
		return -EINVAL;
	}

	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
	return 0;
}

static void
bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
{
	struct spdk_bdev *bdev = bdev_ch->bdev;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
	struct spdk_bdev_io *bdev_io;

	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
		/*
		 * Allow some more I/O to complete before retrying the nomem_io queue.
		 * Some drivers (such as nvme) cannot immediately take a new I/O in
		 * the context of a completion, because the resources for the I/O are
		 * not released until control returns to the bdev poller. Also, we
		 * may require several small I/O to complete before a larger I/O
		 * (that requires splitting) can be submitted.
4837 */ 4838 return; 4839 } 4840 4841 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 4842 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 4843 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 4844 bdev_io->internal.ch->io_outstanding++; 4845 shared_resource->io_outstanding++; 4846 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4847 bdev_io->internal.error.nvme.cdw0 = 0; 4848 bdev_io->num_retries++; 4849 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 4850 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4851 break; 4852 } 4853 } 4854 } 4855 4856 static inline void 4857 bdev_io_complete(void *ctx) 4858 { 4859 struct spdk_bdev_io *bdev_io = ctx; 4860 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4861 uint64_t tsc, tsc_diff; 4862 4863 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 4864 /* 4865 * Send the completion to the thread that originally submitted the I/O, 4866 * which may not be the current thread in the case of QoS. 4867 */ 4868 if (bdev_io->internal.io_submit_ch) { 4869 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4870 bdev_io->internal.io_submit_ch = NULL; 4871 } 4872 4873 /* 4874 * Defer completion to avoid potential infinite recursion if the 4875 * user's completion callback issues a new I/O. 4876 */ 4877 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4878 bdev_io_complete, bdev_io); 4879 return; 4880 } 4881 4882 tsc = spdk_get_ticks(); 4883 tsc_diff = tsc - bdev_io->internal.submit_tsc; 4884 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 4885 4886 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link); 4887 4888 if (bdev_io->internal.ch->histogram) { 4889 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 4890 } 4891 4892 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4893 switch (bdev_io->type) { 4894 case SPDK_BDEV_IO_TYPE_READ: 4895 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4896 bdev_io->internal.ch->stat.num_read_ops++; 4897 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4898 break; 4899 case SPDK_BDEV_IO_TYPE_WRITE: 4900 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4901 bdev_io->internal.ch->stat.num_write_ops++; 4902 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4903 break; 4904 case SPDK_BDEV_IO_TYPE_UNMAP: 4905 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4906 bdev_io->internal.ch->stat.num_unmap_ops++; 4907 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 4908 break; 4909 case SPDK_BDEV_IO_TYPE_ZCOPY: 4910 /* Track the data in the start phase only */ 4911 if (bdev_io->u.bdev.zcopy.start) { 4912 if (bdev_io->u.bdev.zcopy.populate) { 4913 bdev_io->internal.ch->stat.bytes_read += 4914 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4915 bdev_io->internal.ch->stat.num_read_ops++; 4916 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4917 } else { 4918 bdev_io->internal.ch->stat.bytes_written += 4919 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4920 bdev_io->internal.ch->stat.num_write_ops++; 4921 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4922 } 4923 } 4924 break; 4925 default: 4926 break; 4927 } 4928 } 4929 4930 #ifdef SPDK_CONFIG_VTUNE 4931 uint64_t now_tsc = spdk_get_ticks(); 4932 if (now_tsc > 
(bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 4933 uint64_t data[5]; 4934 4935 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 4936 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 4937 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 4938 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 4939 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 4940 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 4941 4942 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 4943 __itt_metadata_u64, 5, data); 4944 4945 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 4946 bdev_io->internal.ch->start_tsc = now_tsc; 4947 } 4948 #endif 4949 4950 assert(bdev_io->internal.cb != NULL); 4951 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 4952 4953 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 4954 bdev_io->internal.caller_ctx); 4955 } 4956 4957 static void 4958 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 4959 { 4960 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4961 4962 if (bdev_io->u.reset.ch_ref != NULL) { 4963 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 4964 bdev_io->u.reset.ch_ref = NULL; 4965 } 4966 4967 bdev_io_complete(bdev_io); 4968 } 4969 4970 static void 4971 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 4972 { 4973 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4974 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4975 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4976 struct spdk_bdev_io *queued_reset; 4977 4978 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 4979 while (!TAILQ_EMPTY(&ch->queued_resets)) { 4980 queued_reset = TAILQ_FIRST(&ch->queued_resets); 4981 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 4982 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 4983 } 4984 4985 spdk_for_each_channel_continue(i, 0); 4986 } 4987 4988 void 4989 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 4990 { 4991 struct spdk_bdev *bdev = bdev_io->bdev; 4992 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4993 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4994 4995 bdev_io->internal.status = status; 4996 4997 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 4998 bool unlock_channels = false; 4999 5000 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 5001 SPDK_ERRLOG("NOMEM returned for reset\n"); 5002 } 5003 pthread_mutex_lock(&bdev->internal.mutex); 5004 if (bdev_io == bdev->internal.reset_in_progress) { 5005 bdev->internal.reset_in_progress = NULL; 5006 unlock_channels = true; 5007 } 5008 pthread_mutex_unlock(&bdev->internal.mutex); 5009 5010 if (unlock_channels) { 5011 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 5012 bdev_io, bdev_reset_complete); 5013 return; 5014 } 5015 } else { 5016 _bdev_io_unset_bounce_buf(bdev_io); 5017 5018 assert(bdev_ch->io_outstanding > 0); 5019 assert(shared_resource->io_outstanding > 0); 5020 bdev_ch->io_outstanding--; 5021 shared_resource->io_outstanding--; 5022 5023 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 5024 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 5025 
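/* Worked example for the nomem_threshold computed below (NOMEM_THRESHOLD_COUNT
 * is 8 in this file): with 64 I/O outstanding the threshold is
 * spdk_max(64 / 2, 64 - 8) = 56, so roughly 8 completions must drain before
 * bdev_ch_retry_io() starts resubmitting the nomem_io queue; with only 10
 * outstanding it is spdk_max(5, 2) = 5, i.e. half of the outstanding I/O must
 * complete first.
 */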
/* 5026 * Wait for some of the outstanding I/O to complete before we 5027 * retry any of the nomem_io. Normally we will wait for 5028 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 5029 * depth channels we will instead wait for half to complete. 5030 */ 5031 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 5032 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 5033 return; 5034 } 5035 5036 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 5037 bdev_ch_retry_io(bdev_ch); 5038 } 5039 } 5040 5041 bdev_io_complete(bdev_io); 5042 } 5043 5044 void 5045 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 5046 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 5047 { 5048 if (sc == SPDK_SCSI_STATUS_GOOD) { 5049 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5050 } else { 5051 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 5052 bdev_io->internal.error.scsi.sc = sc; 5053 bdev_io->internal.error.scsi.sk = sk; 5054 bdev_io->internal.error.scsi.asc = asc; 5055 bdev_io->internal.error.scsi.ascq = ascq; 5056 } 5057 5058 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5059 } 5060 5061 void 5062 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 5063 int *sc, int *sk, int *asc, int *ascq) 5064 { 5065 assert(sc != NULL); 5066 assert(sk != NULL); 5067 assert(asc != NULL); 5068 assert(ascq != NULL); 5069 5070 switch (bdev_io->internal.status) { 5071 case SPDK_BDEV_IO_STATUS_SUCCESS: 5072 *sc = SPDK_SCSI_STATUS_GOOD; 5073 *sk = SPDK_SCSI_SENSE_NO_SENSE; 5074 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5075 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5076 break; 5077 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 5078 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 5079 break; 5080 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 5081 *sc = bdev_io->internal.error.scsi.sc; 5082 *sk = bdev_io->internal.error.scsi.sk; 5083 *asc = bdev_io->internal.error.scsi.asc; 5084 *ascq = bdev_io->internal.error.scsi.ascq; 5085 break; 5086 default: 5087 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 5088 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 5089 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5090 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5091 break; 5092 } 5093 } 5094 5095 void 5096 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 5097 { 5098 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 5099 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5100 } else { 5101 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 5102 } 5103 5104 bdev_io->internal.error.nvme.cdw0 = cdw0; 5105 bdev_io->internal.error.nvme.sct = sct; 5106 bdev_io->internal.error.nvme.sc = sc; 5107 5108 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5109 } 5110 5111 void 5112 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 5113 { 5114 assert(sct != NULL); 5115 assert(sc != NULL); 5116 assert(cdw0 != NULL); 5117 5118 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5119 *sct = bdev_io->internal.error.nvme.sct; 5120 *sc = bdev_io->internal.error.nvme.sc; 5121 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5122 *sct = SPDK_NVME_SCT_GENERIC; 5123 *sc = SPDK_NVME_SC_SUCCESS; 5124 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_ABORTED) { 5125 *sct = SPDK_NVME_SCT_GENERIC; 5126 *sc = SPDK_NVME_SC_ABORTED_BY_REQUEST; 5127 } else { 5128 *sct 
= SPDK_NVME_SCT_GENERIC; 5129 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5130 } 5131 5132 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5133 } 5134 5135 void 5136 spdk_bdev_io_get_nvme_fused_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, 5137 int *first_sct, int *first_sc, int *second_sct, int *second_sc) 5138 { 5139 assert(first_sct != NULL); 5140 assert(first_sc != NULL); 5141 assert(second_sct != NULL); 5142 assert(second_sc != NULL); 5143 assert(cdw0 != NULL); 5144 5145 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5146 if (bdev_io->internal.error.nvme.sct == SPDK_NVME_SCT_MEDIA_ERROR && 5147 bdev_io->internal.error.nvme.sc == SPDK_NVME_SC_COMPARE_FAILURE) { 5148 *first_sct = bdev_io->internal.error.nvme.sct; 5149 *first_sc = bdev_io->internal.error.nvme.sc; 5150 *second_sct = SPDK_NVME_SCT_GENERIC; 5151 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5152 } else { 5153 *first_sct = SPDK_NVME_SCT_GENERIC; 5154 *first_sc = SPDK_NVME_SC_SUCCESS; 5155 *second_sct = bdev_io->internal.error.nvme.sct; 5156 *second_sc = bdev_io->internal.error.nvme.sc; 5157 } 5158 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5159 *first_sct = SPDK_NVME_SCT_GENERIC; 5160 *first_sc = SPDK_NVME_SC_SUCCESS; 5161 *second_sct = SPDK_NVME_SCT_GENERIC; 5162 *second_sc = SPDK_NVME_SC_SUCCESS; 5163 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED) { 5164 *first_sct = SPDK_NVME_SCT_GENERIC; 5165 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5166 *second_sct = SPDK_NVME_SCT_GENERIC; 5167 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5168 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_MISCOMPARE) { 5169 *first_sct = SPDK_NVME_SCT_MEDIA_ERROR; 5170 *first_sc = SPDK_NVME_SC_COMPARE_FAILURE; 5171 *second_sct = SPDK_NVME_SCT_GENERIC; 5172 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5173 } else { 5174 *first_sct = SPDK_NVME_SCT_GENERIC; 5175 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5176 *second_sct = SPDK_NVME_SCT_GENERIC; 5177 *second_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5178 } 5179 5180 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5181 } 5182 5183 struct spdk_thread * 5184 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 5185 { 5186 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 5187 } 5188 5189 struct spdk_io_channel * 5190 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 5191 { 5192 return bdev_io->internal.ch->channel; 5193 } 5194 5195 static int 5196 bdev_init(struct spdk_bdev *bdev) 5197 { 5198 char *bdev_name; 5199 5200 assert(bdev->module != NULL); 5201 5202 if (!bdev->name) { 5203 SPDK_ERRLOG("Bdev name is NULL\n"); 5204 return -EINVAL; 5205 } 5206 5207 if (!strlen(bdev->name)) { 5208 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 5209 return -EINVAL; 5210 } 5211 5212 if (spdk_bdev_get_by_name(bdev->name)) { 5213 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 5214 return -EEXIST; 5215 } 5216 5217 /* Users often register their own I/O devices using the bdev name. In 5218 * order to avoid conflicts, prepend bdev_. 
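 * For example, a bdev registered under the name "Malloc0" (an illustrative name)
 * becomes the I/O device "bdev_Malloc0" via the spdk_sprintf_alloc() call below.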
*/ 5219 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 5220 if (!bdev_name) { 5221 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 5222 return -ENOMEM; 5223 } 5224 5225 bdev->internal.status = SPDK_BDEV_STATUS_READY; 5226 bdev->internal.measured_queue_depth = UINT64_MAX; 5227 bdev->internal.claim_module = NULL; 5228 bdev->internal.qd_poller = NULL; 5229 bdev->internal.qos = NULL; 5230 5231 /* If the user didn't specify a uuid, generate one. */ 5232 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 5233 spdk_uuid_generate(&bdev->uuid); 5234 } 5235 5236 if (spdk_bdev_get_buf_align(bdev) > 1) { 5237 if (bdev->split_on_optimal_io_boundary) { 5238 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 5239 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 5240 } else { 5241 bdev->split_on_optimal_io_boundary = true; 5242 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 5243 } 5244 } 5245 5246 /* If the user didn't specify a write unit size, set it to one. */ 5247 if (bdev->write_unit_size == 0) { 5248 bdev->write_unit_size = 1; 5249 } 5250 5251 /* Set ACWU value to 1 if bdev module did not set it (does not support it natively) */ 5252 if (bdev->acwu == 0) { 5253 bdev->acwu = 1; 5254 } 5255 5256 TAILQ_INIT(&bdev->internal.open_descs); 5257 TAILQ_INIT(&bdev->internal.locked_ranges); 5258 TAILQ_INIT(&bdev->internal.pending_locked_ranges); 5259 5260 TAILQ_INIT(&bdev->aliases); 5261 5262 bdev->internal.reset_in_progress = NULL; 5263 5264 spdk_io_device_register(__bdev_to_io_dev(bdev), 5265 bdev_channel_create, bdev_channel_destroy, 5266 sizeof(struct spdk_bdev_channel), 5267 bdev_name); 5268 5269 free(bdev_name); 5270 5271 pthread_mutex_init(&bdev->internal.mutex, NULL); 5272 return 0; 5273 } 5274 5275 static void 5276 bdev_destroy_cb(void *io_device) 5277 { 5278 int rc; 5279 struct spdk_bdev *bdev; 5280 spdk_bdev_unregister_cb cb_fn; 5281 void *cb_arg; 5282 5283 bdev = __bdev_from_io_dev(io_device); 5284 cb_fn = bdev->internal.unregister_cb; 5285 cb_arg = bdev->internal.unregister_ctx; 5286 5287 rc = bdev->fn_table->destruct(bdev->ctxt); 5288 if (rc < 0) { 5289 SPDK_ERRLOG("destruct failed\n"); 5290 } 5291 if (rc <= 0 && cb_fn != NULL) { 5292 cb_fn(cb_arg, rc); 5293 } 5294 } 5295 5296 5297 static void 5298 bdev_fini(struct spdk_bdev *bdev) 5299 { 5300 pthread_mutex_destroy(&bdev->internal.mutex); 5301 5302 free(bdev->internal.qos); 5303 5304 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 5305 } 5306 5307 static void 5308 bdev_start(struct spdk_bdev *bdev) 5309 { 5310 SPDK_DEBUGLOG(bdev, "Inserting bdev %s into list\n", bdev->name); 5311 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 5312 5313 /* Examine configuration before initializing I/O */ 5314 bdev_examine(bdev); 5315 } 5316 5317 int 5318 spdk_bdev_register(struct spdk_bdev *bdev) 5319 { 5320 int rc = bdev_init(bdev); 5321 5322 if (rc == 0) { 5323 bdev_start(bdev); 5324 } 5325 5326 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 5327 return rc; 5328 } 5329 5330 int 5331 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 5332 { 5333 SPDK_ERRLOG("This function is deprecated. 
Use spdk_bdev_register() instead.\n"); 5334 return spdk_bdev_register(vbdev); 5335 } 5336 5337 void 5338 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 5339 { 5340 if (bdev->internal.unregister_cb != NULL) { 5341 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 5342 } 5343 } 5344 5345 static void 5346 _remove_notify(void *arg) 5347 { 5348 struct spdk_bdev_desc *desc = arg; 5349 5350 pthread_mutex_lock(&desc->mutex); 5351 desc->refs--; 5352 5353 if (!desc->closed) { 5354 pthread_mutex_unlock(&desc->mutex); 5355 if (desc->callback.open_with_ext) { 5356 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 5357 } else { 5358 desc->callback.remove_fn(desc->callback.ctx); 5359 } 5360 return; 5361 } else if (0 == desc->refs) { 5362 /* This descriptor was closed after this remove_notify message was sent. 5363 * spdk_bdev_close() could not free the descriptor since this message was 5364 * in flight, so we free it now using bdev_desc_free(). 5365 */ 5366 pthread_mutex_unlock(&desc->mutex); 5367 bdev_desc_free(desc); 5368 return; 5369 } 5370 pthread_mutex_unlock(&desc->mutex); 5371 } 5372 5373 /* Must be called while holding bdev->internal.mutex. 5374 * returns: 0 - bdev removed and ready to be destructed. 5375 * -EBUSY - bdev can't be destructed yet. */ 5376 static int 5377 bdev_unregister_unsafe(struct spdk_bdev *bdev) 5378 { 5379 struct spdk_bdev_desc *desc, *tmp; 5380 int rc = 0; 5381 5382 /* Notify each descriptor about hotremoval */ 5383 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 5384 rc = -EBUSY; 5385 pthread_mutex_lock(&desc->mutex); 5386 /* 5387 * Defer invocation of the event_cb to a separate message that will 5388 * run later on its thread. This ensures this context unwinds and 5389 * we don't recursively unregister this bdev again if the event_cb 5390 * immediately closes its descriptor. 5391 */ 5392 desc->refs++; 5393 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 5394 pthread_mutex_unlock(&desc->mutex); 5395 } 5396 5397 /* If there are no descriptors, proceed removing the bdev */ 5398 if (rc == 0) { 5399 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 5400 SPDK_DEBUGLOG(bdev, "Removing bdev %s from list done\n", bdev->name); 5401 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 5402 } 5403 5404 return rc; 5405 } 5406 5407 void 5408 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 5409 { 5410 struct spdk_thread *thread; 5411 int rc; 5412 5413 SPDK_DEBUGLOG(bdev, "Removing bdev %s from list\n", bdev->name); 5414 5415 thread = spdk_get_thread(); 5416 if (!thread) { 5417 /* The user called this from a non-SPDK thread. */ 5418 if (cb_fn != NULL) { 5419 cb_fn(cb_arg, -ENOTSUP); 5420 } 5421 return; 5422 } 5423 5424 pthread_mutex_lock(&g_bdev_mgr.mutex); 5425 pthread_mutex_lock(&bdev->internal.mutex); 5426 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5427 pthread_mutex_unlock(&bdev->internal.mutex); 5428 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5429 if (cb_fn) { 5430 cb_fn(cb_arg, -EBUSY); 5431 } 5432 return; 5433 } 5434 5435 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 5436 bdev->internal.unregister_cb = cb_fn; 5437 bdev->internal.unregister_ctx = cb_arg; 5438 5439 /* Call under lock. 
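 * Both g_bdev_mgr.mutex and bdev->internal.mutex are held at this point;
 * bdev_unregister_unsafe() must be called with bdev->internal.mutex held
 * (see the comment above its definition).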
*/ 5440 rc = bdev_unregister_unsafe(bdev); 5441 pthread_mutex_unlock(&bdev->internal.mutex); 5442 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5443 5444 if (rc == 0) { 5445 bdev_fini(bdev); 5446 } 5447 } 5448 5449 static void 5450 bdev_dummy_event_cb(void *remove_ctx) 5451 { 5452 SPDK_DEBUGLOG(bdev, "Bdev remove event received with no remove callback specified"); 5453 } 5454 5455 static int 5456 bdev_start_qos(struct spdk_bdev *bdev) 5457 { 5458 struct set_qos_limit_ctx *ctx; 5459 5460 /* Enable QoS */ 5461 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 5462 ctx = calloc(1, sizeof(*ctx)); 5463 if (ctx == NULL) { 5464 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 5465 return -ENOMEM; 5466 } 5467 ctx->bdev = bdev; 5468 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5469 bdev_enable_qos_msg, ctx, 5470 bdev_enable_qos_done); 5471 } 5472 5473 return 0; 5474 } 5475 5476 static int 5477 bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 5478 { 5479 struct spdk_thread *thread; 5480 int rc = 0; 5481 5482 thread = spdk_get_thread(); 5483 if (!thread) { 5484 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 5485 return -ENOTSUP; 5486 } 5487 5488 SPDK_DEBUGLOG(bdev, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5489 spdk_get_thread()); 5490 5491 desc->bdev = bdev; 5492 desc->thread = thread; 5493 desc->write = write; 5494 5495 pthread_mutex_lock(&bdev->internal.mutex); 5496 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5497 pthread_mutex_unlock(&bdev->internal.mutex); 5498 return -ENODEV; 5499 } 5500 5501 if (write && bdev->internal.claim_module) { 5502 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 5503 bdev->name, bdev->internal.claim_module->name); 5504 pthread_mutex_unlock(&bdev->internal.mutex); 5505 return -EPERM; 5506 } 5507 5508 rc = bdev_start_qos(bdev); 5509 if (rc != 0) { 5510 SPDK_ERRLOG("Failed to start QoS on bdev %s\n", bdev->name); 5511 pthread_mutex_unlock(&bdev->internal.mutex); 5512 return rc; 5513 } 5514 5515 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 5516 5517 pthread_mutex_unlock(&bdev->internal.mutex); 5518 5519 return 0; 5520 } 5521 5522 int 5523 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 5524 void *remove_ctx, struct spdk_bdev_desc **_desc) 5525 { 5526 struct spdk_bdev_desc *desc; 5527 int rc; 5528 5529 desc = calloc(1, sizeof(*desc)); 5530 if (desc == NULL) { 5531 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5532 return -ENOMEM; 5533 } 5534 5535 if (remove_cb == NULL) { 5536 remove_cb = bdev_dummy_event_cb; 5537 } 5538 5539 TAILQ_INIT(&desc->pending_media_events); 5540 TAILQ_INIT(&desc->free_media_events); 5541 5542 desc->callback.open_with_ext = false; 5543 desc->callback.remove_fn = remove_cb; 5544 desc->callback.ctx = remove_ctx; 5545 pthread_mutex_init(&desc->mutex, NULL); 5546 5547 pthread_mutex_lock(&g_bdev_mgr.mutex); 5548 5549 rc = bdev_open(bdev, write, desc); 5550 if (rc != 0) { 5551 bdev_desc_free(desc); 5552 desc = NULL; 5553 } 5554 5555 *_desc = desc; 5556 5557 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5558 5559 return rc; 5560 } 5561 5562 int 5563 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 5564 void *event_ctx, struct spdk_bdev_desc **_desc) 5565 { 5566 struct spdk_bdev_desc *desc; 5567 struct spdk_bdev *bdev; 5568 unsigned int event_id; 5569 int rc; 5570 5571 if (event_cb == NULL) { 5572 SPDK_ERRLOG("Missing event callback function\n"); 
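/* An event callback is mandatory for the _ext open path; it is how hot-remove
 * (SPDK_BDEV_EVENT_REMOVE) and similar notifications reach the caller.  A minimal
 * caller-side sketch (illustrative only; my_bdev_event_cb, struct my_app_ctx and
 * the "Malloc0" bdev name are hypothetical application code, not part of this file):
 *
 *	static void
 *	my_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
 *			 void *event_ctx)
 *	{
 *		struct my_app_ctx *app = event_ctx;
 *
 *		if (type == SPDK_BDEV_EVENT_REMOVE) {
 *			// Stop submitting I/O and release the descriptor.
 *			spdk_bdev_close(app->desc);
 *		}
 *	}
 *
 *	rc = spdk_bdev_open_ext("Malloc0", true, my_bdev_event_cb, app, &app->desc);
 */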
5573 return -EINVAL; 5574 } 5575 5576 pthread_mutex_lock(&g_bdev_mgr.mutex); 5577 5578 bdev = spdk_bdev_get_by_name(bdev_name); 5579 5580 if (bdev == NULL) { 5581 SPDK_NOTICELOG("Currently unable to find bdev with name: %s\n", bdev_name); 5582 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5583 return -ENODEV; 5584 } 5585 5586 desc = calloc(1, sizeof(*desc)); 5587 if (desc == NULL) { 5588 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5589 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5590 return -ENOMEM; 5591 } 5592 5593 TAILQ_INIT(&desc->pending_media_events); 5594 TAILQ_INIT(&desc->free_media_events); 5595 5596 desc->callback.open_with_ext = true; 5597 desc->callback.event_fn = event_cb; 5598 desc->callback.ctx = event_ctx; 5599 pthread_mutex_init(&desc->mutex, NULL); 5600 5601 if (bdev->media_events) { 5602 desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE, 5603 sizeof(*desc->media_events_buffer)); 5604 if (desc->media_events_buffer == NULL) { 5605 SPDK_ERRLOG("Failed to initialize media event pool\n"); 5606 bdev_desc_free(desc); 5607 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5608 return -ENOMEM; 5609 } 5610 5611 for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) { 5612 TAILQ_INSERT_TAIL(&desc->free_media_events, 5613 &desc->media_events_buffer[event_id], tailq); 5614 } 5615 } 5616 5617 rc = bdev_open(bdev, write, desc); 5618 if (rc != 0) { 5619 bdev_desc_free(desc); 5620 desc = NULL; 5621 } 5622 5623 *_desc = desc; 5624 5625 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5626 5627 return rc; 5628 } 5629 5630 void 5631 spdk_bdev_close(struct spdk_bdev_desc *desc) 5632 { 5633 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 5634 int rc; 5635 5636 SPDK_DEBUGLOG(bdev, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5637 spdk_get_thread()); 5638 5639 assert(desc->thread == spdk_get_thread()); 5640 5641 spdk_poller_unregister(&desc->io_timeout_poller); 5642 5643 pthread_mutex_lock(&bdev->internal.mutex); 5644 pthread_mutex_lock(&desc->mutex); 5645 5646 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 5647 5648 desc->closed = true; 5649 5650 if (0 == desc->refs) { 5651 pthread_mutex_unlock(&desc->mutex); 5652 bdev_desc_free(desc); 5653 } else { 5654 pthread_mutex_unlock(&desc->mutex); 5655 } 5656 5657 /* If no more descriptors, kill QoS channel */ 5658 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5659 SPDK_DEBUGLOG(bdev, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 5660 bdev->name, spdk_get_thread()); 5661 5662 if (bdev_qos_destroy(bdev)) { 5663 /* There isn't anything we can do to recover here. Just let the 5664 * old QoS poller keep running. The QoS handling won't change 5665 * cores when the user allocates a new channel, but it won't break. */ 5666 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 5667 } 5668 } 5669 5670 spdk_bdev_set_qd_sampling_period(bdev, 0); 5671 5672 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5673 rc = bdev_unregister_unsafe(bdev); 5674 pthread_mutex_unlock(&bdev->internal.mutex); 5675 5676 if (rc == 0) { 5677 bdev_fini(bdev); 5678 } 5679 } else { 5680 pthread_mutex_unlock(&bdev->internal.mutex); 5681 } 5682 } 5683 5684 int 5685 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 5686 struct spdk_bdev_module *module) 5687 { 5688 if (bdev->internal.claim_module != NULL) { 5689 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 5690 bdev->internal.claim_module->name); 5691 return -EPERM; 5692 } 5693 5694 if (desc && !desc->write) { 5695 desc->write = true; 5696 } 5697 5698 bdev->internal.claim_module = module; 5699 return 0; 5700 } 5701 5702 void 5703 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 5704 { 5705 assert(bdev->internal.claim_module != NULL); 5706 bdev->internal.claim_module = NULL; 5707 } 5708 5709 struct spdk_bdev * 5710 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 5711 { 5712 assert(desc != NULL); 5713 return desc->bdev; 5714 } 5715 5716 void 5717 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 5718 { 5719 struct iovec *iovs; 5720 int iovcnt; 5721 5722 if (bdev_io == NULL) { 5723 return; 5724 } 5725 5726 switch (bdev_io->type) { 5727 case SPDK_BDEV_IO_TYPE_READ: 5728 case SPDK_BDEV_IO_TYPE_WRITE: 5729 case SPDK_BDEV_IO_TYPE_ZCOPY: 5730 iovs = bdev_io->u.bdev.iovs; 5731 iovcnt = bdev_io->u.bdev.iovcnt; 5732 break; 5733 default: 5734 iovs = NULL; 5735 iovcnt = 0; 5736 break; 5737 } 5738 5739 if (iovp) { 5740 *iovp = iovs; 5741 } 5742 if (iovcntp) { 5743 *iovcntp = iovcnt; 5744 } 5745 } 5746 5747 void * 5748 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 5749 { 5750 if (bdev_io == NULL) { 5751 return NULL; 5752 } 5753 5754 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 5755 return NULL; 5756 } 5757 5758 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 5759 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 5760 return bdev_io->u.bdev.md_buf; 5761 } 5762 5763 return NULL; 5764 } 5765 5766 void * 5767 spdk_bdev_io_get_cb_arg(struct spdk_bdev_io *bdev_io) 5768 { 5769 if (bdev_io == NULL) { 5770 assert(false); 5771 return NULL; 5772 } 5773 5774 return bdev_io->internal.caller_ctx; 5775 } 5776 5777 void 5778 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 5779 { 5780 5781 if (spdk_bdev_module_list_find(bdev_module->name)) { 5782 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 5783 assert(false); 5784 } 5785 5786 /* 5787 * Modules with examine callbacks must be initialized first, so they are 5788 * ready to handle examine callbacks from later modules that will 5789 * register physical bdevs. 
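 *
 * As a purely illustrative sketch (the module name and callbacks below are
 * hypothetical and not part of this file), a module that declares an examine
 * callback, e.g.
 *
 *     static struct spdk_bdev_module my_if = {
 *         .name = "my_module",
 *         .module_init = my_module_init,
 *         .examine_config = my_module_examine,
 *     };
 *     SPDK_BDEV_MODULE_REGISTER(my_module, &my_if)
 *
 * is inserted at the head of the module list by the code below, while modules
 * without examine callbacks are appended to the tail.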
5790 */ 5791 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 5792 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5793 } else { 5794 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5795 } 5796 } 5797 5798 struct spdk_bdev_module * 5799 spdk_bdev_module_list_find(const char *name) 5800 { 5801 struct spdk_bdev_module *bdev_module; 5802 5803 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5804 if (strcmp(name, bdev_module->name) == 0) { 5805 break; 5806 } 5807 } 5808 5809 return bdev_module; 5810 } 5811 5812 static void 5813 bdev_write_zero_buffer_next(void *_bdev_io) 5814 { 5815 struct spdk_bdev_io *bdev_io = _bdev_io; 5816 uint64_t num_bytes, num_blocks; 5817 void *md_buf = NULL; 5818 int rc; 5819 5820 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 5821 bdev_io->u.bdev.split_remaining_num_blocks, 5822 ZERO_BUFFER_SIZE); 5823 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 5824 5825 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 5826 md_buf = (char *)g_bdev_mgr.zero_buffer + 5827 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 5828 } 5829 5830 rc = bdev_write_blocks_with_md(bdev_io->internal.desc, 5831 spdk_io_channel_from_ctx(bdev_io->internal.ch), 5832 g_bdev_mgr.zero_buffer, md_buf, 5833 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 5834 bdev_write_zero_buffer_done, bdev_io); 5835 if (rc == 0) { 5836 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 5837 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 5838 } else if (rc == -ENOMEM) { 5839 bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next); 5840 } else { 5841 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5842 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 5843 } 5844 } 5845 5846 static void 5847 bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 5848 { 5849 struct spdk_bdev_io *parent_io = cb_arg; 5850 5851 spdk_bdev_free_io(bdev_io); 5852 5853 if (!success) { 5854 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5855 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 5856 return; 5857 } 5858 5859 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 5860 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5861 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 5862 return; 5863 } 5864 5865 bdev_write_zero_buffer_next(parent_io); 5866 } 5867 5868 static void 5869 bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 5870 { 5871 pthread_mutex_lock(&ctx->bdev->internal.mutex); 5872 ctx->bdev->internal.qos_mod_in_progress = false; 5873 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 5874 5875 if (ctx->cb_fn) { 5876 ctx->cb_fn(ctx->cb_arg, status); 5877 } 5878 free(ctx); 5879 } 5880 5881 static void 5882 bdev_disable_qos_done(void *cb_arg) 5883 { 5884 struct set_qos_limit_ctx *ctx = cb_arg; 5885 struct spdk_bdev *bdev = ctx->bdev; 5886 struct spdk_bdev_io *bdev_io; 5887 struct spdk_bdev_qos *qos; 5888 5889 pthread_mutex_lock(&bdev->internal.mutex); 5890 qos = bdev->internal.qos; 5891 bdev->internal.qos = NULL; 5892 pthread_mutex_unlock(&bdev->internal.mutex); 5893 5894 while (!TAILQ_EMPTY(&qos->queued)) { 5895 /* Send queued I/O back to their original thread for resubmission. 
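 * While QoS was enabled, submissions were funneled to the QoS thread and the
 * originating channel was stashed in internal.io_submit_ch. The loop below
 * restores that channel and uses spdk_thread_send_msg() to resubmit each
 * bdev_io on the thread that originally issued it, now without QoS applied.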
*/ 5896 bdev_io = TAILQ_FIRST(&qos->queued); 5897 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 5898 5899 if (bdev_io->internal.io_submit_ch) { 5900 /* 5901 * Channel was changed when sending it to the QoS thread - change it back 5902 * before sending it back to the original thread. 5903 */ 5904 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 5905 bdev_io->internal.io_submit_ch = NULL; 5906 } 5907 5908 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 5909 _bdev_io_submit, bdev_io); 5910 } 5911 5912 if (qos->thread != NULL) { 5913 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 5914 spdk_poller_unregister(&qos->poller); 5915 } 5916 5917 free(qos); 5918 5919 bdev_set_qos_limit_done(ctx, 0); 5920 } 5921 5922 static void 5923 bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 5924 { 5925 void *io_device = spdk_io_channel_iter_get_io_device(i); 5926 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5927 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5928 struct spdk_thread *thread; 5929 5930 pthread_mutex_lock(&bdev->internal.mutex); 5931 thread = bdev->internal.qos->thread; 5932 pthread_mutex_unlock(&bdev->internal.mutex); 5933 5934 if (thread != NULL) { 5935 spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx); 5936 } else { 5937 bdev_disable_qos_done(ctx); 5938 } 5939 } 5940 5941 static void 5942 bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 5943 { 5944 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5945 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5946 5947 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 5948 5949 spdk_for_each_channel_continue(i, 0); 5950 } 5951 5952 static void 5953 bdev_update_qos_rate_limit_msg(void *cb_arg) 5954 { 5955 struct set_qos_limit_ctx *ctx = cb_arg; 5956 struct spdk_bdev *bdev = ctx->bdev; 5957 5958 pthread_mutex_lock(&bdev->internal.mutex); 5959 bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 5960 pthread_mutex_unlock(&bdev->internal.mutex); 5961 5962 bdev_set_qos_limit_done(ctx, 0); 5963 } 5964 5965 static void 5966 bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 5967 { 5968 void *io_device = spdk_io_channel_iter_get_io_device(i); 5969 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5970 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5971 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5972 5973 pthread_mutex_lock(&bdev->internal.mutex); 5974 bdev_enable_qos(bdev, bdev_ch); 5975 pthread_mutex_unlock(&bdev->internal.mutex); 5976 spdk_for_each_channel_continue(i, 0); 5977 } 5978 5979 static void 5980 bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 5981 { 5982 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5983 5984 bdev_set_qos_limit_done(ctx, status); 5985 } 5986 5987 static void 5988 bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 5989 { 5990 int i; 5991 5992 assert(bdev->internal.qos != NULL); 5993 5994 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5995 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5996 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5997 5998 if (limits[i] == 0) { 5999 bdev->internal.qos->rate_limits[i].limit = 6000 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 6001 } 6002 } 6003 } 6004 } 6005 6006 void 6007 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 6008 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 6009 { 6010 struct set_qos_limit_ctx *ctx; 6011 uint32_t 
limit_set_complement; 6012 uint64_t min_limit_per_sec; 6013 int i; 6014 bool disable_rate_limit = true; 6015 6016 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6017 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6018 continue; 6019 } 6020 6021 if (limits[i] > 0) { 6022 disable_rate_limit = false; 6023 } 6024 6025 if (bdev_qos_is_iops_rate_limit(i) == true) { 6026 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 6027 } else { 6028 /* Change from megabyte to byte rate limit */ 6029 limits[i] = limits[i] * 1024 * 1024; 6030 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 6031 } 6032 6033 limit_set_complement = limits[i] % min_limit_per_sec; 6034 if (limit_set_complement) { 6035 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 6036 limits[i], min_limit_per_sec); 6037 limits[i] += min_limit_per_sec - limit_set_complement; 6038 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 6039 } 6040 } 6041 6042 ctx = calloc(1, sizeof(*ctx)); 6043 if (ctx == NULL) { 6044 cb_fn(cb_arg, -ENOMEM); 6045 return; 6046 } 6047 6048 ctx->cb_fn = cb_fn; 6049 ctx->cb_arg = cb_arg; 6050 ctx->bdev = bdev; 6051 6052 pthread_mutex_lock(&bdev->internal.mutex); 6053 if (bdev->internal.qos_mod_in_progress) { 6054 pthread_mutex_unlock(&bdev->internal.mutex); 6055 free(ctx); 6056 cb_fn(cb_arg, -EAGAIN); 6057 return; 6058 } 6059 bdev->internal.qos_mod_in_progress = true; 6060 6061 if (disable_rate_limit == true && bdev->internal.qos) { 6062 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6063 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 6064 (bdev->internal.qos->rate_limits[i].limit > 0 && 6065 bdev->internal.qos->rate_limits[i].limit != 6066 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 6067 disable_rate_limit = false; 6068 break; 6069 } 6070 } 6071 } 6072 6073 if (disable_rate_limit == false) { 6074 if (bdev->internal.qos == NULL) { 6075 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 6076 if (!bdev->internal.qos) { 6077 pthread_mutex_unlock(&bdev->internal.mutex); 6078 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 6079 bdev_set_qos_limit_done(ctx, -ENOMEM); 6080 return; 6081 } 6082 } 6083 6084 if (bdev->internal.qos->thread == NULL) { 6085 /* Enabling */ 6086 bdev_set_qos_rate_limits(bdev, limits); 6087 6088 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6089 bdev_enable_qos_msg, ctx, 6090 bdev_enable_qos_done); 6091 } else { 6092 /* Updating */ 6093 bdev_set_qos_rate_limits(bdev, limits); 6094 6095 spdk_thread_send_msg(bdev->internal.qos->thread, 6096 bdev_update_qos_rate_limit_msg, ctx); 6097 } 6098 } else { 6099 if (bdev->internal.qos != NULL) { 6100 bdev_set_qos_rate_limits(bdev, limits); 6101 6102 /* Disabling */ 6103 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6104 bdev_disable_qos_msg, ctx, 6105 bdev_disable_qos_msg_done); 6106 } else { 6107 pthread_mutex_unlock(&bdev->internal.mutex); 6108 bdev_set_qos_limit_done(ctx, 0); 6109 return; 6110 } 6111 } 6112 6113 pthread_mutex_unlock(&bdev->internal.mutex); 6114 } 6115 6116 struct spdk_bdev_histogram_ctx { 6117 spdk_bdev_histogram_status_cb cb_fn; 6118 void *cb_arg; 6119 struct spdk_bdev *bdev; 6120 int status; 6121 }; 6122 6123 static void 6124 bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 6125 { 6126 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6127 6128 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6129 ctx->bdev->internal.histogram_in_progress = false; 6130 
pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6131 ctx->cb_fn(ctx->cb_arg, ctx->status); 6132 free(ctx); 6133 } 6134 6135 static void 6136 bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 6137 { 6138 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6139 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6140 6141 if (ch->histogram != NULL) { 6142 spdk_histogram_data_free(ch->histogram); 6143 ch->histogram = NULL; 6144 } 6145 spdk_for_each_channel_continue(i, 0); 6146 } 6147 6148 static void 6149 bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 6150 { 6151 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6152 6153 if (status != 0) { 6154 ctx->status = status; 6155 ctx->bdev->internal.histogram_enabled = false; 6156 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx, 6157 bdev_histogram_disable_channel_cb); 6158 } else { 6159 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6160 ctx->bdev->internal.histogram_in_progress = false; 6161 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6162 ctx->cb_fn(ctx->cb_arg, ctx->status); 6163 free(ctx); 6164 } 6165 } 6166 6167 static void 6168 bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 6169 { 6170 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6171 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6172 int status = 0; 6173 6174 if (ch->histogram == NULL) { 6175 ch->histogram = spdk_histogram_data_alloc(); 6176 if (ch->histogram == NULL) { 6177 status = -ENOMEM; 6178 } 6179 } 6180 6181 spdk_for_each_channel_continue(i, status); 6182 } 6183 6184 void 6185 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 6186 void *cb_arg, bool enable) 6187 { 6188 struct spdk_bdev_histogram_ctx *ctx; 6189 6190 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 6191 if (ctx == NULL) { 6192 cb_fn(cb_arg, -ENOMEM); 6193 return; 6194 } 6195 6196 ctx->bdev = bdev; 6197 ctx->status = 0; 6198 ctx->cb_fn = cb_fn; 6199 ctx->cb_arg = cb_arg; 6200 6201 pthread_mutex_lock(&bdev->internal.mutex); 6202 if (bdev->internal.histogram_in_progress) { 6203 pthread_mutex_unlock(&bdev->internal.mutex); 6204 free(ctx); 6205 cb_fn(cb_arg, -EAGAIN); 6206 return; 6207 } 6208 6209 bdev->internal.histogram_in_progress = true; 6210 pthread_mutex_unlock(&bdev->internal.mutex); 6211 6212 bdev->internal.histogram_enabled = enable; 6213 6214 if (enable) { 6215 /* Allocate histogram for each channel */ 6216 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx, 6217 bdev_histogram_enable_channel_cb); 6218 } else { 6219 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx, 6220 bdev_histogram_disable_channel_cb); 6221 } 6222 } 6223 6224 struct spdk_bdev_histogram_data_ctx { 6225 spdk_bdev_histogram_data_cb cb_fn; 6226 void *cb_arg; 6227 struct spdk_bdev *bdev; 6228 /** merged histogram data from all channels */ 6229 struct spdk_histogram_data *histogram; 6230 }; 6231 6232 static void 6233 bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 6234 { 6235 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6236 6237 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 6238 free(ctx); 6239 } 6240 6241 static void 6242 bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 6243 { 6244 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6245 struct spdk_bdev_channel *ch = 
spdk_io_channel_get_ctx(_ch); 6246 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6247 int status = 0; 6248 6249 if (ch->histogram == NULL) { 6250 status = -EFAULT; 6251 } else { 6252 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 6253 } 6254 6255 spdk_for_each_channel_continue(i, status); 6256 } 6257 6258 void 6259 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 6260 spdk_bdev_histogram_data_cb cb_fn, 6261 void *cb_arg) 6262 { 6263 struct spdk_bdev_histogram_data_ctx *ctx; 6264 6265 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 6266 if (ctx == NULL) { 6267 cb_fn(cb_arg, -ENOMEM, NULL); 6268 return; 6269 } 6270 6271 ctx->bdev = bdev; 6272 ctx->cb_fn = cb_fn; 6273 ctx->cb_arg = cb_arg; 6274 6275 ctx->histogram = histogram; 6276 6277 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx, 6278 bdev_histogram_get_channel_cb); 6279 } 6280 6281 size_t 6282 spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events, 6283 size_t max_events) 6284 { 6285 struct media_event_entry *entry; 6286 size_t num_events = 0; 6287 6288 for (; num_events < max_events; ++num_events) { 6289 entry = TAILQ_FIRST(&desc->pending_media_events); 6290 if (entry == NULL) { 6291 break; 6292 } 6293 6294 events[num_events] = entry->event; 6295 TAILQ_REMOVE(&desc->pending_media_events, entry, tailq); 6296 TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq); 6297 } 6298 6299 return num_events; 6300 } 6301 6302 int 6303 spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events, 6304 size_t num_events) 6305 { 6306 struct spdk_bdev_desc *desc; 6307 struct media_event_entry *entry; 6308 size_t event_id; 6309 int rc = 0; 6310 6311 assert(bdev->media_events); 6312 6313 pthread_mutex_lock(&bdev->internal.mutex); 6314 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6315 if (desc->write) { 6316 break; 6317 } 6318 } 6319 6320 if (desc == NULL || desc->media_events_buffer == NULL) { 6321 rc = -ENODEV; 6322 goto out; 6323 } 6324 6325 for (event_id = 0; event_id < num_events; ++event_id) { 6326 entry = TAILQ_FIRST(&desc->free_media_events); 6327 if (entry == NULL) { 6328 break; 6329 } 6330 6331 TAILQ_REMOVE(&desc->free_media_events, entry, tailq); 6332 TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq); 6333 entry->event = events[event_id]; 6334 } 6335 6336 rc = event_id; 6337 out: 6338 pthread_mutex_unlock(&bdev->internal.mutex); 6339 return rc; 6340 } 6341 6342 void 6343 spdk_bdev_notify_media_management(struct spdk_bdev *bdev) 6344 { 6345 struct spdk_bdev_desc *desc; 6346 6347 pthread_mutex_lock(&bdev->internal.mutex); 6348 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6349 if (!TAILQ_EMPTY(&desc->pending_media_events)) { 6350 desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev, 6351 desc->callback.ctx); 6352 } 6353 } 6354 pthread_mutex_unlock(&bdev->internal.mutex); 6355 } 6356 6357 struct locked_lba_range_ctx { 6358 struct lba_range range; 6359 struct spdk_bdev *bdev; 6360 struct lba_range *current_range; 6361 struct lba_range *owner_range; 6362 struct spdk_poller *poller; 6363 lock_range_cb cb_fn; 6364 void *cb_arg; 6365 }; 6366 6367 static void 6368 bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status) 6369 { 6370 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6371 6372 ctx->cb_fn(ctx->cb_arg, -ENOMEM); 6373 free(ctx); 6374 } 6375 6376 static void 6377 
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i); 6378 6379 static void 6380 bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6381 { 6382 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6383 struct spdk_bdev *bdev = ctx->bdev; 6384 6385 if (status == -ENOMEM) { 6386 /* One of the channels could not allocate a range object. 6387 * So we have to go back and clean up any ranges that were 6388 * allocated successfully before we return error status to 6389 * the caller. We can reuse the unlock function to do that 6390 * clean up. 6391 */ 6392 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6393 bdev_unlock_lba_range_get_channel, ctx, 6394 bdev_lock_error_cleanup_cb); 6395 return; 6396 } 6397 6398 /* All channels have locked this range and no I/O overlapping the range 6399 * are outstanding! Set the owner_ch for the range object for the 6400 * locking channel, so that this channel will know that it is allowed 6401 * to write to this range. 6402 */ 6403 ctx->owner_range->owner_ch = ctx->range.owner_ch; 6404 ctx->cb_fn(ctx->cb_arg, status); 6405 6406 /* Don't free the ctx here. Its range is in the bdev's global list of 6407 * locked ranges still, and will be removed and freed when this range 6408 * is later unlocked. 6409 */ 6410 } 6411 6412 static int 6413 bdev_lock_lba_range_check_io(void *_i) 6414 { 6415 struct spdk_io_channel_iter *i = _i; 6416 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6417 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6418 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6419 struct lba_range *range = ctx->current_range; 6420 struct spdk_bdev_io *bdev_io; 6421 6422 spdk_poller_unregister(&ctx->poller); 6423 6424 /* The range is now in the locked_ranges, so no new IO can be submitted to this 6425 * range. But we need to wait until any outstanding IO overlapping with this range 6426 * are completed. 6427 */ 6428 TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) { 6429 if (bdev_io_range_is_locked(bdev_io, range)) { 6430 ctx->poller = SPDK_POLLER_REGISTER(bdev_lock_lba_range_check_io, i, 100); 6431 return SPDK_POLLER_BUSY; 6432 } 6433 } 6434 6435 spdk_for_each_channel_continue(i, 0); 6436 return SPDK_POLLER_BUSY; 6437 } 6438 6439 static void 6440 bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6441 { 6442 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6443 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6444 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6445 struct lba_range *range; 6446 6447 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6448 if (range->length == ctx->range.length && 6449 range->offset == ctx->range.offset && 6450 range->locked_ctx == ctx->range.locked_ctx) { 6451 /* This range already exists on this channel, so don't add 6452 * it again. This can happen when a new channel is created 6453 * while the for_each_channel operation is in progress. 6454 * Do not check for outstanding I/O in that case, since the 6455 * range was locked before any I/O could be submitted to the 6456 * new channel. 
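 * (The range was added to bdev->internal.locked_ranges before the
 * for_each_channel walk started, and a channel created in the meantime
 * already picks the range up from that list, which is how the duplicate
 * can appear here.)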
6457 */ 6458 spdk_for_each_channel_continue(i, 0); 6459 return; 6460 } 6461 } 6462 6463 range = calloc(1, sizeof(*range)); 6464 if (range == NULL) { 6465 spdk_for_each_channel_continue(i, -ENOMEM); 6466 return; 6467 } 6468 6469 range->length = ctx->range.length; 6470 range->offset = ctx->range.offset; 6471 range->locked_ctx = ctx->range.locked_ctx; 6472 ctx->current_range = range; 6473 if (ctx->range.owner_ch == ch) { 6474 /* This is the range object for the channel that will hold 6475 * the lock. Store it in the ctx object so that we can easily 6476 * set its owner_ch after the lock is finally acquired. 6477 */ 6478 ctx->owner_range = range; 6479 } 6480 TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq); 6481 bdev_lock_lba_range_check_io(i); 6482 } 6483 6484 static void 6485 bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx) 6486 { 6487 assert(spdk_get_thread() == ctx->range.owner_ch->channel->thread); 6488 6489 /* We will add a copy of this range to each channel now. */ 6490 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx, 6491 bdev_lock_lba_range_cb); 6492 } 6493 6494 static bool 6495 bdev_lba_range_overlaps_tailq(struct lba_range *range, lba_range_tailq_t *tailq) 6496 { 6497 struct lba_range *r; 6498 6499 TAILQ_FOREACH(r, tailq, tailq) { 6500 if (bdev_lba_range_overlapped(range, r)) { 6501 return true; 6502 } 6503 } 6504 return false; 6505 } 6506 6507 static int 6508 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6509 uint64_t offset, uint64_t length, 6510 lock_range_cb cb_fn, void *cb_arg) 6511 { 6512 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6513 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6514 struct locked_lba_range_ctx *ctx; 6515 6516 if (cb_arg == NULL) { 6517 SPDK_ERRLOG("cb_arg must not be NULL\n"); 6518 return -EINVAL; 6519 } 6520 6521 ctx = calloc(1, sizeof(*ctx)); 6522 if (ctx == NULL) { 6523 return -ENOMEM; 6524 } 6525 6526 ctx->range.offset = offset; 6527 ctx->range.length = length; 6528 ctx->range.owner_ch = ch; 6529 ctx->range.locked_ctx = cb_arg; 6530 ctx->bdev = bdev; 6531 ctx->cb_fn = cb_fn; 6532 ctx->cb_arg = cb_arg; 6533 6534 pthread_mutex_lock(&bdev->internal.mutex); 6535 if (bdev_lba_range_overlaps_tailq(&ctx->range, &bdev->internal.locked_ranges)) { 6536 /* There is an active lock overlapping with this range. 6537 * Put it on the pending list until this range no 6538 * longer overlaps with another. 6539 */ 6540 TAILQ_INSERT_TAIL(&bdev->internal.pending_locked_ranges, &ctx->range, tailq); 6541 } else { 6542 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, &ctx->range, tailq); 6543 bdev_lock_lba_range_ctx(bdev, ctx); 6544 } 6545 pthread_mutex_unlock(&bdev->internal.mutex); 6546 return 0; 6547 } 6548 6549 static void 6550 bdev_lock_lba_range_ctx_msg(void *_ctx) 6551 { 6552 struct locked_lba_range_ctx *ctx = _ctx; 6553 6554 bdev_lock_lba_range_ctx(ctx->bdev, ctx); 6555 } 6556 6557 static void 6558 bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6559 { 6560 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6561 struct locked_lba_range_ctx *pending_ctx; 6562 struct spdk_bdev_channel *ch = ctx->range.owner_ch; 6563 struct spdk_bdev *bdev = ch->bdev; 6564 struct lba_range *range, *tmp; 6565 6566 pthread_mutex_lock(&bdev->internal.mutex); 6567 /* Check if there are any pending locked ranges that overlap with this range 6568 * that was just unlocked. 
If there are, check that it doesn't overlap with any 6569 * other locked ranges before calling bdev_lock_lba_range_ctx which will start 6570 * the lock process. 6571 */ 6572 TAILQ_FOREACH_SAFE(range, &bdev->internal.pending_locked_ranges, tailq, tmp) { 6573 if (bdev_lba_range_overlapped(range, &ctx->range) && 6574 !bdev_lba_range_overlaps_tailq(range, &bdev->internal.locked_ranges)) { 6575 TAILQ_REMOVE(&bdev->internal.pending_locked_ranges, range, tailq); 6576 pending_ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6577 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, range, tailq); 6578 spdk_thread_send_msg(pending_ctx->range.owner_ch->channel->thread, 6579 bdev_lock_lba_range_ctx_msg, pending_ctx); 6580 } 6581 } 6582 pthread_mutex_unlock(&bdev->internal.mutex); 6583 6584 ctx->cb_fn(ctx->cb_arg, status); 6585 free(ctx); 6586 } 6587 6588 static void 6589 bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6590 { 6591 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6592 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6593 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6594 TAILQ_HEAD(, spdk_bdev_io) io_locked; 6595 struct spdk_bdev_io *bdev_io; 6596 struct lba_range *range; 6597 6598 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6599 if (ctx->range.offset == range->offset && 6600 ctx->range.length == range->length && 6601 ctx->range.locked_ctx == range->locked_ctx) { 6602 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 6603 free(range); 6604 break; 6605 } 6606 } 6607 6608 /* Note: we should almost always be able to assert that the range specified 6609 * was found. But there are some very rare corner cases where a new channel 6610 * gets created simultaneously with a range unlock, where this function 6611 * would execute on that new channel and wouldn't have the range. 6612 * We also use this to clean up range allocations when a later allocation 6613 * fails in the locking path. 6614 * So we can't actually assert() here. 6615 */ 6616 6617 /* Swap the locked IO into a temporary list, and then try to submit them again. 6618 * We could hyper-optimize this to only resubmit locked I/O that overlap 6619 * with the range that was just unlocked, but this isn't a performance path so 6620 * we go for simplicity here. 6621 */ 6622 TAILQ_INIT(&io_locked); 6623 TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link); 6624 while (!TAILQ_EMPTY(&io_locked)) { 6625 bdev_io = TAILQ_FIRST(&io_locked); 6626 TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link); 6627 bdev_io_submit(bdev_io); 6628 } 6629 6630 spdk_for_each_channel_continue(i, 0); 6631 } 6632 6633 static int 6634 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6635 uint64_t offset, uint64_t length, 6636 lock_range_cb cb_fn, void *cb_arg) 6637 { 6638 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6639 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6640 struct locked_lba_range_ctx *ctx; 6641 struct lba_range *range; 6642 bool range_found = false; 6643 6644 /* Let's make sure the specified channel actually has a lock on 6645 * the specified range. Note that the range must match exactly. 
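 * For example (illustrative values only): a range locked with offset=100,
 * length=32 and cb_arg=ctx can only be unlocked with those exact values;
 * requesting a sub-range or passing a different cb_arg fails with -EINVAL below.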
6646 */ 6647 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6648 if (range->offset == offset && range->length == length && 6649 range->owner_ch == ch && range->locked_ctx == cb_arg) { 6650 range_found = true; 6651 break; 6652 } 6653 } 6654 6655 if (!range_found) { 6656 return -EINVAL; 6657 } 6658 6659 pthread_mutex_lock(&bdev->internal.mutex); 6660 /* We confirmed that this channel has locked the specified range. To 6661 * start the unlock process, we find the range in the bdev's locked_ranges 6662 * and remove it. This ensures new channels don't inherit the locked range. 6663 * Then we will send a message to each channel (including the one specified 6664 * here) to remove the range from its per-channel list. 6665 */ 6666 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 6667 if (range->offset == offset && range->length == length && 6668 range->locked_ctx == cb_arg) { 6669 break; 6670 } 6671 } 6672 if (range == NULL) { 6673 assert(false); 6674 pthread_mutex_unlock(&bdev->internal.mutex); 6675 return -EINVAL; 6676 } 6677 TAILQ_REMOVE(&bdev->internal.locked_ranges, range, tailq); 6678 ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6679 pthread_mutex_unlock(&bdev->internal.mutex); 6680 6681 ctx->cb_fn = cb_fn; 6682 ctx->cb_arg = cb_arg; 6683 6684 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx, 6685 bdev_unlock_lba_range_cb); 6686 return 0; 6687 } 6688 6689 SPDK_LOG_REGISTER_COMPONENT(bdev) 6690 6691 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 6692 { 6693 spdk_trace_register_owner(OWNER_BDEV, 'b'); 6694 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 6695 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 6696 OBJECT_BDEV_IO, 1, 0, "type: "); 6697 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 6698 OBJECT_BDEV_IO, 0, 0, ""); 6699 } 6700
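/*
 * Illustrative usage sketch (not part of the bdev library itself): the helper
 * and names below are hypothetical, but they exercise the descriptor API
 * implemented above - spdk_bdev_open_ext(), spdk_bdev_desc_get_bdev() and
 * spdk_bdev_close() - from a single SPDK thread, which is required because
 * spdk_bdev_close() asserts that it runs on the thread that opened the
 * descriptor.
 *
 *     static void
 *     example_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
 *                      void *event_ctx)
 *     {
 *         if (type == SPDK_BDEV_EVENT_REMOVE) {
 *             // The application should close its descriptor in response
 *             // to a remove event.
 *         }
 *     }
 *
 *     static int
 *     example_open_close(const char *name)
 *     {
 *         struct spdk_bdev_desc *desc = NULL;
 *         int rc;
 *
 *         rc = spdk_bdev_open_ext(name, true, example_event_cb, NULL, &desc);
 *         if (rc != 0) {
 *             return rc;
 *         }
 *
 *         SPDK_NOTICELOG("opened %s\n",
 *                        spdk_bdev_get_name(spdk_bdev_desc_get_bdev(desc)));
 *
 *         spdk_bdev_close(desc);
 *         return 0;
 *     }
 */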