/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/notify.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"
#include "spdk/string.h"

#include "bdev_internal.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024 - 1)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define SPDK_BDEV_AUTO_EXAMINE			true
#define BUF_SMALL_POOL_SIZE			8191
#define BUF_LARGE_POOL_SIZE			1023
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		1000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX
#define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC	1000

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_rpc_type[] = {"rw_ios_per_sec",
				     "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
				    };

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

	pthread_mutex_t mutex;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain	*domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
	.mutex = PTHREAD_MUTEX_INITIALIZER,
};

typedef void (*lock_range_cb)(void *ctx, int status);

struct lba_range {
	uint64_t			offset;
	uint64_t			length;
	void				*locked_ctx;
	struct spdk_bdev_channel	*owner_ch;
	TAILQ_ENTRY(lba_range)		tailq;
};

static struct spdk_bdev_opts	g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
	.bdev_auto_examine = SPDK_BDEV_AUTO_EXAMINE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;

	/** Function to check whether to queue the IO. */
	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);

	/** Function to update for the submitted IO. */
	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
};

struct spdk_bdev_qos {
	/** Types of structure of rate limits. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};
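/*
 * Illustrative arithmetic only (not used by the code): assuming the per-timeslice
 * quota is derived as limit * timeslice_length / 1 second, as the field comments
 * above suggest, then with SPDK_BDEV_QOS_TIMESLICE_IN_USEC == 1000:
 *
 *	rw_ios_per_sec = 10000  -> max_per_timeslice of roughly 10 I/Os per 1 ms slice
 *	rw_mbytes_per_sec = 10  -> max_per_timeslice of roughly 10485 bytes per 1 ms slice
 *
 * with SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE / SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE
 * acting as lower bounds so that very small limits still make progress in each
 * timeslice. The exact computation lives elsewhere in this file.
 */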
struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * will queue their I/O awaiting retry here, which makes it possible to retry
 * sending I/O to one bdev after I/O from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t		nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel	*shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t		ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to the underlying dev module through this channel
	 * and waiting for completion.
	 */
	uint64_t		io_outstanding;

	/*
	 * List of all submitted I/Os including I/O that are generated via splitting.
	 */
	bdev_io_tailq_t		io_submitted;

	/*
	 * List of spdk_bdev_io that are currently queued because they write to a locked
	 * LBA range.
	 */
	bdev_io_tailq_t		io_locked;

	uint32_t		flags;

	struct spdk_histogram_data *histogram;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

	bdev_io_tailq_t		queued_resets;

	lba_range_tailq_t	locked_ranges;
};

struct media_event_entry {
	struct spdk_bdev_media_event	event;
	TAILQ_ENTRY(media_event_entry)	tailq;
};

#define MEDIA_EVENT_POOL_SIZE 64

struct spdk_bdev_desc {
	struct spdk_bdev		*bdev;
	struct spdk_thread		*thread;
	struct {
		bool open_with_ext;
		union {
			spdk_bdev_remove_cb_t remove_fn;
			spdk_bdev_event_cb_t event_fn;
		};
		void *ctx;
	}				callback;
	bool				closed;
	bool				write;
	pthread_mutex_t			mutex;
	uint32_t			refs;
	TAILQ_HEAD(, media_event_entry)	pending_media_events;
	TAILQ_HEAD(, media_event_entry)	free_media_events;
	struct media_event_entry	*media_events_buffer;
	TAILQ_ENTRY(spdk_bdev_desc)	link;

	uint64_t			timeout_in_sec;
	spdk_bdev_io_timeout_cb		cb_fn;
	void				*cb_arg;
	struct spdk_poller		*io_timeout_poller;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
static void bdev_write_zero_buffer_next(void *_bdev_io);

static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
static void bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);

static int
bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			  struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
			  uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg);
static int
bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   struct iovec *iov, int iovcnt, void *md_buf,
			   uint64_t offset_blocks, uint64_t num_blocks,
			   spdk_bdev_io_completion_cb cb, void *cb_arg);

static int
bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		    uint64_t offset, uint64_t length,
		    lock_range_cb cb_fn, void *cb_arg);

static int
bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		      uint64_t offset, uint64_t length,
		      lock_range_cb cb_fn, void *cb_arg);

static inline void bdev_io_complete(void *ctx);

static bool bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort);
static bool bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization.  A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}
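/*
 * Example (illustrative only): a typical caller tunes these options before
 * calling spdk_bdev_initialize(), e.g. from an application's option-parsing
 * code:
 *
 *	struct spdk_bdev_opts opts;
 *
 *	spdk_bdev_get_opts(&opts);
 *	opts.bdev_io_pool_size = 128 * 1024;
 *	opts.bdev_auto_examine = false;
 *	if (spdk_bdev_set_opts(&opts) != 0) {
 *		// rejected: pool size too small for the cache size and thread count
 *	}
 *
 * The specific values above are arbitrary; the only hard constraint is the
 * min_pool_size check shown in spdk_bdev_set_opts().
 */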
struct spdk_bdev_examine_item {
	char *name;
	TAILQ_ENTRY(spdk_bdev_examine_item) link;
};

TAILQ_HEAD(spdk_bdev_examine_allowlist, spdk_bdev_examine_item);

struct spdk_bdev_examine_allowlist g_bdev_examine_allowlist = TAILQ_HEAD_INITIALIZER(
			g_bdev_examine_allowlist);

static inline bool
bdev_examine_allowlist_check(const char *name)
{
	struct spdk_bdev_examine_item *item;
	TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) {
		if (strcmp(name, item->name) == 0) {
			return true;
		}
	}
	return false;
}

static inline void
bdev_examine_allowlist_free(void)
{
	struct spdk_bdev_examine_item *item;
	while (!TAILQ_EMPTY(&g_bdev_examine_allowlist)) {
		item = TAILQ_FIRST(&g_bdev_examine_allowlist);
		TAILQ_REMOVE(&g_bdev_examine_allowlist, item, link);
		free(item->name);
		free(item);
	}
}

static inline bool
bdev_in_examine_allowlist(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *tmp;
	if (bdev_examine_allowlist_check(bdev->name)) {
		return true;
	}
	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (bdev_examine_allowlist_check(tmp->alias)) {
			return true;
		}
	}
	return false;
}

static inline bool
bdev_ok_to_examine(struct spdk_bdev *bdev)
{
	if (g_bdev_opts.bdev_auto_examine) {
		return true;
	} else {
		return bdev_in_examine_allowlist(bdev);
	}
}

static void
bdev_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module *module;
	uint32_t action;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine_config && bdev_ok_to_examine(bdev)) {
			action = module->internal.action_in_progress;
			module->internal.action_in_progress++;
			module->examine_config(bdev);
			if (action != module->internal.action_in_progress) {
				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
					    module->name);
			}
		}
	}

	if (bdev->internal.claim_module && bdev_ok_to_examine(bdev)) {
		if (bdev->internal.claim_module->examine_disk) {
			bdev->internal.claim_module->internal.action_in_progress++;
			bdev->internal.claim_module->examine_disk(bdev);
		}
		return;
	}

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine_disk && bdev_ok_to_examine(bdev)) {
			module->internal.action_in_progress++;
			module->examine_disk(bdev);
		}
	}
}

int
spdk_bdev_examine(const char *name)
{
	struct spdk_bdev *bdev;
	struct spdk_bdev_examine_item *item;

	if (g_bdev_opts.bdev_auto_examine) {
		SPDK_ERRLOG("Manual examine is not allowed if auto examine is enabled\n");
		return -EINVAL;
	}

	if (bdev_examine_allowlist_check(name)) {
		SPDK_ERRLOG("Duplicate bdev name for manual examine: %s\n", name);
		return -EEXIST;
	}

	item = calloc(1, sizeof(*item));
	if (!item) {
		return -ENOMEM;
	}
	item->name = strdup(name);
	if (!item->name) {
		free(item);
		return -ENOMEM;
	}
	TAILQ_INSERT_TAIL(&g_bdev_examine_allowlist, item, link);

	bdev = spdk_bdev_get_by_name(name);
	if (bdev) {
		bdev_examine(bdev);
	}
	return 0;
}
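/*
 * Example (illustrative): manual examine is only meaningful when automatic
 * examine has been turned off via spdk_bdev_set_opts(), e.g.:
 *
 *	opts.bdev_auto_examine = false;		// set before spdk_bdev_initialize()
 *	...
 *	rc = spdk_bdev_examine("Nvme0n1");	// "Nvme0n1" is a hypothetical bdev name
 *
 * The name is added to the allowlist even if no such bdev exists yet; since
 * bdev_ok_to_examine() consults the allowlist, a bdev registered later under
 * that name (or alias) will still be examined.
 */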
static inline void
bdev_examine_allowlist_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_examine_item *item;
	TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_examine");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "name", item->name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}
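/*
 * Example (illustrative): walking all registered bdevs, or only unclaimed
 * "leaf" bdevs, with the iterators above:
 *
 *	struct spdk_bdev *bdev;
 *
 *	for (bdev = spdk_bdev_first(); bdev != NULL; bdev = spdk_bdev_next(bdev)) {
 *		printf("%s\n", spdk_bdev_get_name(bdev));
 *	}
 *
 * spdk_bdev_get_name() is assumed from the public bdev API; the leaf variants
 * (spdk_bdev_first_leaf()/spdk_bdev_next_leaf()) skip bdevs that have been
 * claimed by a virtual bdev module.
 */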
void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return iovs[0].iov_base != NULL;
}

static bool
_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
{
	int i;
	uintptr_t iov_base;

	if (spdk_likely(alignment == 1)) {
		return true;
	}

	for (i = 0; i < iovcnt; i++) {
		iov_base = (uintptr_t)iovs[i].iov_base;
		if ((iov_base & (alignment - 1)) != 0) {
			return false;
		}
	}

	return true;
}

static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is write path, copy data from original buffer to bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
_bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	/* save original md_buf */
	bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf;
	/* set bounce md_buf */
	bdev_io->u.bdev.md_buf = md_buf;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		memcpy(md_buf, bdev_io->internal.orig_md_buf, len);
	}
}

static void
bdev_io_get_buf_complete(struct spdk_bdev_io *bdev_io, void *buf, bool status)
{
	struct spdk_io_channel *ch = spdk_bdev_io_get_io_channel(bdev_io);

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io->internal.get_aux_buf_cb(ch, bdev_io, buf);
		bdev_io->internal.get_aux_buf_cb = NULL;
	} else {
		assert(bdev_io->internal.get_buf_cb != NULL);
		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(ch, bdev_io, status);
		bdev_io->internal.get_buf_cb = NULL;
	}
}

static void
_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	bool buf_allocated;
	uint64_t md_len, alignment;
	void *aligned_buf;

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io_get_buf_complete(bdev_io, buf, true);
		return;
	}

	alignment = spdk_bdev_get_buf_align(bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
	aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

	if (buf_allocated) {
		_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
	} else {
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
	}

	if (spdk_bdev_is_md_separate(bdev)) {
		aligned_buf = (char *)aligned_buf + len;
		md_len = bdev_io->u.bdev.num_blocks * bdev->md_len;

		assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);

		if (bdev_io->u.bdev.md_buf != NULL) {
			_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
		} else {
			spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
		}
	}
	bdev_io_get_buf_complete(bdev_io, buf, true);
}

static void
_bdev_io_put_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t buf_len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t md_len, alignment;

	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
	alignment = spdk_bdev_get_buf_align(bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		_bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
	}
}

static void
bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	assert(bdev_io->internal.buf != NULL);
	_bdev_io_put_buf(bdev_io, bdev_io->internal.buf, bdev_io->internal.buf_len);
	bdev_io->internal.buf = NULL;
}

void
spdk_bdev_io_put_aux_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(buf != NULL);
	_bdev_io_put_buf(bdev_io, buf, len);
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
		assert(bdev_io->internal.orig_md_buf == NULL);
		return;
	}

	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs,
				  bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base,
				  bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bouncing buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;

	/* do the same for metadata buffer */
	if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
		assert(spdk_bdev_is_md_separate(bdev_io->bdev));

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
		    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
			       bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
		}

		bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
		bdev_io->internal.orig_md_buf = NULL;
	}

	/* We want to free the bounce buffer here since we know we're done with it (as opposed
	 * to waiting for the conditional free of internal.buf in spdk_bdev_free_io()).
	 */
	bdev_io_put_buf(bdev_io);
}

static void
bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment, md_len;
	void *buf;

	alignment = spdk_bdev_get_buf_align(bdev);
	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;

	if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
			    len + alignment);
		bdev_io_get_buf_complete(bdev_io, NULL, false);
		return;
	}

	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;

	if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);
	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		_bdev_io_set_buf(bdev_io, buf, len);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	uint64_t alignment;

	assert(cb != NULL);
	bdev_io->internal.get_buf_cb = cb;

	alignment = spdk_bdev_get_buf_align(bdev);

	if (_is_buf_allocated(bdev_io->u.bdev.iovs) &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
		return;
	}

	bdev_io_get_buf(bdev_io, len);
}

void
spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(cb != NULL);
	assert(bdev_io->internal.get_aux_buf_cb == NULL);
	bdev_io->internal.get_aux_buf_cb = cb;
	bdev_io_get_buf(bdev_io, len);
}
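/*
 * Example (illustrative sketch): a bdev module that needs a data buffer for a
 * READ request typically calls spdk_bdev_io_get_buf() from its submit_request()
 * callback and continues once a buffer is available:
 *
 *	static void
 *	my_module_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
 *			     bool success)
 *	{
 *		if (!success) {
 *			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
 *			return;
 *		}
 *		// bdev_io->u.bdev.iovs now describes a buffer of the requested length
 *	}
 *
 *	spdk_bdev_io_get_buf(bdev_io, my_module_get_buf_cb,
 *			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *
 * my_module_get_buf_cb is a hypothetical name; the appropriate completion
 * status on failure is up to the module.
 */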
static int
bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

static void
bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_qos_limit");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_options");
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_named_bool(w, "bdev_auto_examine", g_bdev_opts.bdev_auto_examine);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	bdev_examine_allowlist_config_json(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	pthread_mutex_lock(&g_bdev_mgr.mutex);

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		bdev_qos_config_json(bdev, w);
	}

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	spdk_json_write_array_end(w);
}
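/*
 * For reference (illustrative): with the default options, the "bdev_set_options"
 * entry written above would look roughly like
 *
 *	{
 *	  "method": "bdev_set_options",
 *	  "params": {
 *	    "bdev_io_pool_size": 65535,
 *	    "bdev_io_cache_size": 256,
 *	    "bdev_auto_examine": true
 *	  }
 *	}
 *
 * followed by one "bdev_examine" object per allowlisted name and then the
 * per-module and per-bdev configuration objects.
 */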
static int
bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	bdev_init_complete(0);
}

static void
bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	bdev_init_complete(-1);
}

static int
bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to prevent other modules from completion of modules_init
			 * Send message to defer application shutdown until resources are cleaned up */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		bdev_init_complete(-1);
		return;
	}
	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_env_get_core_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create,
				bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		return;
	}

	bdev_module_action_complete();
}

static void
bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (g_bdev_mgr.bdev_io_pool) {
		if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
			SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
				    g_bdev_opts.bdev_io_pool_size);
		}

		spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	}

	if (g_bdev_mgr.buf_small_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
			SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
				    BUF_SMALL_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	}

	if (g_bdev_mgr.buf_large_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
			SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
				    BUF_LARGE_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	}

	spdk_free(g_bdev_mgr.zero_buffer);

	bdev_examine_allowlist_free();

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
	pthread_mutex_destroy(&g_bdev_mgr.mutex);
}
static void
bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* FIXME: Handling initialization failures is broken now,
	 * so we won't even try cleaning up after successfully
	 * initialized modules. If module_init_complete is false,
	 * just call bdev_mgr_unregister_cb.
	 */
	if (!g_bdev_mgr.module_init_complete) {
		bdev_mgr_unregister_cb(NULL);
		return;
	}

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL);
	} else {
		bdev_module_finish_iter(NULL);
	}
}

static void
bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and moving
		 * on to the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(bdev, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down.  The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in the reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(bdev, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(bdev, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim underlying bdev properly, we may face the
	 * case of bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister last bdev on the list then.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	bdev_finish_unregister_bdevs_iter(NULL, 0);
}

struct spdk_bdev_io *
bdev_channel_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf != NULL) {
		bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static bool
bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		if (bdev_io->u.bdev.zcopy.start) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static bool
bdev_is_read_io(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* Bit 1 (0x2) set for read operation */
		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
			return true;
		} else {
			return false;
		}
	case SPDK_BDEV_IO_TYPE_READ:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Populate to read from disk */
		if (bdev_io->u.bdev.zcopy.populate) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static uint64_t
bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev	*bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Track the data in the start phase only */
		if (bdev_io->u.bdev.zcopy.start) {
			return bdev_io->u.bdev.num_blocks * bdev->blocklen;
		} else {
			return 0;
		}
	default:
		return 0;
	}
}

static bool
bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
		return true;
	} else {
		return false;
	}
}

static bool
bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static bool
bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static void
bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice--;
}

static void
bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io);
}
static void
bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_set_ops(struct spdk_bdev_qos *qos)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].queue_io = NULL;
			qos->rate_limits[i].update_quota = NULL;
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_r_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_w_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota;
			break;
		default:
			break;
		}
	}
}

static void
_bdev_io_complete_in_submit(struct spdk_bdev_channel *bdev_ch,
			    struct spdk_bdev_io *bdev_io,
			    enum spdk_bdev_io_status status)
{
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	bdev_io->internal.in_submit_request = true;
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	spdk_bdev_io_complete(bdev_io, status);
	bdev_io->internal.in_submit_request = false;
}

static inline void
bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT)) {
		struct spdk_bdev_mgmt_channel *mgmt_channel = shared_resource->mgmt_ch;
		struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort;

		if (bdev_abort_queued_io(&shared_resource->nomem_io, bio_to_abort) ||
		    bdev_abort_buf_io(&mgmt_channel->need_buf_small, bio_to_abort) ||
		    bdev_abort_buf_io(&mgmt_channel->need_buf_large, bio_to_abort)) {
			_bdev_io_complete_in_submit(bdev_ch, bdev_io,
						    SPDK_BDEV_IO_STATUS_SUCCESS);
			return;
		}
	}

	if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
		bdev_ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev_io->internal.in_submit_request = true;
		bdev->fn_table->submit_request(ch, bdev_io);
		bdev_io->internal.in_submit_request = false;
	} else {
		TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
	}
}

static int
bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io		*bdev_io = NULL, *tmp = NULL;
	int				i, submitted_ios = 0;

	TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
		if (bdev_qos_io_to_limit(bdev_io) == true) {
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].queue_io) {
					continue;
				}

				if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
								 bdev_io) == true) {
					return submitted_ios;
				}
			}
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].update_quota) {
					continue;
				}

				qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
			}
		}

		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		bdev_io_do_submit(ch, bdev_io);
		submitted_ios++;
	}

	return submitted_ios;
}

static void
bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}
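/*
 * Example (illustrative sketch): the same wait-queue mechanism is available to
 * bdev users. When a submit call such as spdk_bdev_read_blocks() returns
 * -ENOMEM, the caller can register to be called back once a bdev_io is freed:
 *
 *	struct spdk_bdev_io_wait_entry entry;
 *
 *	entry.bdev = bdev;
 *	entry.cb_fn = retry_read;	// hypothetical callback that resubmits the I/O
 *	entry.cb_arg = ctx;
 *	spdk_bdev_queue_io_wait(bdev, io_ch, &entry);
 *
 * retry_read and ctx are placeholders; the entry must remain valid until the
 * callback fires.
 */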
static bool
bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O.  Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}

static bool
bdev_io_should_split(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}

static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}
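/*
 * Worked example (illustrative): with optimal_io_boundary == 8 blocks, a write
 * of 4 blocks at offset 6 gives start_stripe = 6 >> 3 = 0 and
 * end_stripe = (6 + 4 - 1) >> 3 = 1, so the I/O crosses a boundary and must be
 * split; the first child covers blocks 6-7 (_to_next_boundary(6, 8) == 2) and
 * the second covers blocks 8-9.
 */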
static void
bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_bdev_io_split(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes;
	struct iovec *parent_iov, *iov;
	uint64_t parent_iov_offset, iov_len;
	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
	void *md_buf = NULL;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
	parent_iovcnt = bdev_io->u.bdev.iovcnt;

	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
		if (parent_iov_offset < parent_iov->iov_len) {
			break;
		}
		parent_iov_offset -= parent_iov->iov_len;
	}

	child_iovcnt = 0;
	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
		to_next_boundary = spdk_min(remaining, to_next_boundary);
		to_next_boundary_bytes = to_next_boundary * blocklen;
		iov = &bdev_io->child_iov[child_iovcnt];
		iovcnt = 0;

		if (bdev_io->u.bdev.md_buf) {
			assert((parent_iov_offset % blocklen) > 0);
			md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) *
				 spdk_bdev_get_md_size(bdev_io->bdev);
		}

		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
			to_next_boundary_bytes -= iov_len;

			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;

			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
				parent_iov_offset += iov_len;
			} else {
				parent_iovpos++;
				parent_iov_offset = 0;
			}
			child_iovcnt++;
			iovcnt++;
		}

		if (to_next_boundary_bytes > 0) {
			/* We had to stop this child I/O early because we ran out of
			 * child_iov space.  Ensure the iovs are aligned to the block
			 * size and then adjust to_next_boundary before starting the
			 * child I/O.
			 */
1961 */ 1962 assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV); 1963 to_last_block_bytes = to_next_boundary_bytes % blocklen; 1964 if (to_last_block_bytes != 0) { 1965 uint32_t child_iovpos = child_iovcnt - 1; 1966 /* don't decrease child_iovcnt so the loop will naturally end */ 1967 1968 to_last_block_bytes = blocklen - to_last_block_bytes; 1969 to_next_boundary_bytes += to_last_block_bytes; 1970 while (to_last_block_bytes > 0 && iovcnt > 0) { 1971 iov_len = spdk_min(to_last_block_bytes, 1972 bdev_io->child_iov[child_iovpos].iov_len); 1973 bdev_io->child_iov[child_iovpos].iov_len -= iov_len; 1974 if (bdev_io->child_iov[child_iovpos].iov_len == 0) { 1975 child_iovpos--; 1976 if (--iovcnt == 0) { 1977 return; 1978 } 1979 } 1980 to_last_block_bytes -= iov_len; 1981 } 1982 1983 assert(to_last_block_bytes == 0); 1984 } 1985 to_next_boundary -= to_next_boundary_bytes / blocklen; 1986 } 1987 1988 bdev_io->u.bdev.split_outstanding++; 1989 1990 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1991 rc = bdev_readv_blocks_with_md(bdev_io->internal.desc, 1992 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1993 iov, iovcnt, md_buf, current_offset, 1994 to_next_boundary, 1995 bdev_io_split_done, bdev_io); 1996 } else { 1997 rc = bdev_writev_blocks_with_md(bdev_io->internal.desc, 1998 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1999 iov, iovcnt, md_buf, current_offset, 2000 to_next_boundary, 2001 bdev_io_split_done, bdev_io); 2002 } 2003 2004 if (rc == 0) { 2005 current_offset += to_next_boundary; 2006 remaining -= to_next_boundary; 2007 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 2008 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 2009 } else { 2010 bdev_io->u.bdev.split_outstanding--; 2011 if (rc == -ENOMEM) { 2012 if (bdev_io->u.bdev.split_outstanding == 0) { 2013 /* No I/O is outstanding. Hence we should wait here. */ 2014 bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split); 2015 } 2016 } else { 2017 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2018 if (bdev_io->u.bdev.split_outstanding == 0) { 2019 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 2020 (uintptr_t)bdev_io, 0); 2021 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 2022 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 2023 } 2024 } 2025 2026 return; 2027 } 2028 } 2029 } 2030 2031 static void 2032 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 2033 { 2034 struct spdk_bdev_io *parent_io = cb_arg; 2035 2036 spdk_bdev_free_io(bdev_io); 2037 2038 if (!success) { 2039 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2040 /* If any child I/O failed, stop further splitting process. */ 2041 parent_io->u.bdev.split_current_offset_blocks += parent_io->u.bdev.split_remaining_num_blocks; 2042 parent_io->u.bdev.split_remaining_num_blocks = 0; 2043 } 2044 parent_io->u.bdev.split_outstanding--; 2045 if (parent_io->u.bdev.split_outstanding != 0) { 2046 return; 2047 } 2048 2049 /* 2050 * Parent I/O finishes when all blocks are consumed. 
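 * Two counters drive this: split_remaining_num_blocks reaches zero once every child has
 * been submitted (or is forced to zero above when a child fails), and split_outstanding
 * reaches zero once every submitted child has called back into this function.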
 */
	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		assert(parent_io->internal.cb != bdev_io_split_done);
		spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0,
				      (uintptr_t)parent_io, 0);
		TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link);
		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
				       parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process. This function will complete the parent I/O if the
	 * splitting is done.
	 */
	_bdev_io_split(parent_io);
}

static void
bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success);

static void
bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	assert(bdev_io_type_can_split(bdev_io->type));

	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
	bdev_io->u.bdev.split_outstanding = 0;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (_is_buf_allocated(bdev_io->u.bdev.iovs)) {
		_bdev_io_split(bdev_io);
	} else {
		assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ);
		spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
	}
}

static void
bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	_bdev_io_split(bdev_io);
}

/* Explicitly mark this inline, since it's used as a function pointer and otherwise won't
 * be inlined, at least on some compilers.
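 * It is used both directly (from bdev_io_submit() below) and by address (e.g. via
 * spdk_thread_send_msg() when an I/O must hop to the QoS thread), and the direct call is
 * the hot path that benefits from inlining.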
2104 */ 2105 static inline void 2106 _bdev_io_submit(void *ctx) 2107 { 2108 struct spdk_bdev_io *bdev_io = ctx; 2109 struct spdk_bdev *bdev = bdev_io->bdev; 2110 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2111 uint64_t tsc; 2112 2113 tsc = spdk_get_ticks(); 2114 bdev_io->internal.submit_tsc = tsc; 2115 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 2116 2117 if (spdk_likely(bdev_ch->flags == 0)) { 2118 bdev_io_do_submit(bdev_ch, bdev_io); 2119 return; 2120 } 2121 2122 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 2123 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2124 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 2125 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) && 2126 bdev_abort_queued_io(&bdev->internal.qos->queued, bdev_io->u.abort.bio_to_abort)) { 2127 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 2128 } else { 2129 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 2130 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 2131 } 2132 } else { 2133 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 2134 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2135 } 2136 } 2137 2138 bool 2139 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2); 2140 2141 bool 2142 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2) 2143 { 2144 if (range1->length == 0 || range2->length == 0) { 2145 return false; 2146 } 2147 2148 if (range1->offset + range1->length <= range2->offset) { 2149 return false; 2150 } 2151 2152 if (range2->offset + range2->length <= range1->offset) { 2153 return false; 2154 } 2155 2156 return true; 2157 } 2158 2159 static bool 2160 bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range) 2161 { 2162 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2163 struct lba_range r; 2164 2165 switch (bdev_io->type) { 2166 case SPDK_BDEV_IO_TYPE_NVME_IO: 2167 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 2168 /* Don't try to decode the NVMe command - just assume worst-case and that 2169 * it overlaps a locked range. 2170 */ 2171 return true; 2172 case SPDK_BDEV_IO_TYPE_WRITE: 2173 case SPDK_BDEV_IO_TYPE_UNMAP: 2174 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2175 case SPDK_BDEV_IO_TYPE_ZCOPY: 2176 r.offset = bdev_io->u.bdev.offset_blocks; 2177 r.length = bdev_io->u.bdev.num_blocks; 2178 if (!bdev_lba_range_overlapped(range, &r)) { 2179 /* This I/O doesn't overlap the specified LBA range. */ 2180 return false; 2181 } else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) { 2182 /* This I/O overlaps, but the I/O is on the same channel that locked this 2183 * range, and the caller_ctx is the same as the locked_ctx. This means 2184 * that this I/O is associated with the lock, and is allowed to execute. 
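 * As an illustrative scenario (not from the original source): if a compare-and-write
 * sequence locks blocks 100-107 on channel A, the write it issues as part of that
 * sequence (same channel, same caller_ctx) passes this check, while an unrelated write
 * to block 104 (different channel or different caller context) is reported as locked
 * and is queued on io_locked until the range is unlocked.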
2185 */ 2186 return false; 2187 } else { 2188 return true; 2189 } 2190 default: 2191 return false; 2192 } 2193 } 2194 2195 void 2196 bdev_io_submit(struct spdk_bdev_io *bdev_io) 2197 { 2198 struct spdk_bdev *bdev = bdev_io->bdev; 2199 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 2200 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2201 2202 assert(thread != NULL); 2203 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2204 2205 if (!TAILQ_EMPTY(&ch->locked_ranges)) { 2206 struct lba_range *range; 2207 2208 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 2209 if (bdev_io_range_is_locked(bdev_io, range)) { 2210 TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link); 2211 return; 2212 } 2213 } 2214 } 2215 2216 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 2217 2218 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bdev_io)) { 2219 bdev_io->internal.submit_tsc = spdk_get_ticks(); 2220 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 2221 (uintptr_t)bdev_io, bdev_io->type); 2222 bdev_io_split(NULL, bdev_io); 2223 return; 2224 } 2225 2226 if (ch->flags & BDEV_CH_QOS_ENABLED) { 2227 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 2228 _bdev_io_submit(bdev_io); 2229 } else { 2230 bdev_io->internal.io_submit_ch = ch; 2231 bdev_io->internal.ch = bdev->internal.qos->ch; 2232 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 2233 } 2234 } else { 2235 _bdev_io_submit(bdev_io); 2236 } 2237 } 2238 2239 static void 2240 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 2241 { 2242 struct spdk_bdev *bdev = bdev_io->bdev; 2243 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2244 struct spdk_io_channel *ch = bdev_ch->channel; 2245 2246 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2247 2248 bdev_io->internal.in_submit_request = true; 2249 bdev->fn_table->submit_request(ch, bdev_io); 2250 bdev_io->internal.in_submit_request = false; 2251 } 2252 2253 void 2254 bdev_io_init(struct spdk_bdev_io *bdev_io, 2255 struct spdk_bdev *bdev, void *cb_arg, 2256 spdk_bdev_io_completion_cb cb) 2257 { 2258 bdev_io->bdev = bdev; 2259 bdev_io->internal.caller_ctx = cb_arg; 2260 bdev_io->internal.cb = cb; 2261 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2262 bdev_io->internal.in_submit_request = false; 2263 bdev_io->internal.buf = NULL; 2264 bdev_io->internal.io_submit_ch = NULL; 2265 bdev_io->internal.orig_iovs = NULL; 2266 bdev_io->internal.orig_iovcnt = 0; 2267 bdev_io->internal.orig_md_buf = NULL; 2268 bdev_io->internal.error.nvme.cdw0 = 0; 2269 bdev_io->num_retries = 0; 2270 bdev_io->internal.get_buf_cb = NULL; 2271 bdev_io->internal.get_aux_buf_cb = NULL; 2272 } 2273 2274 static bool 2275 bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2276 { 2277 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 2278 } 2279 2280 bool 2281 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2282 { 2283 bool supported; 2284 2285 supported = bdev_io_type_supported(bdev, io_type); 2286 2287 if (!supported) { 2288 switch (io_type) { 2289 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2290 /* The bdev layer will emulate write zeroes as long as write is supported. 
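 * The emulation is transparent to the caller: spdk_bdev_write_zeroes_blocks() can still
 * be issued against such a bdev, and the generic layer satisfies it with regular WRITE
 * submissions (typically sourced from a preallocated zero buffer) instead of rejecting
 * the request.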
*/ 2291 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2292 break; 2293 case SPDK_BDEV_IO_TYPE_ZCOPY: 2294 /* Zero copy can be emulated with regular read and write */ 2295 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 2296 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2297 break; 2298 default: 2299 break; 2300 } 2301 } 2302 2303 return supported; 2304 } 2305 2306 int 2307 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 2308 { 2309 if (bdev->fn_table->dump_info_json) { 2310 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 2311 } 2312 2313 return 0; 2314 } 2315 2316 static void 2317 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2318 { 2319 uint32_t max_per_timeslice = 0; 2320 int i; 2321 2322 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2323 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2324 qos->rate_limits[i].max_per_timeslice = 0; 2325 continue; 2326 } 2327 2328 max_per_timeslice = qos->rate_limits[i].limit * 2329 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2330 2331 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2332 qos->rate_limits[i].min_per_timeslice); 2333 2334 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2335 } 2336 2337 bdev_qos_set_ops(qos); 2338 } 2339 2340 static int 2341 bdev_channel_poll_qos(void *arg) 2342 { 2343 struct spdk_bdev_qos *qos = arg; 2344 uint64_t now = spdk_get_ticks(); 2345 int i; 2346 2347 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2348 /* We received our callback earlier than expected - return 2349 * immediately and wait to do accounting until at least one 2350 * timeslice has actually expired. This should never happen 2351 * with a well-behaved timer implementation. 2352 */ 2353 return SPDK_POLLER_IDLE; 2354 } 2355 2356 /* Reset for next round of rate limiting */ 2357 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2358 /* We may have allowed the IOs or bytes to slightly overrun in the last 2359 * timeslice. remaining_this_timeslice is signed, so if it's negative 2360 * here, we'll account for the overrun so that the next timeslice will 2361 * be appropriately reduced. 
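 * As an illustrative example (numbers chosen here): with a budget of 4096 bytes per
 * timeslice and 1024 bytes still remaining, a 5120-byte I/O is allowed to run and leaves
 * remaining_this_timeslice at -4096, so after the refill below the next timeslice starts
 * with 0 bytes of budget instead of 4096.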
2362 */ 2363 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2364 qos->rate_limits[i].remaining_this_timeslice = 0; 2365 } 2366 } 2367 2368 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2369 qos->last_timeslice += qos->timeslice_size; 2370 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2371 qos->rate_limits[i].remaining_this_timeslice += 2372 qos->rate_limits[i].max_per_timeslice; 2373 } 2374 } 2375 2376 return bdev_qos_io_submit(qos->ch, qos); 2377 } 2378 2379 static void 2380 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2381 { 2382 struct spdk_bdev_shared_resource *shared_resource; 2383 struct lba_range *range; 2384 2385 while (!TAILQ_EMPTY(&ch->locked_ranges)) { 2386 range = TAILQ_FIRST(&ch->locked_ranges); 2387 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 2388 free(range); 2389 } 2390 2391 spdk_put_io_channel(ch->channel); 2392 2393 shared_resource = ch->shared_resource; 2394 2395 assert(TAILQ_EMPTY(&ch->io_locked)); 2396 assert(TAILQ_EMPTY(&ch->io_submitted)); 2397 assert(ch->io_outstanding == 0); 2398 assert(shared_resource->ref > 0); 2399 shared_resource->ref--; 2400 if (shared_resource->ref == 0) { 2401 assert(shared_resource->io_outstanding == 0); 2402 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2403 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2404 free(shared_resource); 2405 } 2406 } 2407 2408 /* Caller must hold bdev->internal.mutex. */ 2409 static void 2410 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2411 { 2412 struct spdk_bdev_qos *qos = bdev->internal.qos; 2413 int i; 2414 2415 /* Rate limiting on this bdev enabled */ 2416 if (qos) { 2417 if (qos->ch == NULL) { 2418 struct spdk_io_channel *io_ch; 2419 2420 SPDK_DEBUGLOG(bdev, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2421 bdev->name, spdk_get_thread()); 2422 2423 /* No qos channel has been selected, so set one up */ 2424 2425 /* Take another reference to ch */ 2426 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2427 assert(io_ch != NULL); 2428 qos->ch = ch; 2429 2430 qos->thread = spdk_io_channel_get_thread(io_ch); 2431 2432 TAILQ_INIT(&qos->queued); 2433 2434 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2435 if (bdev_qos_is_iops_rate_limit(i) == true) { 2436 qos->rate_limits[i].min_per_timeslice = 2437 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2438 } else { 2439 qos->rate_limits[i].min_per_timeslice = 2440 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2441 } 2442 2443 if (qos->rate_limits[i].limit == 0) { 2444 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2445 } 2446 } 2447 bdev_qos_update_max_quota_per_timeslice(qos); 2448 qos->timeslice_size = 2449 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2450 qos->last_timeslice = spdk_get_ticks(); 2451 qos->poller = SPDK_POLLER_REGISTER(bdev_channel_poll_qos, 2452 qos, 2453 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2454 } 2455 2456 ch->flags |= BDEV_CH_QOS_ENABLED; 2457 } 2458 } 2459 2460 struct poll_timeout_ctx { 2461 struct spdk_bdev_desc *desc; 2462 uint64_t timeout_in_sec; 2463 spdk_bdev_io_timeout_cb cb_fn; 2464 void *cb_arg; 2465 }; 2466 2467 static void 2468 bdev_desc_free(struct spdk_bdev_desc *desc) 2469 { 2470 pthread_mutex_destroy(&desc->mutex); 2471 free(desc->media_events_buffer); 2472 free(desc); 2473 } 2474 2475 static void 2476 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2477 { 2478 struct poll_timeout_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 2479 struct spdk_bdev_desc *desc = ctx->desc; 2480 2481 free(ctx); 2482 2483 pthread_mutex_lock(&desc->mutex); 2484 desc->refs--; 2485 if (desc->closed == true && desc->refs == 0) { 2486 pthread_mutex_unlock(&desc->mutex); 2487 bdev_desc_free(desc); 2488 return; 2489 } 2490 pthread_mutex_unlock(&desc->mutex); 2491 } 2492 2493 static void 2494 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2495 { 2496 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2497 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2498 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2499 struct spdk_bdev_desc *desc = ctx->desc; 2500 struct spdk_bdev_io *bdev_io; 2501 uint64_t now; 2502 2503 pthread_mutex_lock(&desc->mutex); 2504 if (desc->closed == true) { 2505 pthread_mutex_unlock(&desc->mutex); 2506 spdk_for_each_channel_continue(i, -1); 2507 return; 2508 } 2509 pthread_mutex_unlock(&desc->mutex); 2510 2511 now = spdk_get_ticks(); 2512 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2513 /* Exclude any I/O that are generated via splitting. */ 2514 if (bdev_io->internal.cb == bdev_io_split_done) { 2515 continue; 2516 } 2517 2518 /* Once we find an I/O that has not timed out, we can immediately 2519 * exit the loop. 2520 */ 2521 if (now < (bdev_io->internal.submit_tsc + 2522 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2523 goto end; 2524 } 2525 2526 if (bdev_io->internal.desc == desc) { 2527 ctx->cb_fn(ctx->cb_arg, bdev_io); 2528 } 2529 } 2530 2531 end: 2532 spdk_for_each_channel_continue(i, 0); 2533 } 2534 2535 static int 2536 bdev_poll_timeout_io(void *arg) 2537 { 2538 struct spdk_bdev_desc *desc = arg; 2539 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2540 struct poll_timeout_ctx *ctx; 2541 2542 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2543 if (!ctx) { 2544 SPDK_ERRLOG("failed to allocate memory\n"); 2545 return SPDK_POLLER_BUSY; 2546 } 2547 ctx->desc = desc; 2548 ctx->cb_arg = desc->cb_arg; 2549 ctx->cb_fn = desc->cb_fn; 2550 ctx->timeout_in_sec = desc->timeout_in_sec; 2551 2552 /* Take a ref on the descriptor in case it gets closed while we are checking 2553 * all of the channels. 
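 * The matching unref happens in bdev_channel_poll_timeout_io_done(); if the descriptor
 * was closed in the meantime, that completion callback is also where the descriptor is
 * finally freed via bdev_desc_free().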
2554 */ 2555 pthread_mutex_lock(&desc->mutex); 2556 desc->refs++; 2557 pthread_mutex_unlock(&desc->mutex); 2558 2559 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2560 bdev_channel_poll_timeout_io, 2561 ctx, 2562 bdev_channel_poll_timeout_io_done); 2563 2564 return SPDK_POLLER_BUSY; 2565 } 2566 2567 int 2568 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2569 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2570 { 2571 assert(desc->thread == spdk_get_thread()); 2572 2573 spdk_poller_unregister(&desc->io_timeout_poller); 2574 2575 if (timeout_in_sec) { 2576 assert(cb_fn != NULL); 2577 desc->io_timeout_poller = SPDK_POLLER_REGISTER(bdev_poll_timeout_io, 2578 desc, 2579 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2580 1000); 2581 if (desc->io_timeout_poller == NULL) { 2582 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2583 return -1; 2584 } 2585 } 2586 2587 desc->cb_fn = cb_fn; 2588 desc->cb_arg = cb_arg; 2589 desc->timeout_in_sec = timeout_in_sec; 2590 2591 return 0; 2592 } 2593 2594 static int 2595 bdev_channel_create(void *io_device, void *ctx_buf) 2596 { 2597 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2598 struct spdk_bdev_channel *ch = ctx_buf; 2599 struct spdk_io_channel *mgmt_io_ch; 2600 struct spdk_bdev_mgmt_channel *mgmt_ch; 2601 struct spdk_bdev_shared_resource *shared_resource; 2602 struct lba_range *range; 2603 2604 ch->bdev = bdev; 2605 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2606 if (!ch->channel) { 2607 return -1; 2608 } 2609 2610 assert(ch->histogram == NULL); 2611 if (bdev->internal.histogram_enabled) { 2612 ch->histogram = spdk_histogram_data_alloc(); 2613 if (ch->histogram == NULL) { 2614 SPDK_ERRLOG("Could not allocate histogram\n"); 2615 } 2616 } 2617 2618 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2619 if (!mgmt_io_ch) { 2620 spdk_put_io_channel(ch->channel); 2621 return -1; 2622 } 2623 2624 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2625 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2626 if (shared_resource->shared_ch == ch->channel) { 2627 spdk_put_io_channel(mgmt_io_ch); 2628 shared_resource->ref++; 2629 break; 2630 } 2631 } 2632 2633 if (shared_resource == NULL) { 2634 shared_resource = calloc(1, sizeof(*shared_resource)); 2635 if (shared_resource == NULL) { 2636 spdk_put_io_channel(ch->channel); 2637 spdk_put_io_channel(mgmt_io_ch); 2638 return -1; 2639 } 2640 2641 shared_resource->mgmt_ch = mgmt_ch; 2642 shared_resource->io_outstanding = 0; 2643 TAILQ_INIT(&shared_resource->nomem_io); 2644 shared_resource->nomem_threshold = 0; 2645 shared_resource->shared_ch = ch->channel; 2646 shared_resource->ref = 1; 2647 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2648 } 2649 2650 memset(&ch->stat, 0, sizeof(ch->stat)); 2651 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2652 ch->io_outstanding = 0; 2653 TAILQ_INIT(&ch->queued_resets); 2654 TAILQ_INIT(&ch->locked_ranges); 2655 ch->flags = 0; 2656 ch->shared_resource = shared_resource; 2657 2658 TAILQ_INIT(&ch->io_submitted); 2659 TAILQ_INIT(&ch->io_locked); 2660 2661 #ifdef SPDK_CONFIG_VTUNE 2662 { 2663 char *name; 2664 __itt_init_ittlib(NULL, 0); 2665 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2666 if (!name) { 2667 bdev_channel_destroy_resource(ch); 2668 return -1; 2669 } 2670 ch->handle = __itt_string_handle_create(name); 2671 free(name); 2672 ch->start_tsc = spdk_get_ticks(); 2673 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2674 memset(&ch->prev_stat, 0, 
sizeof(ch->prev_stat)); 2675 } 2676 #endif 2677 2678 pthread_mutex_lock(&bdev->internal.mutex); 2679 bdev_enable_qos(bdev, ch); 2680 2681 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 2682 struct lba_range *new_range; 2683 2684 new_range = calloc(1, sizeof(*new_range)); 2685 if (new_range == NULL) { 2686 pthread_mutex_unlock(&bdev->internal.mutex); 2687 bdev_channel_destroy_resource(ch); 2688 return -1; 2689 } 2690 new_range->length = range->length; 2691 new_range->offset = range->offset; 2692 new_range->locked_ctx = range->locked_ctx; 2693 TAILQ_INSERT_TAIL(&ch->locked_ranges, new_range, tailq); 2694 } 2695 2696 pthread_mutex_unlock(&bdev->internal.mutex); 2697 2698 return 0; 2699 } 2700 2701 /* 2702 * Abort I/O that are waiting on a data buffer. These types of I/O are 2703 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2704 */ 2705 static void 2706 bdev_abort_all_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2707 { 2708 bdev_io_stailq_t tmp; 2709 struct spdk_bdev_io *bdev_io; 2710 2711 STAILQ_INIT(&tmp); 2712 2713 while (!STAILQ_EMPTY(queue)) { 2714 bdev_io = STAILQ_FIRST(queue); 2715 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2716 if (bdev_io->internal.ch == ch) { 2717 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2718 } else { 2719 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2720 } 2721 } 2722 2723 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2724 } 2725 2726 /* 2727 * Abort I/O that are queued waiting for submission. These types of I/O are 2728 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2729 */ 2730 static void 2731 bdev_abort_all_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2732 { 2733 struct spdk_bdev_io *bdev_io, *tmp; 2734 2735 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2736 if (bdev_io->internal.ch == ch) { 2737 TAILQ_REMOVE(queue, bdev_io, internal.link); 2738 /* 2739 * spdk_bdev_io_complete() assumes that the completed I/O had 2740 * been submitted to the bdev module. Since in this case it 2741 * hadn't, bump io_outstanding to account for the decrement 2742 * that spdk_bdev_io_complete() will do. 
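 * RESET I/O are skipped below because spdk_bdev_io_complete() handles resets through a
 * separate path that does not decrement io_outstanding, so there is nothing to
 * compensate for.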
2743 */ 2744 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2745 ch->io_outstanding++; 2746 ch->shared_resource->io_outstanding++; 2747 } 2748 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2749 } 2750 } 2751 } 2752 2753 static bool 2754 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2755 { 2756 struct spdk_bdev_io *bdev_io; 2757 2758 TAILQ_FOREACH(bdev_io, queue, internal.link) { 2759 if (bdev_io == bio_to_abort) { 2760 TAILQ_REMOVE(queue, bio_to_abort, internal.link); 2761 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2762 return true; 2763 } 2764 } 2765 2766 return false; 2767 } 2768 2769 static bool 2770 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2771 { 2772 struct spdk_bdev_io *bdev_io; 2773 2774 STAILQ_FOREACH(bdev_io, queue, internal.buf_link) { 2775 if (bdev_io == bio_to_abort) { 2776 STAILQ_REMOVE(queue, bio_to_abort, spdk_bdev_io, internal.buf_link); 2777 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2778 return true; 2779 } 2780 } 2781 2782 return false; 2783 } 2784 2785 static void 2786 bdev_qos_channel_destroy(void *cb_arg) 2787 { 2788 struct spdk_bdev_qos *qos = cb_arg; 2789 2790 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2791 spdk_poller_unregister(&qos->poller); 2792 2793 SPDK_DEBUGLOG(bdev, "Free QoS %p.\n", qos); 2794 2795 free(qos); 2796 } 2797 2798 static int 2799 bdev_qos_destroy(struct spdk_bdev *bdev) 2800 { 2801 int i; 2802 2803 /* 2804 * Cleanly shutting down the QoS poller is tricky, because 2805 * during the asynchronous operation the user could open 2806 * a new descriptor and create a new channel, spawning 2807 * a new QoS poller. 2808 * 2809 * The strategy is to create a new QoS structure here and swap it 2810 * in. The shutdown path then continues to refer to the old one 2811 * until it completes and then releases it. 2812 */ 2813 struct spdk_bdev_qos *new_qos, *old_qos; 2814 2815 old_qos = bdev->internal.qos; 2816 2817 new_qos = calloc(1, sizeof(*new_qos)); 2818 if (!new_qos) { 2819 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2820 return -ENOMEM; 2821 } 2822 2823 /* Copy the old QoS data into the newly allocated structure */ 2824 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2825 2826 /* Zero out the key parts of the QoS structure */ 2827 new_qos->ch = NULL; 2828 new_qos->thread = NULL; 2829 new_qos->poller = NULL; 2830 TAILQ_INIT(&new_qos->queued); 2831 /* 2832 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2833 * It will be used later for the new QoS structure. 2834 */ 2835 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2836 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2837 new_qos->rate_limits[i].min_per_timeslice = 0; 2838 new_qos->rate_limits[i].max_per_timeslice = 0; 2839 } 2840 2841 bdev->internal.qos = new_qos; 2842 2843 if (old_qos->thread == NULL) { 2844 free(old_qos); 2845 } else { 2846 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 2847 } 2848 2849 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2850 * been destroyed yet. The destruction path will end up waiting for the final 2851 * channel to be put before it releases resources. 
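 * Note that bdev_qos_channel_destroy() drops the extra channel reference taken when the
 * QoS channel was selected in bdev_enable_qos(), which is what allows that final put to
 * eventually happen.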
*/ 2852 2853 return 0; 2854 } 2855 2856 static void 2857 bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2858 { 2859 total->bytes_read += add->bytes_read; 2860 total->num_read_ops += add->num_read_ops; 2861 total->bytes_written += add->bytes_written; 2862 total->num_write_ops += add->num_write_ops; 2863 total->bytes_unmapped += add->bytes_unmapped; 2864 total->num_unmap_ops += add->num_unmap_ops; 2865 total->read_latency_ticks += add->read_latency_ticks; 2866 total->write_latency_ticks += add->write_latency_ticks; 2867 total->unmap_latency_ticks += add->unmap_latency_ticks; 2868 } 2869 2870 static void 2871 bdev_channel_destroy(void *io_device, void *ctx_buf) 2872 { 2873 struct spdk_bdev_channel *ch = ctx_buf; 2874 struct spdk_bdev_mgmt_channel *mgmt_ch; 2875 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2876 2877 SPDK_DEBUGLOG(bdev, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2878 spdk_get_thread()); 2879 2880 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */ 2881 pthread_mutex_lock(&ch->bdev->internal.mutex); 2882 bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2883 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2884 2885 mgmt_ch = shared_resource->mgmt_ch; 2886 2887 bdev_abort_all_queued_io(&ch->queued_resets, ch); 2888 bdev_abort_all_queued_io(&shared_resource->nomem_io, ch); 2889 bdev_abort_all_buf_io(&mgmt_ch->need_buf_small, ch); 2890 bdev_abort_all_buf_io(&mgmt_ch->need_buf_large, ch); 2891 2892 if (ch->histogram) { 2893 spdk_histogram_data_free(ch->histogram); 2894 } 2895 2896 bdev_channel_destroy_resource(ch); 2897 } 2898 2899 int 2900 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2901 { 2902 struct spdk_bdev_alias *tmp; 2903 2904 if (alias == NULL) { 2905 SPDK_ERRLOG("Empty alias passed\n"); 2906 return -EINVAL; 2907 } 2908 2909 if (spdk_bdev_get_by_name(alias)) { 2910 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2911 return -EEXIST; 2912 } 2913 2914 tmp = calloc(1, sizeof(*tmp)); 2915 if (tmp == NULL) { 2916 SPDK_ERRLOG("Unable to allocate alias\n"); 2917 return -ENOMEM; 2918 } 2919 2920 tmp->alias = strdup(alias); 2921 if (tmp->alias == NULL) { 2922 free(tmp); 2923 SPDK_ERRLOG("Unable to allocate alias\n"); 2924 return -ENOMEM; 2925 } 2926 2927 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2928 2929 return 0; 2930 } 2931 2932 int 2933 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2934 { 2935 struct spdk_bdev_alias *tmp; 2936 2937 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2938 if (strcmp(alias, tmp->alias) == 0) { 2939 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2940 free(tmp->alias); 2941 free(tmp); 2942 return 0; 2943 } 2944 } 2945 2946 SPDK_INFOLOG(bdev, "Alias %s does not exists\n", alias); 2947 2948 return -ENOENT; 2949 } 2950 2951 void 2952 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2953 { 2954 struct spdk_bdev_alias *p, *tmp; 2955 2956 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2957 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2958 free(p->alias); 2959 free(p); 2960 } 2961 } 2962 2963 struct spdk_io_channel * 2964 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2965 { 2966 return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc))); 2967 } 2968 2969 const char * 2970 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2971 { 2972 return bdev->name; 2973 } 2974 2975 const char * 2976 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2977 { 2978 
return bdev->product_name; 2979 } 2980 2981 const struct spdk_bdev_aliases_list * 2982 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2983 { 2984 return &bdev->aliases; 2985 } 2986 2987 uint32_t 2988 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2989 { 2990 return bdev->blocklen; 2991 } 2992 2993 uint32_t 2994 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 2995 { 2996 return bdev->write_unit_size; 2997 } 2998 2999 uint64_t 3000 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 3001 { 3002 return bdev->blockcnt; 3003 } 3004 3005 const char * 3006 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 3007 { 3008 return qos_rpc_type[type]; 3009 } 3010 3011 void 3012 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 3013 { 3014 int i; 3015 3016 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 3017 3018 pthread_mutex_lock(&bdev->internal.mutex); 3019 if (bdev->internal.qos) { 3020 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3021 if (bdev->internal.qos->rate_limits[i].limit != 3022 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3023 limits[i] = bdev->internal.qos->rate_limits[i].limit; 3024 if (bdev_qos_is_iops_rate_limit(i) == false) { 3025 /* Change from Byte to Megabyte which is user visible. */ 3026 limits[i] = limits[i] / 1024 / 1024; 3027 } 3028 } 3029 } 3030 } 3031 pthread_mutex_unlock(&bdev->internal.mutex); 3032 } 3033 3034 size_t 3035 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 3036 { 3037 return 1 << bdev->required_alignment; 3038 } 3039 3040 uint32_t 3041 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 3042 { 3043 return bdev->optimal_io_boundary; 3044 } 3045 3046 bool 3047 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 3048 { 3049 return bdev->write_cache; 3050 } 3051 3052 const struct spdk_uuid * 3053 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 3054 { 3055 return &bdev->uuid; 3056 } 3057 3058 uint16_t 3059 spdk_bdev_get_acwu(const struct spdk_bdev *bdev) 3060 { 3061 return bdev->acwu; 3062 } 3063 3064 uint32_t 3065 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 3066 { 3067 return bdev->md_len; 3068 } 3069 3070 bool 3071 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 3072 { 3073 return (bdev->md_len != 0) && bdev->md_interleave; 3074 } 3075 3076 bool 3077 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 3078 { 3079 return (bdev->md_len != 0) && !bdev->md_interleave; 3080 } 3081 3082 bool 3083 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 3084 { 3085 return bdev->zoned; 3086 } 3087 3088 uint32_t 3089 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 3090 { 3091 if (spdk_bdev_is_md_interleaved(bdev)) { 3092 return bdev->blocklen - bdev->md_len; 3093 } else { 3094 return bdev->blocklen; 3095 } 3096 } 3097 3098 static uint32_t 3099 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 3100 { 3101 if (!spdk_bdev_is_md_interleaved(bdev)) { 3102 return bdev->blocklen + bdev->md_len; 3103 } else { 3104 return bdev->blocklen; 3105 } 3106 } 3107 3108 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 3109 { 3110 if (bdev->md_len != 0) { 3111 return bdev->dif_type; 3112 } else { 3113 return SPDK_DIF_DISABLE; 3114 } 3115 } 3116 3117 bool 3118 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 3119 { 3120 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 3121 return bdev->dif_is_head_of_md; 3122 } else { 3123 return false; 3124 } 3125 } 3126 3127 bool 3128 
spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 3129 enum spdk_dif_check_type check_type) 3130 { 3131 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 3132 return false; 3133 } 3134 3135 switch (check_type) { 3136 case SPDK_DIF_CHECK_TYPE_REFTAG: 3137 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 3138 case SPDK_DIF_CHECK_TYPE_APPTAG: 3139 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 3140 case SPDK_DIF_CHECK_TYPE_GUARD: 3141 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 3142 default: 3143 return false; 3144 } 3145 } 3146 3147 uint64_t 3148 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 3149 { 3150 return bdev->internal.measured_queue_depth; 3151 } 3152 3153 uint64_t 3154 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 3155 { 3156 return bdev->internal.period; 3157 } 3158 3159 uint64_t 3160 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 3161 { 3162 return bdev->internal.weighted_io_time; 3163 } 3164 3165 uint64_t 3166 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 3167 { 3168 return bdev->internal.io_time; 3169 } 3170 3171 static void 3172 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 3173 { 3174 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3175 3176 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 3177 3178 if (bdev->internal.measured_queue_depth) { 3179 bdev->internal.io_time += bdev->internal.period; 3180 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 3181 } 3182 } 3183 3184 static void 3185 _calculate_measured_qd(struct spdk_io_channel_iter *i) 3186 { 3187 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3188 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 3189 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 3190 3191 bdev->internal.temporary_queue_depth += ch->io_outstanding; 3192 spdk_for_each_channel_continue(i, 0); 3193 } 3194 3195 static int 3196 bdev_calculate_measured_queue_depth(void *ctx) 3197 { 3198 struct spdk_bdev *bdev = ctx; 3199 bdev->internal.temporary_queue_depth = 0; 3200 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 3201 _calculate_measured_qd_cpl); 3202 return SPDK_POLLER_BUSY; 3203 } 3204 3205 void 3206 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 3207 { 3208 bdev->internal.period = period; 3209 3210 if (bdev->internal.qd_poller != NULL) { 3211 spdk_poller_unregister(&bdev->internal.qd_poller); 3212 bdev->internal.measured_queue_depth = UINT64_MAX; 3213 } 3214 3215 if (period != 0) { 3216 bdev->internal.qd_poller = SPDK_POLLER_REGISTER(bdev_calculate_measured_queue_depth, bdev, 3217 period); 3218 } 3219 } 3220 3221 static void 3222 _resize_notify(void *arg) 3223 { 3224 struct spdk_bdev_desc *desc = arg; 3225 3226 pthread_mutex_lock(&desc->mutex); 3227 desc->refs--; 3228 if (!desc->closed) { 3229 pthread_mutex_unlock(&desc->mutex); 3230 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 3231 desc->bdev, 3232 desc->callback.ctx); 3233 return; 3234 } else if (0 == desc->refs) { 3235 /* This descriptor was closed after this resize_notify message was sent. 3236 * spdk_bdev_close() could not free the descriptor since this message was 3237 * in flight, so we free it now using bdev_desc_free(). 
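 * The refs counter coordinates this: spdk_bdev_notify_blockcnt_change() bumps desc->refs
 * before sending this message, and the decrement at the top of this function is what
 * lets the deferred free be detected here.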
3238 */ 3239 pthread_mutex_unlock(&desc->mutex); 3240 bdev_desc_free(desc); 3241 return; 3242 } 3243 pthread_mutex_unlock(&desc->mutex); 3244 } 3245 3246 int 3247 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 3248 { 3249 struct spdk_bdev_desc *desc; 3250 int ret; 3251 3252 pthread_mutex_lock(&bdev->internal.mutex); 3253 3254 /* bdev has open descriptors */ 3255 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 3256 bdev->blockcnt > size) { 3257 ret = -EBUSY; 3258 } else { 3259 bdev->blockcnt = size; 3260 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 3261 pthread_mutex_lock(&desc->mutex); 3262 if (desc->callback.open_with_ext && !desc->closed) { 3263 desc->refs++; 3264 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 3265 } 3266 pthread_mutex_unlock(&desc->mutex); 3267 } 3268 ret = 0; 3269 } 3270 3271 pthread_mutex_unlock(&bdev->internal.mutex); 3272 3273 return ret; 3274 } 3275 3276 /* 3277 * Convert I/O offset and length from bytes to blocks. 3278 * 3279 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 3280 */ 3281 static uint64_t 3282 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 3283 uint64_t num_bytes, uint64_t *num_blocks) 3284 { 3285 uint32_t block_size = bdev->blocklen; 3286 uint8_t shift_cnt; 3287 3288 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 3289 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 3290 shift_cnt = spdk_u32log2(block_size); 3291 *offset_blocks = offset_bytes >> shift_cnt; 3292 *num_blocks = num_bytes >> shift_cnt; 3293 return (offset_bytes - (*offset_blocks << shift_cnt)) | 3294 (num_bytes - (*num_blocks << shift_cnt)); 3295 } else { 3296 *offset_blocks = offset_bytes / block_size; 3297 *num_blocks = num_bytes / block_size; 3298 return (offset_bytes % block_size) | (num_bytes % block_size); 3299 } 3300 } 3301 3302 static bool 3303 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 3304 { 3305 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 3306 * has been an overflow and hence the offset has been wrapped around */ 3307 if (offset_blocks + num_blocks < offset_blocks) { 3308 return false; 3309 } 3310 3311 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 3312 if (offset_blocks + num_blocks > bdev->blockcnt) { 3313 return false; 3314 } 3315 3316 return true; 3317 } 3318 3319 static bool 3320 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 3321 { 3322 return _is_buf_allocated(iovs) == (md_buf != NULL); 3323 } 3324 3325 static int 3326 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 3327 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3328 spdk_bdev_io_completion_cb cb, void *cb_arg) 3329 { 3330 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3331 struct spdk_bdev_io *bdev_io; 3332 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3333 3334 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3335 return -EINVAL; 3336 } 3337 3338 bdev_io = bdev_channel_get_io(channel); 3339 if (!bdev_io) { 3340 return -ENOMEM; 3341 } 3342 3343 bdev_io->internal.ch = channel; 3344 bdev_io->internal.desc = desc; 3345 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3346 bdev_io->u.bdev.iovs = &bdev_io->iov; 3347 bdev_io->u.bdev.iovs[0].iov_base = buf; 3348 bdev_io->u.bdev.iovs[0].iov_len = num_blocks 
* bdev->blocklen; 3349 bdev_io->u.bdev.iovcnt = 1; 3350 bdev_io->u.bdev.md_buf = md_buf; 3351 bdev_io->u.bdev.num_blocks = num_blocks; 3352 bdev_io->u.bdev.offset_blocks = offset_blocks; 3353 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3354 3355 bdev_io_submit(bdev_io); 3356 return 0; 3357 } 3358 3359 int 3360 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3361 void *buf, uint64_t offset, uint64_t nbytes, 3362 spdk_bdev_io_completion_cb cb, void *cb_arg) 3363 { 3364 uint64_t offset_blocks, num_blocks; 3365 3366 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3367 nbytes, &num_blocks) != 0) { 3368 return -EINVAL; 3369 } 3370 3371 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3372 } 3373 3374 int 3375 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3376 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3377 spdk_bdev_io_completion_cb cb, void *cb_arg) 3378 { 3379 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 3380 } 3381 3382 int 3383 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3384 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3385 spdk_bdev_io_completion_cb cb, void *cb_arg) 3386 { 3387 struct iovec iov = { 3388 .iov_base = buf, 3389 }; 3390 3391 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3392 return -EINVAL; 3393 } 3394 3395 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3396 return -EINVAL; 3397 } 3398 3399 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3400 cb, cb_arg); 3401 } 3402 3403 int 3404 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3405 struct iovec *iov, int iovcnt, 3406 uint64_t offset, uint64_t nbytes, 3407 spdk_bdev_io_completion_cb cb, void *cb_arg) 3408 { 3409 uint64_t offset_blocks, num_blocks; 3410 3411 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3412 nbytes, &num_blocks) != 0) { 3413 return -EINVAL; 3414 } 3415 3416 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3417 } 3418 3419 static int 3420 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3421 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3422 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3423 { 3424 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3425 struct spdk_bdev_io *bdev_io; 3426 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3427 3428 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3429 return -EINVAL; 3430 } 3431 3432 bdev_io = bdev_channel_get_io(channel); 3433 if (!bdev_io) { 3434 return -ENOMEM; 3435 } 3436 3437 bdev_io->internal.ch = channel; 3438 bdev_io->internal.desc = desc; 3439 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3440 bdev_io->u.bdev.iovs = iov; 3441 bdev_io->u.bdev.iovcnt = iovcnt; 3442 bdev_io->u.bdev.md_buf = md_buf; 3443 bdev_io->u.bdev.num_blocks = num_blocks; 3444 bdev_io->u.bdev.offset_blocks = offset_blocks; 3445 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3446 3447 bdev_io_submit(bdev_io); 3448 return 0; 3449 } 3450 3451 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3452 struct iovec *iov, int iovcnt, 3453 uint64_t offset_blocks, uint64_t num_blocks, 3454 spdk_bdev_io_completion_cb cb, void *cb_arg) 3455 { 3456 return 
bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3457 num_blocks, cb, cb_arg); 3458 } 3459 3460 int 3461 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3462 struct iovec *iov, int iovcnt, void *md_buf, 3463 uint64_t offset_blocks, uint64_t num_blocks, 3464 spdk_bdev_io_completion_cb cb, void *cb_arg) 3465 { 3466 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3467 return -EINVAL; 3468 } 3469 3470 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3471 return -EINVAL; 3472 } 3473 3474 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3475 num_blocks, cb, cb_arg); 3476 } 3477 3478 static int 3479 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3480 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3481 spdk_bdev_io_completion_cb cb, void *cb_arg) 3482 { 3483 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3484 struct spdk_bdev_io *bdev_io; 3485 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3486 3487 if (!desc->write) { 3488 return -EBADF; 3489 } 3490 3491 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3492 return -EINVAL; 3493 } 3494 3495 bdev_io = bdev_channel_get_io(channel); 3496 if (!bdev_io) { 3497 return -ENOMEM; 3498 } 3499 3500 bdev_io->internal.ch = channel; 3501 bdev_io->internal.desc = desc; 3502 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3503 bdev_io->u.bdev.iovs = &bdev_io->iov; 3504 bdev_io->u.bdev.iovs[0].iov_base = buf; 3505 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3506 bdev_io->u.bdev.iovcnt = 1; 3507 bdev_io->u.bdev.md_buf = md_buf; 3508 bdev_io->u.bdev.num_blocks = num_blocks; 3509 bdev_io->u.bdev.offset_blocks = offset_blocks; 3510 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3511 3512 bdev_io_submit(bdev_io); 3513 return 0; 3514 } 3515 3516 int 3517 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3518 void *buf, uint64_t offset, uint64_t nbytes, 3519 spdk_bdev_io_completion_cb cb, void *cb_arg) 3520 { 3521 uint64_t offset_blocks, num_blocks; 3522 3523 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3524 nbytes, &num_blocks) != 0) { 3525 return -EINVAL; 3526 } 3527 3528 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3529 } 3530 3531 int 3532 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3533 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3534 spdk_bdev_io_completion_cb cb, void *cb_arg) 3535 { 3536 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3537 cb, cb_arg); 3538 } 3539 3540 int 3541 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3542 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3543 spdk_bdev_io_completion_cb cb, void *cb_arg) 3544 { 3545 struct iovec iov = { 3546 .iov_base = buf, 3547 }; 3548 3549 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3550 return -EINVAL; 3551 } 3552 3553 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3554 return -EINVAL; 3555 } 3556 3557 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3558 cb, cb_arg); 3559 } 3560 3561 static int 3562 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3563 struct iovec *iov, int iovcnt, void *md_buf, 3564 uint64_t offset_blocks, uint64_t num_blocks, 3565 spdk_bdev_io_completion_cb cb, 
void *cb_arg) 3566 { 3567 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3568 struct spdk_bdev_io *bdev_io; 3569 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3570 3571 if (!desc->write) { 3572 return -EBADF; 3573 } 3574 3575 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3576 return -EINVAL; 3577 } 3578 3579 bdev_io = bdev_channel_get_io(channel); 3580 if (!bdev_io) { 3581 return -ENOMEM; 3582 } 3583 3584 bdev_io->internal.ch = channel; 3585 bdev_io->internal.desc = desc; 3586 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3587 bdev_io->u.bdev.iovs = iov; 3588 bdev_io->u.bdev.iovcnt = iovcnt; 3589 bdev_io->u.bdev.md_buf = md_buf; 3590 bdev_io->u.bdev.num_blocks = num_blocks; 3591 bdev_io->u.bdev.offset_blocks = offset_blocks; 3592 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3593 3594 bdev_io_submit(bdev_io); 3595 return 0; 3596 } 3597 3598 int 3599 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3600 struct iovec *iov, int iovcnt, 3601 uint64_t offset, uint64_t len, 3602 spdk_bdev_io_completion_cb cb, void *cb_arg) 3603 { 3604 uint64_t offset_blocks, num_blocks; 3605 3606 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3607 len, &num_blocks) != 0) { 3608 return -EINVAL; 3609 } 3610 3611 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3612 } 3613 3614 int 3615 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3616 struct iovec *iov, int iovcnt, 3617 uint64_t offset_blocks, uint64_t num_blocks, 3618 spdk_bdev_io_completion_cb cb, void *cb_arg) 3619 { 3620 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3621 num_blocks, cb, cb_arg); 3622 } 3623 3624 int 3625 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3626 struct iovec *iov, int iovcnt, void *md_buf, 3627 uint64_t offset_blocks, uint64_t num_blocks, 3628 spdk_bdev_io_completion_cb cb, void *cb_arg) 3629 { 3630 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3631 return -EINVAL; 3632 } 3633 3634 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3635 return -EINVAL; 3636 } 3637 3638 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3639 num_blocks, cb, cb_arg); 3640 } 3641 3642 static void 3643 bdev_compare_do_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3644 { 3645 struct spdk_bdev_io *parent_io = cb_arg; 3646 uint8_t *read_buf = bdev_io->u.bdev.iovs[0].iov_base; 3647 int i, rc = 0; 3648 3649 if (!success) { 3650 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3651 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3652 spdk_bdev_free_io(bdev_io); 3653 return; 3654 } 3655 3656 for (i = 0; i < parent_io->u.bdev.iovcnt; i++) { 3657 rc = memcmp(read_buf, 3658 parent_io->u.bdev.iovs[i].iov_base, 3659 parent_io->u.bdev.iovs[i].iov_len); 3660 if (rc) { 3661 break; 3662 } 3663 read_buf += parent_io->u.bdev.iovs[i].iov_len; 3664 } 3665 3666 spdk_bdev_free_io(bdev_io); 3667 3668 if (rc == 0) { 3669 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3670 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3671 } else { 3672 parent_io->internal.status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 3673 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3674 } 3675 } 3676 3677 static void 3678 bdev_compare_do_read(void *_bdev_io) 3679 { 3680 struct spdk_bdev_io *bdev_io = _bdev_io; 
3681 int rc; 3682 3683 rc = spdk_bdev_read_blocks(bdev_io->internal.desc, 3684 spdk_io_channel_from_ctx(bdev_io->internal.ch), NULL, 3685 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3686 bdev_compare_do_read_done, bdev_io); 3687 3688 if (rc == -ENOMEM) { 3689 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_do_read); 3690 } else if (rc != 0) { 3691 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3692 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3693 } 3694 } 3695 3696 static int 3697 bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3698 struct iovec *iov, int iovcnt, void *md_buf, 3699 uint64_t offset_blocks, uint64_t num_blocks, 3700 spdk_bdev_io_completion_cb cb, void *cb_arg) 3701 { 3702 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3703 struct spdk_bdev_io *bdev_io; 3704 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3705 3706 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3707 return -EINVAL; 3708 } 3709 3710 bdev_io = bdev_channel_get_io(channel); 3711 if (!bdev_io) { 3712 return -ENOMEM; 3713 } 3714 3715 bdev_io->internal.ch = channel; 3716 bdev_io->internal.desc = desc; 3717 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3718 bdev_io->u.bdev.iovs = iov; 3719 bdev_io->u.bdev.iovcnt = iovcnt; 3720 bdev_io->u.bdev.md_buf = md_buf; 3721 bdev_io->u.bdev.num_blocks = num_blocks; 3722 bdev_io->u.bdev.offset_blocks = offset_blocks; 3723 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3724 3725 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3726 bdev_io_submit(bdev_io); 3727 return 0; 3728 } 3729 3730 bdev_compare_do_read(bdev_io); 3731 3732 return 0; 3733 } 3734 3735 int 3736 spdk_bdev_comparev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3737 struct iovec *iov, int iovcnt, 3738 uint64_t offset_blocks, uint64_t num_blocks, 3739 spdk_bdev_io_completion_cb cb, void *cb_arg) 3740 { 3741 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3742 num_blocks, cb, cb_arg); 3743 } 3744 3745 int 3746 spdk_bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3747 struct iovec *iov, int iovcnt, void *md_buf, 3748 uint64_t offset_blocks, uint64_t num_blocks, 3749 spdk_bdev_io_completion_cb cb, void *cb_arg) 3750 { 3751 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3752 return -EINVAL; 3753 } 3754 3755 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3756 return -EINVAL; 3757 } 3758 3759 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3760 num_blocks, cb, cb_arg); 3761 } 3762 3763 static int 3764 bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3765 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3766 spdk_bdev_io_completion_cb cb, void *cb_arg) 3767 { 3768 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3769 struct spdk_bdev_io *bdev_io; 3770 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3771 3772 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3773 return -EINVAL; 3774 } 3775 3776 bdev_io = bdev_channel_get_io(channel); 3777 if (!bdev_io) { 3778 return -ENOMEM; 3779 } 3780 3781 bdev_io->internal.ch = channel; 3782 bdev_io->internal.desc = desc; 3783 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3784 bdev_io->u.bdev.iovs = &bdev_io->iov; 3785 bdev_io->u.bdev.iovs[0].iov_base = buf; 3786 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3787 
bdev_io->u.bdev.iovcnt = 1; 3788 bdev_io->u.bdev.md_buf = md_buf; 3789 bdev_io->u.bdev.num_blocks = num_blocks; 3790 bdev_io->u.bdev.offset_blocks = offset_blocks; 3791 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3792 3793 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3794 bdev_io_submit(bdev_io); 3795 return 0; 3796 } 3797 3798 bdev_compare_do_read(bdev_io); 3799 3800 return 0; 3801 } 3802 3803 int 3804 spdk_bdev_compare_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3805 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3806 spdk_bdev_io_completion_cb cb, void *cb_arg) 3807 { 3808 return bdev_compare_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3809 cb, cb_arg); 3810 } 3811 3812 int 3813 spdk_bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3814 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3815 spdk_bdev_io_completion_cb cb, void *cb_arg) 3816 { 3817 struct iovec iov = { 3818 .iov_base = buf, 3819 }; 3820 3821 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3822 return -EINVAL; 3823 } 3824 3825 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3826 return -EINVAL; 3827 } 3828 3829 return bdev_compare_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3830 cb, cb_arg); 3831 } 3832 3833 static void 3834 bdev_comparev_and_writev_blocks_unlocked(void *ctx, int unlock_status) 3835 { 3836 struct spdk_bdev_io *bdev_io = ctx; 3837 3838 if (unlock_status) { 3839 SPDK_ERRLOG("LBA range unlock failed\n"); 3840 } 3841 3842 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS ? true : 3843 false, bdev_io->internal.caller_ctx); 3844 } 3845 3846 static void 3847 bdev_comparev_and_writev_blocks_unlock(struct spdk_bdev_io *bdev_io, int status) 3848 { 3849 bdev_io->internal.status = status; 3850 3851 bdev_unlock_lba_range(bdev_io->internal.desc, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3852 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3853 bdev_comparev_and_writev_blocks_unlocked, bdev_io); 3854 } 3855 3856 static void 3857 bdev_compare_and_write_do_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3858 { 3859 struct spdk_bdev_io *parent_io = cb_arg; 3860 3861 if (!success) { 3862 SPDK_ERRLOG("Compare and write operation failed\n"); 3863 } 3864 3865 spdk_bdev_free_io(bdev_io); 3866 3867 bdev_comparev_and_writev_blocks_unlock(parent_io, 3868 success ? 
SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED); 3869 } 3870 3871 static void 3872 bdev_compare_and_write_do_write(void *_bdev_io) 3873 { 3874 struct spdk_bdev_io *bdev_io = _bdev_io; 3875 int rc; 3876 3877 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 3878 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3879 bdev_io->u.bdev.fused_iovs, bdev_io->u.bdev.fused_iovcnt, 3880 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3881 bdev_compare_and_write_do_write_done, bdev_io); 3882 3883 3884 if (rc == -ENOMEM) { 3885 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_write); 3886 } else if (rc != 0) { 3887 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 3888 } 3889 } 3890 3891 static void 3892 bdev_compare_and_write_do_compare_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3893 { 3894 struct spdk_bdev_io *parent_io = cb_arg; 3895 3896 spdk_bdev_free_io(bdev_io); 3897 3898 if (!success) { 3899 bdev_comparev_and_writev_blocks_unlock(parent_io, SPDK_BDEV_IO_STATUS_MISCOMPARE); 3900 return; 3901 } 3902 3903 bdev_compare_and_write_do_write(parent_io); 3904 } 3905 3906 static void 3907 bdev_compare_and_write_do_compare(void *_bdev_io) 3908 { 3909 struct spdk_bdev_io *bdev_io = _bdev_io; 3910 int rc; 3911 3912 rc = spdk_bdev_comparev_blocks(bdev_io->internal.desc, 3913 spdk_io_channel_from_ctx(bdev_io->internal.ch), bdev_io->u.bdev.iovs, 3914 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3915 bdev_compare_and_write_do_compare_done, bdev_io); 3916 3917 if (rc == -ENOMEM) { 3918 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_compare); 3919 } else if (rc != 0) { 3920 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED); 3921 } 3922 } 3923 3924 static void 3925 bdev_comparev_and_writev_blocks_locked(void *ctx, int status) 3926 { 3927 struct spdk_bdev_io *bdev_io = ctx; 3928 3929 if (status) { 3930 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED; 3931 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3932 return; 3933 } 3934 3935 bdev_compare_and_write_do_compare(bdev_io); 3936 } 3937 3938 int 3939 spdk_bdev_comparev_and_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3940 struct iovec *compare_iov, int compare_iovcnt, 3941 struct iovec *write_iov, int write_iovcnt, 3942 uint64_t offset_blocks, uint64_t num_blocks, 3943 spdk_bdev_io_completion_cb cb, void *cb_arg) 3944 { 3945 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3946 struct spdk_bdev_io *bdev_io; 3947 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3948 3949 if (!desc->write) { 3950 return -EBADF; 3951 } 3952 3953 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3954 return -EINVAL; 3955 } 3956 3957 if (num_blocks > bdev->acwu) { 3958 return -EINVAL; 3959 } 3960 3961 bdev_io = bdev_channel_get_io(channel); 3962 if (!bdev_io) { 3963 return -ENOMEM; 3964 } 3965 3966 bdev_io->internal.ch = channel; 3967 bdev_io->internal.desc = desc; 3968 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE; 3969 bdev_io->u.bdev.iovs = compare_iov; 3970 bdev_io->u.bdev.iovcnt = compare_iovcnt; 3971 bdev_io->u.bdev.fused_iovs = write_iov; 3972 bdev_io->u.bdev.fused_iovcnt = write_iovcnt; 3973 bdev_io->u.bdev.md_buf = NULL; 3974 bdev_io->u.bdev.num_blocks = num_blocks; 3975 bdev_io->u.bdev.offset_blocks = offset_blocks; 3976 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3977 3978 if (bdev_io_type_supported(bdev, 
SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE)) { 3979 bdev_io_submit(bdev_io); 3980 return 0; 3981 } 3982 3983 return bdev_lock_lba_range(desc, ch, offset_blocks, num_blocks, 3984 bdev_comparev_and_writev_blocks_locked, bdev_io); 3985 } 3986 3987 static void 3988 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 3989 { 3990 if (!success) { 3991 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3992 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 3993 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3994 return; 3995 } 3996 3997 if (bdev_io->u.bdev.zcopy.populate) { 3998 /* Read the real data into the buffer */ 3999 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 4000 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4001 bdev_io_submit(bdev_io); 4002 return; 4003 } 4004 4005 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4006 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4007 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 4008 } 4009 4010 int 4011 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4012 uint64_t offset_blocks, uint64_t num_blocks, 4013 bool populate, 4014 spdk_bdev_io_completion_cb cb, void *cb_arg) 4015 { 4016 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4017 struct spdk_bdev_io *bdev_io; 4018 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4019 4020 if (!desc->write) { 4021 return -EBADF; 4022 } 4023 4024 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4025 return -EINVAL; 4026 } 4027 4028 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4029 return -ENOTSUP; 4030 } 4031 4032 bdev_io = bdev_channel_get_io(channel); 4033 if (!bdev_io) { 4034 return -ENOMEM; 4035 } 4036 4037 bdev_io->internal.ch = channel; 4038 bdev_io->internal.desc = desc; 4039 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4040 bdev_io->u.bdev.num_blocks = num_blocks; 4041 bdev_io->u.bdev.offset_blocks = offset_blocks; 4042 bdev_io->u.bdev.iovs = NULL; 4043 bdev_io->u.bdev.iovcnt = 0; 4044 bdev_io->u.bdev.md_buf = NULL; 4045 bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0; 4046 bdev_io->u.bdev.zcopy.commit = 0; 4047 bdev_io->u.bdev.zcopy.start = 1; 4048 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4049 4050 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4051 bdev_io_submit(bdev_io); 4052 } else { 4053 /* Emulate zcopy by allocating a buffer */ 4054 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 4055 bdev_io->u.bdev.num_blocks * bdev->blocklen); 4056 } 4057 4058 return 0; 4059 } 4060 4061 int 4062 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 4063 spdk_bdev_io_completion_cb cb, void *cb_arg) 4064 { 4065 struct spdk_bdev *bdev = bdev_io->bdev; 4066 4067 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 4068 /* This can happen if the zcopy was emulated in start */ 4069 if (bdev_io->u.bdev.zcopy.start != 1) { 4070 return -EINVAL; 4071 } 4072 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4073 } 4074 4075 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 4076 return -EINVAL; 4077 } 4078 4079 bdev_io->u.bdev.zcopy.commit = commit ? 
1 : 0; 4080 bdev_io->u.bdev.zcopy.start = 0; 4081 bdev_io->internal.caller_ctx = cb_arg; 4082 bdev_io->internal.cb = cb; 4083 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4084 4085 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4086 bdev_io_submit(bdev_io); 4087 return 0; 4088 } 4089 4090 if (!bdev_io->u.bdev.zcopy.commit) { 4091 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4092 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4093 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 4094 return 0; 4095 } 4096 4097 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 4098 bdev_io_submit(bdev_io); 4099 4100 return 0; 4101 } 4102 4103 int 4104 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4105 uint64_t offset, uint64_t len, 4106 spdk_bdev_io_completion_cb cb, void *cb_arg) 4107 { 4108 uint64_t offset_blocks, num_blocks; 4109 4110 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4111 len, &num_blocks) != 0) { 4112 return -EINVAL; 4113 } 4114 4115 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4116 } 4117 4118 int 4119 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4120 uint64_t offset_blocks, uint64_t num_blocks, 4121 spdk_bdev_io_completion_cb cb, void *cb_arg) 4122 { 4123 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4124 struct spdk_bdev_io *bdev_io; 4125 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4126 4127 if (!desc->write) { 4128 return -EBADF; 4129 } 4130 4131 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4132 return -EINVAL; 4133 } 4134 4135 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 4136 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 4137 return -ENOTSUP; 4138 } 4139 4140 bdev_io = bdev_channel_get_io(channel); 4141 4142 if (!bdev_io) { 4143 return -ENOMEM; 4144 } 4145 4146 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 4147 bdev_io->internal.ch = channel; 4148 bdev_io->internal.desc = desc; 4149 bdev_io->u.bdev.offset_blocks = offset_blocks; 4150 bdev_io->u.bdev.num_blocks = num_blocks; 4151 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4152 4153 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 4154 bdev_io_submit(bdev_io); 4155 return 0; 4156 } 4157 4158 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 4159 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 4160 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 4161 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 4162 bdev_write_zero_buffer_next(bdev_io); 4163 4164 return 0; 4165 } 4166 4167 int 4168 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4169 uint64_t offset, uint64_t nbytes, 4170 spdk_bdev_io_completion_cb cb, void *cb_arg) 4171 { 4172 uint64_t offset_blocks, num_blocks; 4173 4174 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4175 nbytes, &num_blocks) != 0) { 4176 return -EINVAL; 4177 } 4178 4179 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4180 } 4181 4182 int 4183 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4184 uint64_t offset_blocks, uint64_t num_blocks, 4185 spdk_bdev_io_completion_cb cb, void *cb_arg) 4186 { 4187 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4188 struct spdk_bdev_io *bdev_io; 4189 struct 
spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4190 4191 if (!desc->write) { 4192 return -EBADF; 4193 } 4194 4195 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4196 return -EINVAL; 4197 } 4198 4199 if (num_blocks == 0) { 4200 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 4201 return -EINVAL; 4202 } 4203 4204 bdev_io = bdev_channel_get_io(channel); 4205 if (!bdev_io) { 4206 return -ENOMEM; 4207 } 4208 4209 bdev_io->internal.ch = channel; 4210 bdev_io->internal.desc = desc; 4211 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 4212 4213 bdev_io->u.bdev.iovs = &bdev_io->iov; 4214 bdev_io->u.bdev.iovs[0].iov_base = NULL; 4215 bdev_io->u.bdev.iovs[0].iov_len = 0; 4216 bdev_io->u.bdev.iovcnt = 1; 4217 4218 bdev_io->u.bdev.offset_blocks = offset_blocks; 4219 bdev_io->u.bdev.num_blocks = num_blocks; 4220 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4221 4222 bdev_io_submit(bdev_io); 4223 return 0; 4224 } 4225 4226 int 4227 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4228 uint64_t offset, uint64_t length, 4229 spdk_bdev_io_completion_cb cb, void *cb_arg) 4230 { 4231 uint64_t offset_blocks, num_blocks; 4232 4233 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4234 length, &num_blocks) != 0) { 4235 return -EINVAL; 4236 } 4237 4238 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4239 } 4240 4241 int 4242 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4243 uint64_t offset_blocks, uint64_t num_blocks, 4244 spdk_bdev_io_completion_cb cb, void *cb_arg) 4245 { 4246 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4247 struct spdk_bdev_io *bdev_io; 4248 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4249 4250 if (!desc->write) { 4251 return -EBADF; 4252 } 4253 4254 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4255 return -EINVAL; 4256 } 4257 4258 bdev_io = bdev_channel_get_io(channel); 4259 if (!bdev_io) { 4260 return -ENOMEM; 4261 } 4262 4263 bdev_io->internal.ch = channel; 4264 bdev_io->internal.desc = desc; 4265 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 4266 bdev_io->u.bdev.iovs = NULL; 4267 bdev_io->u.bdev.iovcnt = 0; 4268 bdev_io->u.bdev.offset_blocks = offset_blocks; 4269 bdev_io->u.bdev.num_blocks = num_blocks; 4270 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4271 4272 bdev_io_submit(bdev_io); 4273 return 0; 4274 } 4275 4276 static void 4277 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 4278 { 4279 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 4280 struct spdk_bdev_io *bdev_io; 4281 4282 bdev_io = TAILQ_FIRST(&ch->queued_resets); 4283 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 4284 bdev_io_submit_reset(bdev_io); 4285 } 4286 4287 static void 4288 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 4289 { 4290 struct spdk_io_channel *ch; 4291 struct spdk_bdev_channel *channel; 4292 struct spdk_bdev_mgmt_channel *mgmt_channel; 4293 struct spdk_bdev_shared_resource *shared_resource; 4294 bdev_io_tailq_t tmp_queued; 4295 4296 TAILQ_INIT(&tmp_queued); 4297 4298 ch = spdk_io_channel_iter_get_channel(i); 4299 channel = spdk_io_channel_get_ctx(ch); 4300 shared_resource = channel->shared_resource; 4301 mgmt_channel = shared_resource->mgmt_ch; 4302 4303 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 4304 4305 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 4306 /* The QoS object is always valid and readable while 4307 * the channel flag is set, so the lock here should not 4308 * be 
necessary. We're not in the fast path though, so 4309 * just take it anyway. */ 4310 pthread_mutex_lock(&channel->bdev->internal.mutex); 4311 if (channel->bdev->internal.qos->ch == channel) { 4312 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 4313 } 4314 pthread_mutex_unlock(&channel->bdev->internal.mutex); 4315 } 4316 4317 bdev_abort_all_queued_io(&shared_resource->nomem_io, channel); 4318 bdev_abort_all_buf_io(&mgmt_channel->need_buf_small, channel); 4319 bdev_abort_all_buf_io(&mgmt_channel->need_buf_large, channel); 4320 bdev_abort_all_queued_io(&tmp_queued, channel); 4321 4322 spdk_for_each_channel_continue(i, 0); 4323 } 4324 4325 static void 4326 bdev_start_reset(void *ctx) 4327 { 4328 struct spdk_bdev_channel *ch = ctx; 4329 4330 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 4331 ch, bdev_reset_dev); 4332 } 4333 4334 static void 4335 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 4336 { 4337 struct spdk_bdev *bdev = ch->bdev; 4338 4339 assert(!TAILQ_EMPTY(&ch->queued_resets)); 4340 4341 pthread_mutex_lock(&bdev->internal.mutex); 4342 if (bdev->internal.reset_in_progress == NULL) { 4343 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 4344 /* 4345 * Take a channel reference for the target bdev for the life of this 4346 * reset. This guards against the channel getting destroyed while 4347 * spdk_for_each_channel() calls related to this reset IO are in 4348 * progress. We will release the reference when this reset is 4349 * completed. 4350 */ 4351 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 4352 bdev_start_reset(ch); 4353 } 4354 pthread_mutex_unlock(&bdev->internal.mutex); 4355 } 4356 4357 int 4358 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4359 spdk_bdev_io_completion_cb cb, void *cb_arg) 4360 { 4361 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4362 struct spdk_bdev_io *bdev_io; 4363 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4364 4365 bdev_io = bdev_channel_get_io(channel); 4366 if (!bdev_io) { 4367 return -ENOMEM; 4368 } 4369 4370 bdev_io->internal.ch = channel; 4371 bdev_io->internal.desc = desc; 4372 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4373 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 4374 bdev_io->u.reset.ch_ref = NULL; 4375 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4376 4377 pthread_mutex_lock(&bdev->internal.mutex); 4378 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 4379 pthread_mutex_unlock(&bdev->internal.mutex); 4380 4381 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 4382 internal.ch_link); 4383 4384 bdev_channel_start_reset(channel); 4385 4386 return 0; 4387 } 4388 4389 void 4390 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4391 struct spdk_bdev_io_stat *stat) 4392 { 4393 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4394 4395 *stat = channel->stat; 4396 } 4397 4398 static void 4399 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 4400 { 4401 void *io_device = spdk_io_channel_iter_get_io_device(i); 4402 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4403 4404 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 4405 bdev_iostat_ctx->cb_arg, 0); 4406 free(bdev_iostat_ctx); 4407 } 4408 4409 static void 4410 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 4411 { 4412 struct 
spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4413 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4414 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4415 4416 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 4417 spdk_for_each_channel_continue(i, 0); 4418 } 4419 4420 void 4421 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 4422 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 4423 { 4424 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 4425 4426 assert(bdev != NULL); 4427 assert(stat != NULL); 4428 assert(cb != NULL); 4429 4430 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 4431 if (bdev_iostat_ctx == NULL) { 4432 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 4433 cb(bdev, stat, cb_arg, -ENOMEM); 4434 return; 4435 } 4436 4437 bdev_iostat_ctx->stat = stat; 4438 bdev_iostat_ctx->cb = cb; 4439 bdev_iostat_ctx->cb_arg = cb_arg; 4440 4441 /* Start with the statistics from previously deleted channels. */ 4442 pthread_mutex_lock(&bdev->internal.mutex); 4443 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 4444 pthread_mutex_unlock(&bdev->internal.mutex); 4445 4446 /* Then iterate and add the statistics from each existing channel. */ 4447 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4448 bdev_get_each_channel_stat, 4449 bdev_iostat_ctx, 4450 bdev_get_device_stat_done); 4451 } 4452 4453 int 4454 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4455 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4456 spdk_bdev_io_completion_cb cb, void *cb_arg) 4457 { 4458 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4459 struct spdk_bdev_io *bdev_io; 4460 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4461 4462 if (!desc->write) { 4463 return -EBADF; 4464 } 4465 4466 bdev_io = bdev_channel_get_io(channel); 4467 if (!bdev_io) { 4468 return -ENOMEM; 4469 } 4470 4471 bdev_io->internal.ch = channel; 4472 bdev_io->internal.desc = desc; 4473 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 4474 bdev_io->u.nvme_passthru.cmd = *cmd; 4475 bdev_io->u.nvme_passthru.buf = buf; 4476 bdev_io->u.nvme_passthru.nbytes = nbytes; 4477 bdev_io->u.nvme_passthru.md_buf = NULL; 4478 bdev_io->u.nvme_passthru.md_len = 0; 4479 4480 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4481 4482 bdev_io_submit(bdev_io); 4483 return 0; 4484 } 4485 4486 int 4487 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4488 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4489 spdk_bdev_io_completion_cb cb, void *cb_arg) 4490 { 4491 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4492 struct spdk_bdev_io *bdev_io; 4493 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4494 4495 if (!desc->write) { 4496 /* 4497 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4498 * to easily determine if the command is a read or write, but for now just 4499 * do not allow io_passthru with a read-only descriptor. 
4500 */ 4501 return -EBADF; 4502 } 4503 4504 bdev_io = bdev_channel_get_io(channel); 4505 if (!bdev_io) { 4506 return -ENOMEM; 4507 } 4508 4509 bdev_io->internal.ch = channel; 4510 bdev_io->internal.desc = desc; 4511 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 4512 bdev_io->u.nvme_passthru.cmd = *cmd; 4513 bdev_io->u.nvme_passthru.buf = buf; 4514 bdev_io->u.nvme_passthru.nbytes = nbytes; 4515 bdev_io->u.nvme_passthru.md_buf = NULL; 4516 bdev_io->u.nvme_passthru.md_len = 0; 4517 4518 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4519 4520 bdev_io_submit(bdev_io); 4521 return 0; 4522 } 4523 4524 int 4525 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4526 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 4527 spdk_bdev_io_completion_cb cb, void *cb_arg) 4528 { 4529 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4530 struct spdk_bdev_io *bdev_io; 4531 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4532 4533 if (!desc->write) { 4534 /* 4535 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4536 * to easily determine if the command is a read or write, but for now just 4537 * do not allow io_passthru with a read-only descriptor. 4538 */ 4539 return -EBADF; 4540 } 4541 4542 bdev_io = bdev_channel_get_io(channel); 4543 if (!bdev_io) { 4544 return -ENOMEM; 4545 } 4546 4547 bdev_io->internal.ch = channel; 4548 bdev_io->internal.desc = desc; 4549 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 4550 bdev_io->u.nvme_passthru.cmd = *cmd; 4551 bdev_io->u.nvme_passthru.buf = buf; 4552 bdev_io->u.nvme_passthru.nbytes = nbytes; 4553 bdev_io->u.nvme_passthru.md_buf = md_buf; 4554 bdev_io->u.nvme_passthru.md_len = md_len; 4555 4556 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4557 4558 bdev_io_submit(bdev_io); 4559 return 0; 4560 } 4561 4562 static void bdev_abort_retry(void *ctx); 4563 static void bdev_abort(struct spdk_bdev_io *parent_io); 4564 4565 static void 4566 bdev_abort_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4567 { 4568 struct spdk_bdev_channel *channel = bdev_io->internal.ch; 4569 struct spdk_bdev_io *parent_io = cb_arg; 4570 struct spdk_bdev_io *bio_to_abort, *tmp_io; 4571 4572 bio_to_abort = bdev_io->u.abort.bio_to_abort; 4573 4574 spdk_bdev_free_io(bdev_io); 4575 4576 if (!success) { 4577 /* Check if the target I/O completed in the meantime. */ 4578 TAILQ_FOREACH(tmp_io, &channel->io_submitted, internal.ch_link) { 4579 if (tmp_io == bio_to_abort) { 4580 break; 4581 } 4582 } 4583 4584 /* If the target I/O still exists, set the parent to failed. */ 4585 if (tmp_io != NULL) { 4586 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4587 } 4588 } 4589 4590 parent_io->u.bdev.split_outstanding--; 4591 if (parent_io->u.bdev.split_outstanding == 0) { 4592 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4593 bdev_abort_retry(parent_io); 4594 } else { 4595 bdev_io_complete(parent_io); 4596 } 4597 } 4598 } 4599 4600 static int 4601 bdev_abort_io(struct spdk_bdev_desc *desc, struct spdk_bdev_channel *channel, 4602 struct spdk_bdev_io *bio_to_abort, 4603 spdk_bdev_io_completion_cb cb, void *cb_arg) 4604 { 4605 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4606 struct spdk_bdev_io *bdev_io; 4607 4608 if (bio_to_abort->type == SPDK_BDEV_IO_TYPE_ABORT || 4609 bio_to_abort->type == SPDK_BDEV_IO_TYPE_RESET) { 4610 /* TODO: Abort reset or abort request. 
*/ 4611 return -ENOTSUP; 4612 } 4613 4614 bdev_io = bdev_channel_get_io(channel); 4615 if (bdev_io == NULL) { 4616 return -ENOMEM; 4617 } 4618 4619 bdev_io->internal.ch = channel; 4620 bdev_io->internal.desc = desc; 4621 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4622 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4623 4624 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bio_to_abort)) { 4625 bdev_io->u.bdev.abort.bio_cb_arg = bio_to_abort; 4626 4627 /* Parent abort request is not submitted directly, but to manage its 4628 * execution, add it to the submitted list here. 4629 */ 4630 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4631 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4632 4633 bdev_abort(bdev_io); 4634 4635 return 0; 4636 } 4637 4638 bdev_io->u.abort.bio_to_abort = bio_to_abort; 4639 4640 /* Submit the abort request to the underlying bdev module. */ 4641 bdev_io_submit(bdev_io); 4642 4643 return 0; 4644 } 4645 4646 static uint32_t 4647 _bdev_abort(struct spdk_bdev_io *parent_io) 4648 { 4649 struct spdk_bdev_desc *desc = parent_io->internal.desc; 4650 struct spdk_bdev_channel *channel = parent_io->internal.ch; 4651 void *bio_cb_arg; 4652 struct spdk_bdev_io *bio_to_abort; 4653 uint32_t matched_ios; 4654 int rc; 4655 4656 bio_cb_arg = parent_io->u.bdev.abort.bio_cb_arg; 4657 4658 /* matched_ios is returned and will be kept by the caller. 4659 * 4660 * This function is used for two cases: 1) the same cb_arg is used for 4661 * multiple I/Os, and 2) a single large I/O is split into smaller ones. 4662 * Incrementing split_outstanding directly here may confuse readers, especially 4663 * for the 1st case. 4664 * 4665 * Completion of I/O abort is processed after stack unwinding. Hence this trick 4666 * works as expected. 4667 */ 4668 matched_ios = 0; 4669 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4670 4671 TAILQ_FOREACH(bio_to_abort, &channel->io_submitted, internal.ch_link) { 4672 if (bio_to_abort->internal.caller_ctx != bio_cb_arg) { 4673 continue; 4674 } 4675 4676 if (bio_to_abort->internal.submit_tsc > parent_io->internal.submit_tsc) { 4677 /* Any I/O which was submitted after this abort command should be excluded. */ 4678 continue; 4679 } 4680 4681 rc = bdev_abort_io(desc, channel, bio_to_abort, bdev_abort_io_done, parent_io); 4682 if (rc != 0) { 4683 if (rc == -ENOMEM) { 4684 parent_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4685 } else { 4686 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4687 } 4688 break; 4689 } 4690 matched_ios++; 4691 } 4692 4693 return matched_ios; 4694 } 4695 4696 static void 4697 bdev_abort_retry(void *ctx) 4698 { 4699 struct spdk_bdev_io *parent_io = ctx; 4700 uint32_t matched_ios; 4701 4702 matched_ios = _bdev_abort(parent_io); 4703 4704 if (matched_ios == 0) { 4705 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4706 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4707 } else { 4708 /* For retry, the case where no target I/O was found is a success 4709 * because it means target I/Os completed in the meantime. 4710 */ 4711 bdev_io_complete(parent_io); 4712 } 4713 return; 4714 } 4715 4716 /* Use split_outstanding to manage the progress of aborting I/Os.
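* Each individual abort completes in bdev_abort_io_done(); if any of them was rejected with -ENOMEM, the whole set is retried via bdev_abort_retry() once the already-issued aborts have finished.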
*/ 4717 parent_io->u.bdev.split_outstanding = matched_ios; 4718 } 4719 4720 static void 4721 bdev_abort(struct spdk_bdev_io *parent_io) 4722 { 4723 uint32_t matched_ios; 4724 4725 matched_ios = _bdev_abort(parent_io); 4726 4727 if (matched_ios == 0) { 4728 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4729 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4730 } else { 4731 /* The case where no target I/O was found is a failure. */ 4732 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4733 bdev_io_complete(parent_io); 4734 } 4735 return; 4736 } 4737 4738 /* Use split_outstanding to manage the progress of aborting I/Os. */ 4739 parent_io->u.bdev.split_outstanding = matched_ios; 4740 } 4741 4742 int 4743 spdk_bdev_abort(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4744 void *bio_cb_arg, 4745 spdk_bdev_io_completion_cb cb, void *cb_arg) 4746 { 4747 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4748 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4749 struct spdk_bdev_io *bdev_io; 4750 4751 if (bio_cb_arg == NULL) { 4752 return -EINVAL; 4753 } 4754 4755 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ABORT)) { 4756 return -ENOTSUP; 4757 } 4758 4759 bdev_io = bdev_channel_get_io(channel); 4760 if (bdev_io == NULL) { 4761 return -ENOMEM; 4762 } 4763 4764 bdev_io->internal.ch = channel; 4765 bdev_io->internal.desc = desc; 4766 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4767 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4768 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4769 4770 bdev_io->u.bdev.abort.bio_cb_arg = bio_cb_arg; 4771 4772 /* Parent abort request is not submitted directly, but to manage its execution, 4773 * add it to the submitted list here. 4774 */ 4775 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4776 4777 bdev_abort(bdev_io); 4778 4779 return 0; 4780 } 4781 4782 int 4783 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4784 struct spdk_bdev_io_wait_entry *entry) 4785 { 4786 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4787 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 4788 4789 if (bdev != entry->bdev) { 4790 SPDK_ERRLOG("bdevs do not match\n"); 4791 return -EINVAL; 4792 } 4793 4794 if (mgmt_ch->per_thread_cache_count > 0) { 4795 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 4796 return -EINVAL; 4797 } 4798 4799 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 4800 return 0; 4801 } 4802 4803 static void 4804 bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 4805 { 4806 struct spdk_bdev *bdev = bdev_ch->bdev; 4807 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4808 struct spdk_bdev_io *bdev_io; 4809 4810 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 4811 /* 4812 * Allow some more I/O to complete before retrying the nomem_io queue. 4813 * Some drivers (such as nvme) cannot immediately take a new I/O in 4814 * the context of a completion, because the resources for the I/O are 4815 * not released until control returns to the bdev poller. Also, we 4816 * may require several small I/O to complete before a larger I/O 4817 * (that requires splitting) can be submitted.
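* The nomem_threshold checked above is recomputed in spdk_bdev_io_complete() each time an I/O completes with SPDK_BDEV_IO_STATUS_NOMEM and gets re-queued.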
4818 */ 4819 return; 4820 } 4821 4822 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 4823 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 4824 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 4825 bdev_io->internal.ch->io_outstanding++; 4826 shared_resource->io_outstanding++; 4827 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4828 bdev_io->internal.error.nvme.cdw0 = 0; 4829 bdev_io->num_retries++; 4830 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 4831 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4832 break; 4833 } 4834 } 4835 } 4836 4837 static inline void 4838 bdev_io_complete(void *ctx) 4839 { 4840 struct spdk_bdev_io *bdev_io = ctx; 4841 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4842 uint64_t tsc, tsc_diff; 4843 4844 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 4845 /* 4846 * Send the completion to the thread that originally submitted the I/O, 4847 * which may not be the current thread in the case of QoS. 4848 */ 4849 if (bdev_io->internal.io_submit_ch) { 4850 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4851 bdev_io->internal.io_submit_ch = NULL; 4852 } 4853 4854 /* 4855 * Defer completion to avoid potential infinite recursion if the 4856 * user's completion callback issues a new I/O. 4857 */ 4858 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4859 bdev_io_complete, bdev_io); 4860 return; 4861 } 4862 4863 tsc = spdk_get_ticks(); 4864 tsc_diff = tsc - bdev_io->internal.submit_tsc; 4865 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 4866 4867 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link); 4868 4869 if (bdev_io->internal.ch->histogram) { 4870 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 4871 } 4872 4873 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4874 switch (bdev_io->type) { 4875 case SPDK_BDEV_IO_TYPE_READ: 4876 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4877 bdev_io->internal.ch->stat.num_read_ops++; 4878 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4879 break; 4880 case SPDK_BDEV_IO_TYPE_WRITE: 4881 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4882 bdev_io->internal.ch->stat.num_write_ops++; 4883 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4884 break; 4885 case SPDK_BDEV_IO_TYPE_UNMAP: 4886 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4887 bdev_io->internal.ch->stat.num_unmap_ops++; 4888 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 4889 break; 4890 case SPDK_BDEV_IO_TYPE_ZCOPY: 4891 /* Track the data in the start phase only */ 4892 if (bdev_io->u.bdev.zcopy.start) { 4893 if (bdev_io->u.bdev.zcopy.populate) { 4894 bdev_io->internal.ch->stat.bytes_read += 4895 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4896 bdev_io->internal.ch->stat.num_read_ops++; 4897 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4898 } else { 4899 bdev_io->internal.ch->stat.bytes_written += 4900 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4901 bdev_io->internal.ch->stat.num_write_ops++; 4902 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4903 } 4904 } 4905 break; 4906 default: 4907 break; 4908 } 4909 } 4910 4911 #ifdef SPDK_CONFIG_VTUNE 4912 uint64_t now_tsc = spdk_get_ticks(); 4913 if (now_tsc > 
(bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 4914 uint64_t data[5]; 4915 4916 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 4917 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 4918 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 4919 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 4920 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 4921 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 4922 4923 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 4924 __itt_metadata_u64, 5, data); 4925 4926 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 4927 bdev_io->internal.ch->start_tsc = now_tsc; 4928 } 4929 #endif 4930 4931 assert(bdev_io->internal.cb != NULL); 4932 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 4933 4934 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 4935 bdev_io->internal.caller_ctx); 4936 } 4937 4938 static void 4939 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 4940 { 4941 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4942 4943 if (bdev_io->u.reset.ch_ref != NULL) { 4944 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 4945 bdev_io->u.reset.ch_ref = NULL; 4946 } 4947 4948 bdev_io_complete(bdev_io); 4949 } 4950 4951 static void 4952 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 4953 { 4954 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4955 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4956 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4957 struct spdk_bdev_io *queued_reset; 4958 4959 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 4960 while (!TAILQ_EMPTY(&ch->queued_resets)) { 4961 queued_reset = TAILQ_FIRST(&ch->queued_resets); 4962 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 4963 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 4964 } 4965 4966 spdk_for_each_channel_continue(i, 0); 4967 } 4968 4969 void 4970 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 4971 { 4972 struct spdk_bdev *bdev = bdev_io->bdev; 4973 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4974 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4975 4976 bdev_io->internal.status = status; 4977 4978 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 4979 bool unlock_channels = false; 4980 4981 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 4982 SPDK_ERRLOG("NOMEM returned for reset\n"); 4983 } 4984 pthread_mutex_lock(&bdev->internal.mutex); 4985 if (bdev_io == bdev->internal.reset_in_progress) { 4986 bdev->internal.reset_in_progress = NULL; 4987 unlock_channels = true; 4988 } 4989 pthread_mutex_unlock(&bdev->internal.mutex); 4990 4991 if (unlock_channels) { 4992 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 4993 bdev_io, bdev_reset_complete); 4994 return; 4995 } 4996 } else { 4997 _bdev_io_unset_bounce_buf(bdev_io); 4998 4999 assert(bdev_ch->io_outstanding > 0); 5000 assert(shared_resource->io_outstanding > 0); 5001 bdev_ch->io_outstanding--; 5002 shared_resource->io_outstanding--; 5003 5004 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 5005 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 5006 
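/* Worked example for the threshold computed below (illustrative numbers, assuming NOMEM_THRESHOLD_COUNT == 8): with 64 I/O still outstanding the threshold becomes max(32, 56) == 56, so retries begin after 8 more completions; with only 10 outstanding it becomes max(5, 2) == 5, i.e. after half of them complete. */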
/* 5007 * Wait for some of the outstanding I/O to complete before we 5008 * retry any of the nomem_io. Normally we will wait for 5009 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 5010 * depth channels we will instead wait for half to complete. 5011 */ 5012 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 5013 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 5014 return; 5015 } 5016 5017 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 5018 bdev_ch_retry_io(bdev_ch); 5019 } 5020 } 5021 5022 bdev_io_complete(bdev_io); 5023 } 5024 5025 void 5026 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 5027 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 5028 { 5029 if (sc == SPDK_SCSI_STATUS_GOOD) { 5030 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5031 } else { 5032 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 5033 bdev_io->internal.error.scsi.sc = sc; 5034 bdev_io->internal.error.scsi.sk = sk; 5035 bdev_io->internal.error.scsi.asc = asc; 5036 bdev_io->internal.error.scsi.ascq = ascq; 5037 } 5038 5039 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5040 } 5041 5042 void 5043 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 5044 int *sc, int *sk, int *asc, int *ascq) 5045 { 5046 assert(sc != NULL); 5047 assert(sk != NULL); 5048 assert(asc != NULL); 5049 assert(ascq != NULL); 5050 5051 switch (bdev_io->internal.status) { 5052 case SPDK_BDEV_IO_STATUS_SUCCESS: 5053 *sc = SPDK_SCSI_STATUS_GOOD; 5054 *sk = SPDK_SCSI_SENSE_NO_SENSE; 5055 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5056 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5057 break; 5058 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 5059 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 5060 break; 5061 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 5062 *sc = bdev_io->internal.error.scsi.sc; 5063 *sk = bdev_io->internal.error.scsi.sk; 5064 *asc = bdev_io->internal.error.scsi.asc; 5065 *ascq = bdev_io->internal.error.scsi.ascq; 5066 break; 5067 default: 5068 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 5069 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 5070 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5071 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5072 break; 5073 } 5074 } 5075 5076 void 5077 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 5078 { 5079 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 5080 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5081 } else { 5082 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 5083 } 5084 5085 bdev_io->internal.error.nvme.cdw0 = cdw0; 5086 bdev_io->internal.error.nvme.sct = sct; 5087 bdev_io->internal.error.nvme.sc = sc; 5088 5089 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5090 } 5091 5092 void 5093 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 5094 { 5095 assert(sct != NULL); 5096 assert(sc != NULL); 5097 assert(cdw0 != NULL); 5098 5099 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5100 *sct = bdev_io->internal.error.nvme.sct; 5101 *sc = bdev_io->internal.error.nvme.sc; 5102 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5103 *sct = SPDK_NVME_SCT_GENERIC; 5104 *sc = SPDK_NVME_SC_SUCCESS; 5105 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_ABORTED) { 5106 *sct = SPDK_NVME_SCT_GENERIC; 5107 *sc = SPDK_NVME_SC_ABORTED_BY_REQUEST; 5108 } else { 5109 *sct 
= SPDK_NVME_SCT_GENERIC; 5110 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5111 } 5112 5113 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5114 } 5115 5116 void 5117 spdk_bdev_io_get_nvme_fused_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, 5118 int *first_sct, int *first_sc, int *second_sct, int *second_sc) 5119 { 5120 assert(first_sct != NULL); 5121 assert(first_sc != NULL); 5122 assert(second_sct != NULL); 5123 assert(second_sc != NULL); 5124 assert(cdw0 != NULL); 5125 5126 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5127 if (bdev_io->internal.error.nvme.sct == SPDK_NVME_SCT_MEDIA_ERROR && 5128 bdev_io->internal.error.nvme.sc == SPDK_NVME_SC_COMPARE_FAILURE) { 5129 *first_sct = bdev_io->internal.error.nvme.sct; 5130 *first_sc = bdev_io->internal.error.nvme.sc; 5131 *second_sct = SPDK_NVME_SCT_GENERIC; 5132 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5133 } else { 5134 *first_sct = SPDK_NVME_SCT_GENERIC; 5135 *first_sc = SPDK_NVME_SC_SUCCESS; 5136 *second_sct = bdev_io->internal.error.nvme.sct; 5137 *second_sc = bdev_io->internal.error.nvme.sc; 5138 } 5139 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5140 *first_sct = SPDK_NVME_SCT_GENERIC; 5141 *first_sc = SPDK_NVME_SC_SUCCESS; 5142 *second_sct = SPDK_NVME_SCT_GENERIC; 5143 *second_sc = SPDK_NVME_SC_SUCCESS; 5144 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED) { 5145 *first_sct = SPDK_NVME_SCT_GENERIC; 5146 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5147 *second_sct = SPDK_NVME_SCT_GENERIC; 5148 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5149 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_MISCOMPARE) { 5150 *first_sct = SPDK_NVME_SCT_MEDIA_ERROR; 5151 *first_sc = SPDK_NVME_SC_COMPARE_FAILURE; 5152 *second_sct = SPDK_NVME_SCT_GENERIC; 5153 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5154 } else { 5155 *first_sct = SPDK_NVME_SCT_GENERIC; 5156 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5157 *second_sct = SPDK_NVME_SCT_GENERIC; 5158 *second_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5159 } 5160 5161 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5162 } 5163 5164 struct spdk_thread * 5165 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 5166 { 5167 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 5168 } 5169 5170 struct spdk_io_channel * 5171 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 5172 { 5173 return bdev_io->internal.ch->channel; 5174 } 5175 5176 static int 5177 bdev_init(struct spdk_bdev *bdev) 5178 { 5179 char *bdev_name; 5180 5181 assert(bdev->module != NULL); 5182 5183 if (!bdev->name) { 5184 SPDK_ERRLOG("Bdev name is NULL\n"); 5185 return -EINVAL; 5186 } 5187 5188 if (!strlen(bdev->name)) { 5189 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 5190 return -EINVAL; 5191 } 5192 5193 if (spdk_bdev_get_by_name(bdev->name)) { 5194 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 5195 return -EEXIST; 5196 } 5197 5198 /* Users often register their own I/O devices using the bdev name. In 5199 * order to avoid conflicts, prepend bdev_. 
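* For example, a bdev registered under the name "Malloc0" would get the internal I/O device name "bdev_Malloc0" (the name here is only illustrative).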
*/ 5200 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 5201 if (!bdev_name) { 5202 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 5203 return -ENOMEM; 5204 } 5205 5206 bdev->internal.status = SPDK_BDEV_STATUS_READY; 5207 bdev->internal.measured_queue_depth = UINT64_MAX; 5208 bdev->internal.claim_module = NULL; 5209 bdev->internal.qd_poller = NULL; 5210 bdev->internal.qos = NULL; 5211 5212 /* If the user didn't specify a uuid, generate one. */ 5213 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 5214 spdk_uuid_generate(&bdev->uuid); 5215 } 5216 5217 if (spdk_bdev_get_buf_align(bdev) > 1) { 5218 if (bdev->split_on_optimal_io_boundary) { 5219 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 5220 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 5221 } else { 5222 bdev->split_on_optimal_io_boundary = true; 5223 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 5224 } 5225 } 5226 5227 /* If the user didn't specify a write unit size, set it to one. */ 5228 if (bdev->write_unit_size == 0) { 5229 bdev->write_unit_size = 1; 5230 } 5231 5232 /* Set ACWU value to 1 if bdev module did not set it (does not support it natively) */ 5233 if (bdev->acwu == 0) { 5234 bdev->acwu = 1; 5235 } 5236 5237 TAILQ_INIT(&bdev->internal.open_descs); 5238 TAILQ_INIT(&bdev->internal.locked_ranges); 5239 TAILQ_INIT(&bdev->internal.pending_locked_ranges); 5240 5241 TAILQ_INIT(&bdev->aliases); 5242 5243 bdev->internal.reset_in_progress = NULL; 5244 5245 spdk_io_device_register(__bdev_to_io_dev(bdev), 5246 bdev_channel_create, bdev_channel_destroy, 5247 sizeof(struct spdk_bdev_channel), 5248 bdev_name); 5249 5250 free(bdev_name); 5251 5252 pthread_mutex_init(&bdev->internal.mutex, NULL); 5253 return 0; 5254 } 5255 5256 static void 5257 bdev_destroy_cb(void *io_device) 5258 { 5259 int rc; 5260 struct spdk_bdev *bdev; 5261 spdk_bdev_unregister_cb cb_fn; 5262 void *cb_arg; 5263 5264 bdev = __bdev_from_io_dev(io_device); 5265 cb_fn = bdev->internal.unregister_cb; 5266 cb_arg = bdev->internal.unregister_ctx; 5267 5268 rc = bdev->fn_table->destruct(bdev->ctxt); 5269 if (rc < 0) { 5270 SPDK_ERRLOG("destruct failed\n"); 5271 } 5272 if (rc <= 0 && cb_fn != NULL) { 5273 cb_fn(cb_arg, rc); 5274 } 5275 } 5276 5277 5278 static void 5279 bdev_fini(struct spdk_bdev *bdev) 5280 { 5281 pthread_mutex_destroy(&bdev->internal.mutex); 5282 5283 free(bdev->internal.qos); 5284 5285 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 5286 } 5287 5288 static void 5289 bdev_start(struct spdk_bdev *bdev) 5290 { 5291 SPDK_DEBUGLOG(bdev, "Inserting bdev %s into list\n", bdev->name); 5292 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 5293 5294 /* Examine configuration before initializing I/O */ 5295 bdev_examine(bdev); 5296 } 5297 5298 int 5299 spdk_bdev_register(struct spdk_bdev *bdev) 5300 { 5301 int rc = bdev_init(bdev); 5302 5303 if (rc == 0) { 5304 bdev_start(bdev); 5305 } 5306 5307 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 5308 return rc; 5309 } 5310 5311 int 5312 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 5313 { 5314 SPDK_ERRLOG("This function is deprecated. 
Use spdk_bdev_register() instead.\n"); 5315 return spdk_bdev_register(vbdev); 5316 } 5317 5318 void 5319 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 5320 { 5321 if (bdev->internal.unregister_cb != NULL) { 5322 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 5323 } 5324 } 5325 5326 static void 5327 _remove_notify(void *arg) 5328 { 5329 struct spdk_bdev_desc *desc = arg; 5330 5331 pthread_mutex_lock(&desc->mutex); 5332 desc->refs--; 5333 5334 if (!desc->closed) { 5335 pthread_mutex_unlock(&desc->mutex); 5336 if (desc->callback.open_with_ext) { 5337 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 5338 } else { 5339 desc->callback.remove_fn(desc->callback.ctx); 5340 } 5341 return; 5342 } else if (0 == desc->refs) { 5343 /* This descriptor was closed after this remove_notify message was sent. 5344 * spdk_bdev_close() could not free the descriptor since this message was 5345 * in flight, so we free it now using bdev_desc_free(). 5346 */ 5347 pthread_mutex_unlock(&desc->mutex); 5348 bdev_desc_free(desc); 5349 return; 5350 } 5351 pthread_mutex_unlock(&desc->mutex); 5352 } 5353 5354 /* Must be called while holding bdev->internal.mutex. 5355 * returns: 0 - bdev removed and ready to be destructed. 5356 * -EBUSY - bdev can't be destructed yet. */ 5357 static int 5358 bdev_unregister_unsafe(struct spdk_bdev *bdev) 5359 { 5360 struct spdk_bdev_desc *desc, *tmp; 5361 int rc = 0; 5362 5363 /* Notify each descriptor about hotremoval */ 5364 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 5365 rc = -EBUSY; 5366 pthread_mutex_lock(&desc->mutex); 5367 /* 5368 * Defer invocation of the event_cb to a separate message that will 5369 * run later on its thread. This ensures this context unwinds and 5370 * we don't recursively unregister this bdev again if the event_cb 5371 * immediately closes its descriptor. 5372 */ 5373 desc->refs++; 5374 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 5375 pthread_mutex_unlock(&desc->mutex); 5376 } 5377 5378 /* If there are no descriptors, proceed removing the bdev */ 5379 if (rc == 0) { 5380 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 5381 SPDK_DEBUGLOG(bdev, "Removing bdev %s from list done\n", bdev->name); 5382 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 5383 } 5384 5385 return rc; 5386 } 5387 5388 void 5389 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 5390 { 5391 struct spdk_thread *thread; 5392 int rc; 5393 5394 SPDK_DEBUGLOG(bdev, "Removing bdev %s from list\n", bdev->name); 5395 5396 thread = spdk_get_thread(); 5397 if (!thread) { 5398 /* The user called this from a non-SPDK thread. */ 5399 if (cb_fn != NULL) { 5400 cb_fn(cb_arg, -ENOTSUP); 5401 } 5402 return; 5403 } 5404 5405 pthread_mutex_lock(&g_bdev_mgr.mutex); 5406 pthread_mutex_lock(&bdev->internal.mutex); 5407 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5408 pthread_mutex_unlock(&bdev->internal.mutex); 5409 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5410 if (cb_fn) { 5411 cb_fn(cb_arg, -EBUSY); 5412 } 5413 return; 5414 } 5415 5416 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 5417 bdev->internal.unregister_cb = cb_fn; 5418 bdev->internal.unregister_ctx = cb_arg; 5419 5420 /* Call under lock. 
*/ 5421 rc = bdev_unregister_unsafe(bdev); 5422 pthread_mutex_unlock(&bdev->internal.mutex); 5423 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5424 5425 if (rc == 0) { 5426 bdev_fini(bdev); 5427 } 5428 } 5429 5430 static void 5431 bdev_dummy_event_cb(void *remove_ctx) 5432 { 5433 SPDK_DEBUGLOG(bdev, "Bdev remove event received with no remove callback specified"); 5434 } 5435 5436 static int 5437 bdev_start_qos(struct spdk_bdev *bdev) 5438 { 5439 struct set_qos_limit_ctx *ctx; 5440 5441 /* Enable QoS */ 5442 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 5443 ctx = calloc(1, sizeof(*ctx)); 5444 if (ctx == NULL) { 5445 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 5446 return -ENOMEM; 5447 } 5448 ctx->bdev = bdev; 5449 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5450 bdev_enable_qos_msg, ctx, 5451 bdev_enable_qos_done); 5452 } 5453 5454 return 0; 5455 } 5456 5457 static int 5458 bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 5459 { 5460 struct spdk_thread *thread; 5461 int rc = 0; 5462 5463 thread = spdk_get_thread(); 5464 if (!thread) { 5465 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 5466 return -ENOTSUP; 5467 } 5468 5469 SPDK_DEBUGLOG(bdev, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5470 spdk_get_thread()); 5471 5472 desc->bdev = bdev; 5473 desc->thread = thread; 5474 desc->write = write; 5475 5476 pthread_mutex_lock(&bdev->internal.mutex); 5477 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5478 pthread_mutex_unlock(&bdev->internal.mutex); 5479 return -ENODEV; 5480 } 5481 5482 if (write && bdev->internal.claim_module) { 5483 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 5484 bdev->name, bdev->internal.claim_module->name); 5485 pthread_mutex_unlock(&bdev->internal.mutex); 5486 return -EPERM; 5487 } 5488 5489 rc = bdev_start_qos(bdev); 5490 if (rc != 0) { 5491 SPDK_ERRLOG("Failed to start QoS on bdev %s\n", bdev->name); 5492 pthread_mutex_unlock(&bdev->internal.mutex); 5493 return rc; 5494 } 5495 5496 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 5497 5498 pthread_mutex_unlock(&bdev->internal.mutex); 5499 5500 return 0; 5501 } 5502 5503 int 5504 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 5505 void *remove_ctx, struct spdk_bdev_desc **_desc) 5506 { 5507 struct spdk_bdev_desc *desc; 5508 int rc; 5509 5510 desc = calloc(1, sizeof(*desc)); 5511 if (desc == NULL) { 5512 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5513 return -ENOMEM; 5514 } 5515 5516 if (remove_cb == NULL) { 5517 remove_cb = bdev_dummy_event_cb; 5518 } 5519 5520 TAILQ_INIT(&desc->pending_media_events); 5521 TAILQ_INIT(&desc->free_media_events); 5522 5523 desc->callback.open_with_ext = false; 5524 desc->callback.remove_fn = remove_cb; 5525 desc->callback.ctx = remove_ctx; 5526 pthread_mutex_init(&desc->mutex, NULL); 5527 5528 pthread_mutex_lock(&g_bdev_mgr.mutex); 5529 5530 rc = bdev_open(bdev, write, desc); 5531 if (rc != 0) { 5532 bdev_desc_free(desc); 5533 desc = NULL; 5534 } 5535 5536 *_desc = desc; 5537 5538 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5539 5540 return rc; 5541 } 5542 5543 int 5544 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 5545 void *event_ctx, struct spdk_bdev_desc **_desc) 5546 { 5547 struct spdk_bdev_desc *desc; 5548 struct spdk_bdev *bdev; 5549 unsigned int event_id; 5550 int rc; 5551 5552 if (event_cb == NULL) { 5553 SPDK_ERRLOG("Missing event callback function\n"); 
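/* Unlike spdk_bdev_open(), which substitutes a dummy remove callback when none is given, the extended open requires an event callback so hot-remove and other bdev events can be delivered. */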
5554 return -EINVAL; 5555 } 5556 5557 pthread_mutex_lock(&g_bdev_mgr.mutex); 5558 5559 bdev = spdk_bdev_get_by_name(bdev_name); 5560 5561 if (bdev == NULL) { 5562 SPDK_NOTICELOG("Currently unable to find bdev with name: %s\n", bdev_name); 5563 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5564 return -ENODEV; 5565 } 5566 5567 desc = calloc(1, sizeof(*desc)); 5568 if (desc == NULL) { 5569 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5570 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5571 return -ENOMEM; 5572 } 5573 5574 TAILQ_INIT(&desc->pending_media_events); 5575 TAILQ_INIT(&desc->free_media_events); 5576 5577 desc->callback.open_with_ext = true; 5578 desc->callback.event_fn = event_cb; 5579 desc->callback.ctx = event_ctx; 5580 pthread_mutex_init(&desc->mutex, NULL); 5581 5582 if (bdev->media_events) { 5583 desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE, 5584 sizeof(*desc->media_events_buffer)); 5585 if (desc->media_events_buffer == NULL) { 5586 SPDK_ERRLOG("Failed to initialize media event pool\n"); 5587 bdev_desc_free(desc); 5588 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5589 return -ENOMEM; 5590 } 5591 5592 for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) { 5593 TAILQ_INSERT_TAIL(&desc->free_media_events, 5594 &desc->media_events_buffer[event_id], tailq); 5595 } 5596 } 5597 5598 rc = bdev_open(bdev, write, desc); 5599 if (rc != 0) { 5600 bdev_desc_free(desc); 5601 desc = NULL; 5602 } 5603 5604 *_desc = desc; 5605 5606 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5607 5608 return rc; 5609 } 5610 5611 void 5612 spdk_bdev_close(struct spdk_bdev_desc *desc) 5613 { 5614 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 5615 int rc; 5616 5617 SPDK_DEBUGLOG(bdev, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5618 spdk_get_thread()); 5619 5620 assert(desc->thread == spdk_get_thread()); 5621 5622 spdk_poller_unregister(&desc->io_timeout_poller); 5623 5624 pthread_mutex_lock(&bdev->internal.mutex); 5625 pthread_mutex_lock(&desc->mutex); 5626 5627 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 5628 5629 desc->closed = true; 5630 5631 if (0 == desc->refs) { 5632 pthread_mutex_unlock(&desc->mutex); 5633 bdev_desc_free(desc); 5634 } else { 5635 pthread_mutex_unlock(&desc->mutex); 5636 } 5637 5638 /* If no more descriptors, kill QoS channel */ 5639 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5640 SPDK_DEBUGLOG(bdev, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 5641 bdev->name, spdk_get_thread()); 5642 5643 if (bdev_qos_destroy(bdev)) { 5644 /* There isn't anything we can do to recover here. Just let the 5645 * old QoS poller keep running. The QoS handling won't change 5646 * cores when the user allocates a new channel, but it won't break. */ 5647 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 5648 } 5649 } 5650 5651 spdk_bdev_set_qd_sampling_period(bdev, 0); 5652 5653 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5654 rc = bdev_unregister_unsafe(bdev); 5655 pthread_mutex_unlock(&bdev->internal.mutex); 5656 5657 if (rc == 0) { 5658 bdev_fini(bdev); 5659 } 5660 } else { 5661 pthread_mutex_unlock(&bdev->internal.mutex); 5662 } 5663 } 5664 5665 int 5666 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 5667 struct spdk_bdev_module *module) 5668 { 5669 if (bdev->internal.claim_module != NULL) { 5670 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 5671 bdev->internal.claim_module->name); 5672 return -EPERM; 5673 } 5674 5675 if (desc && !desc->write) { 5676 desc->write = true; 5677 } 5678 5679 bdev->internal.claim_module = module; 5680 return 0; 5681 } 5682 5683 void 5684 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 5685 { 5686 assert(bdev->internal.claim_module != NULL); 5687 bdev->internal.claim_module = NULL; 5688 } 5689 5690 struct spdk_bdev * 5691 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 5692 { 5693 assert(desc != NULL); 5694 return desc->bdev; 5695 } 5696 5697 void 5698 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 5699 { 5700 struct iovec *iovs; 5701 int iovcnt; 5702 5703 if (bdev_io == NULL) { 5704 return; 5705 } 5706 5707 switch (bdev_io->type) { 5708 case SPDK_BDEV_IO_TYPE_READ: 5709 case SPDK_BDEV_IO_TYPE_WRITE: 5710 case SPDK_BDEV_IO_TYPE_ZCOPY: 5711 iovs = bdev_io->u.bdev.iovs; 5712 iovcnt = bdev_io->u.bdev.iovcnt; 5713 break; 5714 default: 5715 iovs = NULL; 5716 iovcnt = 0; 5717 break; 5718 } 5719 5720 if (iovp) { 5721 *iovp = iovs; 5722 } 5723 if (iovcntp) { 5724 *iovcntp = iovcnt; 5725 } 5726 } 5727 5728 void * 5729 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 5730 { 5731 if (bdev_io == NULL) { 5732 return NULL; 5733 } 5734 5735 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 5736 return NULL; 5737 } 5738 5739 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 5740 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 5741 return bdev_io->u.bdev.md_buf; 5742 } 5743 5744 return NULL; 5745 } 5746 5747 void * 5748 spdk_bdev_io_get_cb_arg(struct spdk_bdev_io *bdev_io) 5749 { 5750 if (bdev_io == NULL) { 5751 assert(false); 5752 return NULL; 5753 } 5754 5755 return bdev_io->internal.caller_ctx; 5756 } 5757 5758 void 5759 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 5760 { 5761 5762 if (spdk_bdev_module_list_find(bdev_module->name)) { 5763 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 5764 assert(false); 5765 } 5766 5767 /* 5768 * Modules with examine callbacks must be initialized first, so they are 5769 * ready to handle examine callbacks from later modules that will 5770 * register physical bdevs. 
5771 */ 5772 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 5773 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5774 } else { 5775 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5776 } 5777 } 5778 5779 struct spdk_bdev_module * 5780 spdk_bdev_module_list_find(const char *name) 5781 { 5782 struct spdk_bdev_module *bdev_module; 5783 5784 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5785 if (strcmp(name, bdev_module->name) == 0) { 5786 break; 5787 } 5788 } 5789 5790 return bdev_module; 5791 } 5792 5793 static void 5794 bdev_write_zero_buffer_next(void *_bdev_io) 5795 { 5796 struct spdk_bdev_io *bdev_io = _bdev_io; 5797 uint64_t num_bytes, num_blocks; 5798 void *md_buf = NULL; 5799 int rc; 5800 5801 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 5802 bdev_io->u.bdev.split_remaining_num_blocks, 5803 ZERO_BUFFER_SIZE); 5804 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 5805 5806 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 5807 md_buf = (char *)g_bdev_mgr.zero_buffer + 5808 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 5809 } 5810 5811 rc = bdev_write_blocks_with_md(bdev_io->internal.desc, 5812 spdk_io_channel_from_ctx(bdev_io->internal.ch), 5813 g_bdev_mgr.zero_buffer, md_buf, 5814 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 5815 bdev_write_zero_buffer_done, bdev_io); 5816 if (rc == 0) { 5817 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 5818 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 5819 } else if (rc == -ENOMEM) { 5820 bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next); 5821 } else { 5822 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5823 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 5824 } 5825 } 5826 5827 static void 5828 bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 5829 { 5830 struct spdk_bdev_io *parent_io = cb_arg; 5831 5832 spdk_bdev_free_io(bdev_io); 5833 5834 if (!success) { 5835 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5836 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 5837 return; 5838 } 5839 5840 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 5841 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5842 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 5843 return; 5844 } 5845 5846 bdev_write_zero_buffer_next(parent_io); 5847 } 5848 5849 static void 5850 bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 5851 { 5852 pthread_mutex_lock(&ctx->bdev->internal.mutex); 5853 ctx->bdev->internal.qos_mod_in_progress = false; 5854 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 5855 5856 if (ctx->cb_fn) { 5857 ctx->cb_fn(ctx->cb_arg, status); 5858 } 5859 free(ctx); 5860 } 5861 5862 static void 5863 bdev_disable_qos_done(void *cb_arg) 5864 { 5865 struct set_qos_limit_ctx *ctx = cb_arg; 5866 struct spdk_bdev *bdev = ctx->bdev; 5867 struct spdk_bdev_io *bdev_io; 5868 struct spdk_bdev_qos *qos; 5869 5870 pthread_mutex_lock(&bdev->internal.mutex); 5871 qos = bdev->internal.qos; 5872 bdev->internal.qos = NULL; 5873 pthread_mutex_unlock(&bdev->internal.mutex); 5874 5875 while (!TAILQ_EMPTY(&qos->queued)) { 5876 /* Send queued I/O back to their original thread for resubmission. 
*/ 5877 bdev_io = TAILQ_FIRST(&qos->queued); 5878 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 5879 5880 if (bdev_io->internal.io_submit_ch) { 5881 /* 5882 * Channel was changed when sending it to the QoS thread - change it back 5883 * before sending it back to the original thread. 5884 */ 5885 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 5886 bdev_io->internal.io_submit_ch = NULL; 5887 } 5888 5889 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 5890 _bdev_io_submit, bdev_io); 5891 } 5892 5893 if (qos->thread != NULL) { 5894 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 5895 spdk_poller_unregister(&qos->poller); 5896 } 5897 5898 free(qos); 5899 5900 bdev_set_qos_limit_done(ctx, 0); 5901 } 5902 5903 static void 5904 bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 5905 { 5906 void *io_device = spdk_io_channel_iter_get_io_device(i); 5907 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5908 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5909 struct spdk_thread *thread; 5910 5911 pthread_mutex_lock(&bdev->internal.mutex); 5912 thread = bdev->internal.qos->thread; 5913 pthread_mutex_unlock(&bdev->internal.mutex); 5914 5915 if (thread != NULL) { 5916 spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx); 5917 } else { 5918 bdev_disable_qos_done(ctx); 5919 } 5920 } 5921 5922 static void 5923 bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 5924 { 5925 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5926 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5927 5928 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 5929 5930 spdk_for_each_channel_continue(i, 0); 5931 } 5932 5933 static void 5934 bdev_update_qos_rate_limit_msg(void *cb_arg) 5935 { 5936 struct set_qos_limit_ctx *ctx = cb_arg; 5937 struct spdk_bdev *bdev = ctx->bdev; 5938 5939 pthread_mutex_lock(&bdev->internal.mutex); 5940 bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 5941 pthread_mutex_unlock(&bdev->internal.mutex); 5942 5943 bdev_set_qos_limit_done(ctx, 0); 5944 } 5945 5946 static void 5947 bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 5948 { 5949 void *io_device = spdk_io_channel_iter_get_io_device(i); 5950 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5951 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5952 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5953 5954 pthread_mutex_lock(&bdev->internal.mutex); 5955 bdev_enable_qos(bdev, bdev_ch); 5956 pthread_mutex_unlock(&bdev->internal.mutex); 5957 spdk_for_each_channel_continue(i, 0); 5958 } 5959 5960 static void 5961 bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 5962 { 5963 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5964 5965 bdev_set_qos_limit_done(ctx, status); 5966 } 5967 5968 static void 5969 bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 5970 { 5971 int i; 5972 5973 assert(bdev->internal.qos != NULL); 5974 5975 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5976 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5977 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5978 5979 if (limits[i] == 0) { 5980 bdev->internal.qos->rate_limits[i].limit = 5981 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 5982 } 5983 } 5984 } 5985 } 5986 5987 void 5988 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 5989 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 5990 { 5991 struct set_qos_limit_ctx *ctx; 5992 uint32_t 
limit_set_complement; 5993 uint64_t min_limit_per_sec; 5994 int i; 5995 bool disable_rate_limit = true; 5996 5997 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5998 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5999 continue; 6000 } 6001 6002 if (limits[i] > 0) { 6003 disable_rate_limit = false; 6004 } 6005 6006 if (bdev_qos_is_iops_rate_limit(i) == true) { 6007 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 6008 } else { 6009 /* Change from megabyte to byte rate limit */ 6010 limits[i] = limits[i] * 1024 * 1024; 6011 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 6012 } 6013 6014 limit_set_complement = limits[i] % min_limit_per_sec; 6015 if (limit_set_complement) { 6016 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 6017 limits[i], min_limit_per_sec); 6018 limits[i] += min_limit_per_sec - limit_set_complement; 6019 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 6020 } 6021 } 6022 6023 ctx = calloc(1, sizeof(*ctx)); 6024 if (ctx == NULL) { 6025 cb_fn(cb_arg, -ENOMEM); 6026 return; 6027 } 6028 6029 ctx->cb_fn = cb_fn; 6030 ctx->cb_arg = cb_arg; 6031 ctx->bdev = bdev; 6032 6033 pthread_mutex_lock(&bdev->internal.mutex); 6034 if (bdev->internal.qos_mod_in_progress) { 6035 pthread_mutex_unlock(&bdev->internal.mutex); 6036 free(ctx); 6037 cb_fn(cb_arg, -EAGAIN); 6038 return; 6039 } 6040 bdev->internal.qos_mod_in_progress = true; 6041 6042 if (disable_rate_limit == true && bdev->internal.qos) { 6043 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6044 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 6045 (bdev->internal.qos->rate_limits[i].limit > 0 && 6046 bdev->internal.qos->rate_limits[i].limit != 6047 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 6048 disable_rate_limit = false; 6049 break; 6050 } 6051 } 6052 } 6053 6054 if (disable_rate_limit == false) { 6055 if (bdev->internal.qos == NULL) { 6056 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 6057 if (!bdev->internal.qos) { 6058 pthread_mutex_unlock(&bdev->internal.mutex); 6059 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 6060 bdev_set_qos_limit_done(ctx, -ENOMEM); 6061 return; 6062 } 6063 } 6064 6065 if (bdev->internal.qos->thread == NULL) { 6066 /* Enabling */ 6067 bdev_set_qos_rate_limits(bdev, limits); 6068 6069 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6070 bdev_enable_qos_msg, ctx, 6071 bdev_enable_qos_done); 6072 } else { 6073 /* Updating */ 6074 bdev_set_qos_rate_limits(bdev, limits); 6075 6076 spdk_thread_send_msg(bdev->internal.qos->thread, 6077 bdev_update_qos_rate_limit_msg, ctx); 6078 } 6079 } else { 6080 if (bdev->internal.qos != NULL) { 6081 bdev_set_qos_rate_limits(bdev, limits); 6082 6083 /* Disabling */ 6084 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6085 bdev_disable_qos_msg, ctx, 6086 bdev_disable_qos_msg_done); 6087 } else { 6088 pthread_mutex_unlock(&bdev->internal.mutex); 6089 bdev_set_qos_limit_done(ctx, 0); 6090 return; 6091 } 6092 } 6093 6094 pthread_mutex_unlock(&bdev->internal.mutex); 6095 } 6096 6097 struct spdk_bdev_histogram_ctx { 6098 spdk_bdev_histogram_status_cb cb_fn; 6099 void *cb_arg; 6100 struct spdk_bdev *bdev; 6101 int status; 6102 }; 6103 6104 static void 6105 bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 6106 { 6107 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6108 6109 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6110 ctx->bdev->internal.histogram_in_progress = false; 6111 
pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6112 ctx->cb_fn(ctx->cb_arg, ctx->status); 6113 free(ctx); 6114 } 6115 6116 static void 6117 bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 6118 { 6119 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6120 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6121 6122 if (ch->histogram != NULL) { 6123 spdk_histogram_data_free(ch->histogram); 6124 ch->histogram = NULL; 6125 } 6126 spdk_for_each_channel_continue(i, 0); 6127 } 6128 6129 static void 6130 bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 6131 { 6132 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6133 6134 if (status != 0) { 6135 ctx->status = status; 6136 ctx->bdev->internal.histogram_enabled = false; 6137 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx, 6138 bdev_histogram_disable_channel_cb); 6139 } else { 6140 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6141 ctx->bdev->internal.histogram_in_progress = false; 6142 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6143 ctx->cb_fn(ctx->cb_arg, ctx->status); 6144 free(ctx); 6145 } 6146 } 6147 6148 static void 6149 bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 6150 { 6151 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6152 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6153 int status = 0; 6154 6155 if (ch->histogram == NULL) { 6156 ch->histogram = spdk_histogram_data_alloc(); 6157 if (ch->histogram == NULL) { 6158 status = -ENOMEM; 6159 } 6160 } 6161 6162 spdk_for_each_channel_continue(i, status); 6163 } 6164 6165 void 6166 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 6167 void *cb_arg, bool enable) 6168 { 6169 struct spdk_bdev_histogram_ctx *ctx; 6170 6171 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 6172 if (ctx == NULL) { 6173 cb_fn(cb_arg, -ENOMEM); 6174 return; 6175 } 6176 6177 ctx->bdev = bdev; 6178 ctx->status = 0; 6179 ctx->cb_fn = cb_fn; 6180 ctx->cb_arg = cb_arg; 6181 6182 pthread_mutex_lock(&bdev->internal.mutex); 6183 if (bdev->internal.histogram_in_progress) { 6184 pthread_mutex_unlock(&bdev->internal.mutex); 6185 free(ctx); 6186 cb_fn(cb_arg, -EAGAIN); 6187 return; 6188 } 6189 6190 bdev->internal.histogram_in_progress = true; 6191 pthread_mutex_unlock(&bdev->internal.mutex); 6192 6193 bdev->internal.histogram_enabled = enable; 6194 6195 if (enable) { 6196 /* Allocate histogram for each channel */ 6197 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx, 6198 bdev_histogram_enable_channel_cb); 6199 } else { 6200 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx, 6201 bdev_histogram_disable_channel_cb); 6202 } 6203 } 6204 6205 struct spdk_bdev_histogram_data_ctx { 6206 spdk_bdev_histogram_data_cb cb_fn; 6207 void *cb_arg; 6208 struct spdk_bdev *bdev; 6209 /** merged histogram data from all channels */ 6210 struct spdk_histogram_data *histogram; 6211 }; 6212 6213 static void 6214 bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 6215 { 6216 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6217 6218 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 6219 free(ctx); 6220 } 6221 6222 static void 6223 bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 6224 { 6225 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6226 struct spdk_bdev_channel *ch = 
spdk_io_channel_get_ctx(_ch); 6227 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6228 int status = 0; 6229 6230 if (ch->histogram == NULL) { 6231 status = -EFAULT; 6232 } else { 6233 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 6234 } 6235 6236 spdk_for_each_channel_continue(i, status); 6237 } 6238 6239 void 6240 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 6241 spdk_bdev_histogram_data_cb cb_fn, 6242 void *cb_arg) 6243 { 6244 struct spdk_bdev_histogram_data_ctx *ctx; 6245 6246 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 6247 if (ctx == NULL) { 6248 cb_fn(cb_arg, -ENOMEM, NULL); 6249 return; 6250 } 6251 6252 ctx->bdev = bdev; 6253 ctx->cb_fn = cb_fn; 6254 ctx->cb_arg = cb_arg; 6255 6256 ctx->histogram = histogram; 6257 6258 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx, 6259 bdev_histogram_get_channel_cb); 6260 } 6261 6262 size_t 6263 spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events, 6264 size_t max_events) 6265 { 6266 struct media_event_entry *entry; 6267 size_t num_events = 0; 6268 6269 for (; num_events < max_events; ++num_events) { 6270 entry = TAILQ_FIRST(&desc->pending_media_events); 6271 if (entry == NULL) { 6272 break; 6273 } 6274 6275 events[num_events] = entry->event; 6276 TAILQ_REMOVE(&desc->pending_media_events, entry, tailq); 6277 TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq); 6278 } 6279 6280 return num_events; 6281 } 6282 6283 int 6284 spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events, 6285 size_t num_events) 6286 { 6287 struct spdk_bdev_desc *desc; 6288 struct media_event_entry *entry; 6289 size_t event_id; 6290 int rc = 0; 6291 6292 assert(bdev->media_events); 6293 6294 pthread_mutex_lock(&bdev->internal.mutex); 6295 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6296 if (desc->write) { 6297 break; 6298 } 6299 } 6300 6301 if (desc == NULL || desc->media_events_buffer == NULL) { 6302 rc = -ENODEV; 6303 goto out; 6304 } 6305 6306 for (event_id = 0; event_id < num_events; ++event_id) { 6307 entry = TAILQ_FIRST(&desc->free_media_events); 6308 if (entry == NULL) { 6309 break; 6310 } 6311 6312 TAILQ_REMOVE(&desc->free_media_events, entry, tailq); 6313 TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq); 6314 entry->event = events[event_id]; 6315 } 6316 6317 rc = event_id; 6318 out: 6319 pthread_mutex_unlock(&bdev->internal.mutex); 6320 return rc; 6321 } 6322 6323 void 6324 spdk_bdev_notify_media_management(struct spdk_bdev *bdev) 6325 { 6326 struct spdk_bdev_desc *desc; 6327 6328 pthread_mutex_lock(&bdev->internal.mutex); 6329 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6330 if (!TAILQ_EMPTY(&desc->pending_media_events)) { 6331 desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev, 6332 desc->callback.ctx); 6333 } 6334 } 6335 pthread_mutex_unlock(&bdev->internal.mutex); 6336 } 6337 6338 struct locked_lba_range_ctx { 6339 struct lba_range range; 6340 struct spdk_bdev *bdev; 6341 struct lba_range *current_range; 6342 struct lba_range *owner_range; 6343 struct spdk_poller *poller; 6344 lock_range_cb cb_fn; 6345 void *cb_arg; 6346 }; 6347 6348 static void 6349 bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status) 6350 { 6351 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6352 6353 ctx->cb_fn(ctx->cb_arg, -ENOMEM); 6354 free(ctx); 6355 } 6356 6357 static void 6358 
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i); 6359 6360 static void 6361 bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6362 { 6363 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6364 struct spdk_bdev *bdev = ctx->bdev; 6365 6366 if (status == -ENOMEM) { 6367 /* One of the channels could not allocate a range object. 6368 * So we have to go back and clean up any ranges that were 6369 * allocated successfully before we return error status to 6370 * the caller. We can reuse the unlock function to do that 6371 * clean up. 6372 */ 6373 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6374 bdev_unlock_lba_range_get_channel, ctx, 6375 bdev_lock_error_cleanup_cb); 6376 return; 6377 } 6378 6379 /* All channels have locked this range and no I/O overlapping the range 6380 * are outstanding! Set the owner_ch for the range object for the 6381 * locking channel, so that this channel will know that it is allowed 6382 * to write to this range. 6383 */ 6384 ctx->owner_range->owner_ch = ctx->range.owner_ch; 6385 ctx->cb_fn(ctx->cb_arg, status); 6386 6387 /* Don't free the ctx here. Its range is in the bdev's global list of 6388 * locked ranges still, and will be removed and freed when this range 6389 * is later unlocked. 6390 */ 6391 } 6392 6393 static int 6394 bdev_lock_lba_range_check_io(void *_i) 6395 { 6396 struct spdk_io_channel_iter *i = _i; 6397 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6398 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6399 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6400 struct lba_range *range = ctx->current_range; 6401 struct spdk_bdev_io *bdev_io; 6402 6403 spdk_poller_unregister(&ctx->poller); 6404 6405 /* The range is now in the locked_ranges, so no new IO can be submitted to this 6406 * range. But we need to wait until any outstanding IO overlapping with this range 6407 * are completed. 6408 */ 6409 TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) { 6410 if (bdev_io_range_is_locked(bdev_io, range)) { 6411 ctx->poller = SPDK_POLLER_REGISTER(bdev_lock_lba_range_check_io, i, 100); 6412 return SPDK_POLLER_BUSY; 6413 } 6414 } 6415 6416 spdk_for_each_channel_continue(i, 0); 6417 return SPDK_POLLER_BUSY; 6418 } 6419 6420 static void 6421 bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6422 { 6423 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6424 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6425 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6426 struct lba_range *range; 6427 6428 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6429 if (range->length == ctx->range.length && 6430 range->offset == ctx->range.offset && 6431 range->locked_ctx == ctx->range.locked_ctx) { 6432 /* This range already exists on this channel, so don't add 6433 * it again. This can happen when a new channel is created 6434 * while the for_each_channel operation is in progress. 6435 * Do not check for outstanding I/O in that case, since the 6436 * range was locked before any I/O could be submitted to the 6437 * new channel. 
6438 */ 6439 spdk_for_each_channel_continue(i, 0); 6440 return; 6441 } 6442 } 6443 6444 range = calloc(1, sizeof(*range)); 6445 if (range == NULL) { 6446 spdk_for_each_channel_continue(i, -ENOMEM); 6447 return; 6448 } 6449 6450 range->length = ctx->range.length; 6451 range->offset = ctx->range.offset; 6452 range->locked_ctx = ctx->range.locked_ctx; 6453 ctx->current_range = range; 6454 if (ctx->range.owner_ch == ch) { 6455 /* This is the range object for the channel that will hold 6456 * the lock. Store it in the ctx object so that we can easily 6457 * set its owner_ch after the lock is finally acquired. 6458 */ 6459 ctx->owner_range = range; 6460 } 6461 TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq); 6462 bdev_lock_lba_range_check_io(i); 6463 } 6464 6465 static void 6466 bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx) 6467 { 6468 assert(spdk_get_thread() == ctx->range.owner_ch->channel->thread); 6469 6470 /* We will add a copy of this range to each channel now. */ 6471 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx, 6472 bdev_lock_lba_range_cb); 6473 } 6474 6475 static bool 6476 bdev_lba_range_overlaps_tailq(struct lba_range *range, lba_range_tailq_t *tailq) 6477 { 6478 struct lba_range *r; 6479 6480 TAILQ_FOREACH(r, tailq, tailq) { 6481 if (bdev_lba_range_overlapped(range, r)) { 6482 return true; 6483 } 6484 } 6485 return false; 6486 } 6487 6488 static int 6489 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6490 uint64_t offset, uint64_t length, 6491 lock_range_cb cb_fn, void *cb_arg) 6492 { 6493 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6494 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6495 struct locked_lba_range_ctx *ctx; 6496 6497 if (cb_arg == NULL) { 6498 SPDK_ERRLOG("cb_arg must not be NULL\n"); 6499 return -EINVAL; 6500 } 6501 6502 ctx = calloc(1, sizeof(*ctx)); 6503 if (ctx == NULL) { 6504 return -ENOMEM; 6505 } 6506 6507 ctx->range.offset = offset; 6508 ctx->range.length = length; 6509 ctx->range.owner_ch = ch; 6510 ctx->range.locked_ctx = cb_arg; 6511 ctx->bdev = bdev; 6512 ctx->cb_fn = cb_fn; 6513 ctx->cb_arg = cb_arg; 6514 6515 pthread_mutex_lock(&bdev->internal.mutex); 6516 if (bdev_lba_range_overlaps_tailq(&ctx->range, &bdev->internal.locked_ranges)) { 6517 /* There is an active lock overlapping with this range. 6518 * Put it on the pending list until this range no 6519 * longer overlaps with another. 6520 */ 6521 TAILQ_INSERT_TAIL(&bdev->internal.pending_locked_ranges, &ctx->range, tailq); 6522 } else { 6523 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, &ctx->range, tailq); 6524 bdev_lock_lba_range_ctx(bdev, ctx); 6525 } 6526 pthread_mutex_unlock(&bdev->internal.mutex); 6527 return 0; 6528 } 6529 6530 static void 6531 bdev_lock_lba_range_ctx_msg(void *_ctx) 6532 { 6533 struct locked_lba_range_ctx *ctx = _ctx; 6534 6535 bdev_lock_lba_range_ctx(ctx->bdev, ctx); 6536 } 6537 6538 static void 6539 bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6540 { 6541 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6542 struct locked_lba_range_ctx *pending_ctx; 6543 struct spdk_bdev_channel *ch = ctx->range.owner_ch; 6544 struct spdk_bdev *bdev = ch->bdev; 6545 struct lba_range *range, *tmp; 6546 6547 pthread_mutex_lock(&bdev->internal.mutex); 6548 /* Check if there are any pending locked ranges that overlap with this range 6549 * that was just unlocked. 
If there are, check that it doesn't overlap with any 6550 * other locked ranges before calling bdev_lock_lba_range_ctx which will start 6551 * the lock process. 6552 */ 6553 TAILQ_FOREACH_SAFE(range, &bdev->internal.pending_locked_ranges, tailq, tmp) { 6554 if (bdev_lba_range_overlapped(range, &ctx->range) && 6555 !bdev_lba_range_overlaps_tailq(range, &bdev->internal.locked_ranges)) { 6556 TAILQ_REMOVE(&bdev->internal.pending_locked_ranges, range, tailq); 6557 pending_ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6558 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, range, tailq); 6559 spdk_thread_send_msg(pending_ctx->range.owner_ch->channel->thread, 6560 bdev_lock_lba_range_ctx_msg, pending_ctx); 6561 } 6562 } 6563 pthread_mutex_unlock(&bdev->internal.mutex); 6564 6565 ctx->cb_fn(ctx->cb_arg, status); 6566 free(ctx); 6567 } 6568 6569 static void 6570 bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6571 { 6572 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6573 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6574 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6575 TAILQ_HEAD(, spdk_bdev_io) io_locked; 6576 struct spdk_bdev_io *bdev_io; 6577 struct lba_range *range; 6578 6579 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6580 if (ctx->range.offset == range->offset && 6581 ctx->range.length == range->length && 6582 ctx->range.locked_ctx == range->locked_ctx) { 6583 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 6584 free(range); 6585 break; 6586 } 6587 } 6588 6589 /* Note: we should almost always be able to assert that the range specified 6590 * was found. But there are some very rare corner cases where a new channel 6591 * gets created simultaneously with a range unlock, where this function 6592 * would execute on that new channel and wouldn't have the range. 6593 * We also use this to clean up range allocations when a later allocation 6594 * fails in the locking path. 6595 * So we can't actually assert() here. 6596 */ 6597 6598 /* Swap the locked IO into a temporary list, and then try to submit them again. 6599 * We could hyper-optimize this to only resubmit locked I/O that overlap 6600 * with the range that was just unlocked, but this isn't a performance path so 6601 * we go for simplicity here. 6602 */ 6603 TAILQ_INIT(&io_locked); 6604 TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link); 6605 while (!TAILQ_EMPTY(&io_locked)) { 6606 bdev_io = TAILQ_FIRST(&io_locked); 6607 TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link); 6608 bdev_io_submit(bdev_io); 6609 } 6610 6611 spdk_for_each_channel_continue(i, 0); 6612 } 6613 6614 static int 6615 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6616 uint64_t offset, uint64_t length, 6617 lock_range_cb cb_fn, void *cb_arg) 6618 { 6619 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6620 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6621 struct locked_lba_range_ctx *ctx; 6622 struct lba_range *range; 6623 bool range_found = false; 6624 6625 /* Let's make sure the specified channel actually has a lock on 6626 * the specified range. Note that the range must match exactly. 
6627 */ 6628 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6629 if (range->offset == offset && range->length == length && 6630 range->owner_ch == ch && range->locked_ctx == cb_arg) { 6631 range_found = true; 6632 break; 6633 } 6634 } 6635 6636 if (!range_found) { 6637 return -EINVAL; 6638 } 6639 6640 pthread_mutex_lock(&bdev->internal.mutex); 6641 /* We confirmed that this channel has locked the specified range. To 6642 * start the unlock process, we find the range in the bdev's locked_ranges 6643 * and remove it. This ensures new channels don't inherit the locked range. 6644 * Then we will send a message to each channel (including the one specified 6645 * here) to remove the range from its per-channel list. 6646 */ 6647 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 6648 if (range->offset == offset && range->length == length && 6649 range->locked_ctx == cb_arg) { 6650 break; 6651 } 6652 } 6653 if (range == NULL) { 6654 assert(false); 6655 pthread_mutex_unlock(&bdev->internal.mutex); 6656 return -EINVAL; 6657 } 6658 TAILQ_REMOVE(&bdev->internal.locked_ranges, range, tailq); 6659 ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6660 pthread_mutex_unlock(&bdev->internal.mutex); 6661 6662 ctx->cb_fn = cb_fn; 6663 ctx->cb_arg = cb_arg; 6664 6665 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx, 6666 bdev_unlock_lba_range_cb); 6667 return 0; 6668 } 6669 6670 SPDK_LOG_REGISTER_COMPONENT(bdev) 6671 6672 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 6673 { 6674 spdk_trace_register_owner(OWNER_BDEV, 'b'); 6675 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 6676 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 6677 OBJECT_BDEV_IO, 1, 0, "type: "); 6678 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 6679 OBJECT_BDEV_IO, 0, 0, ""); 6680 } 6681
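
/*
 * Illustrative usage sketches follow. They are not part of the bdev library itself; they
 * only suggest how the public entry points implemented above might be called. Every
 * "example_*" identifier and the bdev name "Malloc0" are hypothetical, and each sketch is
 * wrapped in #if 0/#endif so that none of it is compiled.
 *
 * Sketch 1: opening a bdev with spdk_bdev_open_ext() and closing it when the remove event
 * arrives. Remove notifications are delivered on the thread that opened the descriptor,
 * which is also the thread spdk_bdev_close() must be called from.
 */
#if 0
static struct spdk_bdev_desc *g_example_desc;

static void
example_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		/* Runs on the opening thread, so closing here satisfies the
		 * desc->thread == spdk_get_thread() assertion in spdk_bdev_close(). */
		spdk_bdev_close(g_example_desc);
		g_example_desc = NULL;
		break;
	default:
		break;
	}
}

static int
example_open_bdev(void)
{
	/* Open read-write; QoS, if configured, is started lazily by bdev_start_qos(). */
	return spdk_bdev_open_ext("Malloc0", true, example_bdev_event_cb, NULL, &g_example_desc);
}
#endif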
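
/*
 * Sketch 2: registering a bdev module and claiming a bdev for exclusive write ownership
 * with spdk_bdev_module_claim_bdev(). SPDK_BDEV_MODULE_REGISTER() ultimately calls
 * spdk_bdev_module_list_add() above. Opening read-only is sufficient because claiming
 * promotes the descriptor to write mode.
 */
#if 0
static int
example_module_init(void)
{
	return 0;
}

static struct spdk_bdev_module example_if = {
	.name = "example",
	.module_init = example_module_init,
};

SPDK_BDEV_MODULE_REGISTER(example, &example_if)

static void
example_base_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	/* A real module would handle SPDK_BDEV_EVENT_REMOVE here by releasing the
	 * claim with spdk_bdev_module_release_bdev() and closing its descriptor. */
}

static int
example_claim_bdev(const char *bdev_name, struct spdk_bdev_desc **_desc)
{
	struct spdk_bdev_desc *desc;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, false, example_base_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	rc = spdk_bdev_module_claim_bdev(spdk_bdev_desc_get_bdev(desc), desc, &example_if);
	if (rc != 0) {
		/* Another module already claimed it (-EPERM). */
		spdk_bdev_close(desc);
		return rc;
	}

	*_desc = desc;
	return 0;
}
#endif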
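
/*
 * Sketch 3: how a bdev module's submit path might use the spdk_bdev_io_get_iovec()
 * accessor above. Only READ, WRITE and ZCOPY I/O carry an iovec; for other types the
 * accessor reports NULL/0. The hand-off to the backing device is elided.
 */
#if 0
static void
example_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct iovec *iovs;
	int iovcnt;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		/* ... hand iovs/iovcnt to the backing device, then complete ... */
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
		break;
	default:
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
#endif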
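
/*
 * Sketch 4: setting QoS rate limits with spdk_bdev_set_qos_rate_limits(). Entries left at
 * UINT64_MAX (SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) are treated as not defined and left
 * unchanged, while 0 disables a limit; byte limits are given in MB/s and converted to
 * bytes internally, as shown in the implementation above. The numeric values are
 * arbitrary.
 */
#if 0
static void
example_qos_done(void *cb_arg, int status)
{
	if (status != 0) {
		SPDK_ERRLOG("Setting QoS rate limits failed: %d\n", status);
	}
}

static void
example_set_qos(struct spdk_bdev *bdev)
{
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		limits[i] = UINT64_MAX;
	}

	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;	/* multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC */
	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;		/* 100 MB/s */

	spdk_bdev_set_qos_rate_limits(bdev, limits, example_qos_done, NULL);
}
#endif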
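
/*
 * Sketch 5: enabling per-channel latency histograms and later collecting them with
 * spdk_bdev_histogram_get(), which merges every channel's data into the supplied
 * structure (see bdev_histogram_get_channel() above). Walking the merged buckets, e.g.
 * with spdk_histogram_data_iterate(), is elided.
 */
#if 0
static void
example_histogram_enabled_cb(void *cb_arg, int status)
{
	if (status != 0) {
		SPDK_ERRLOG("Enabling histogram failed: %d\n", status);
	}
}

static void
example_enable_histogram(struct spdk_bdev *bdev)
{
	/* Asynchronously allocates a histogram on each of the bdev's channels. */
	spdk_bdev_histogram_enable(bdev, example_histogram_enabled_cb, NULL, true);
}

static void
example_histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
{
	if (status == 0) {
		/* ... inspect the merged buckets ... */
	}
	spdk_histogram_data_free(histogram);
}

static void
example_collect_histogram(struct spdk_bdev *bdev)
{
	struct spdk_histogram_data *histogram;

	histogram = spdk_histogram_data_alloc();
	if (histogram != NULL) {
		spdk_bdev_histogram_get(bdev, histogram, example_histogram_data_cb, NULL);
	}
}
#endif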
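
/*
 * Sketch 6: draining media events with spdk_bdev_get_media_events(). Events are only
 * buffered for descriptors opened with spdk_bdev_open_ext() on bdevs that set
 * media_events, and spdk_bdev_notify_media_management() above announces them via
 * SPDK_BDEV_EVENT_MEDIA_MANAGEMENT; a handler like this would run from that event
 * callback. Acting on the returned block ranges is elided.
 */
#if 0
static void
example_drain_media_events(struct spdk_bdev_desc *desc)
{
	struct spdk_bdev_media_event events[32];
	size_t num_events;

	do {
		num_events = spdk_bdev_get_media_events(desc, events, SPDK_COUNTOF(events));
		/* ... act on events[0..num_events-1], e.g. relocate the affected blocks ... */
	} while (num_events == SPDK_COUNTOF(events));
}
#endif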