/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/notify.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#include "bdev_internal.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024 - 1)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define SPDK_BDEV_AUTO_EXAMINE			true
#define BUF_SMALL_POOL_SIZE			8191
#define BUF_LARGE_POOL_SIZE			1023
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		1000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX
#define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC	1000

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_conf_type[] = {"Limit_IOPS",
				      "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS"
				     };
static const char *qos_rpc_type[] = {"rw_ios_per_sec",
				     "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
				    };
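
/*
 * The entries in qos_conf_type and qos_rpc_type are indexed by
 * enum spdk_bdev_qos_rate_limit_type, so the position of each string must
 * match the corresponding limit type. As a rough sketch of how the RPC keys
 * are used (the bdev name and values below are made up for illustration),
 * a bdev_set_qos_limit request built from qos_rpc_type could look like:
 *
 *   {
 *     "method": "bdev_set_qos_limit",
 *     "params": {
 *       "name": "Malloc0",
 *       "rw_ios_per_sec": 20000,
 *       "rw_mbytes_per_sec": 100
 *     }
 *   }
 *
 * A value of 0 for one of these keys is commonly used to disable that
 * particular limit type.
 */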

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

	pthread_mutex_t mutex;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
	.mutex = PTHREAD_MUTEX_INITIALIZER,
};

typedef void (*lock_range_cb)(void *ctx, int status);

struct lba_range {
	uint64_t			offset;
	uint64_t			length;
	void				*locked_ctx;
	struct spdk_bdev_channel	*owner_ch;
	TAILQ_ENTRY(lba_range)		tailq;
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
	.bdev_auto_examine = SPDK_BDEV_AUTO_EXAMINE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;

	/** Function to check whether to queue the IO. */
	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);

	/** Function to update for the submitted IO. */
	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
};

struct spdk_bdev_qos {
	/** Rate limits for this bdev, one entry per rate limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};
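
/*
 * Illustration of the accounting in spdk_bdev_qos_limit above (numbers are
 * only an example): with a bytes-per-second limit of 10 MiB/s and
 * SPDK_BDEV_QOS_TIMESLICE_IN_USEC of 1000 us, roughly 10 KiB worth of bytes
 * become available in each 1 ms timeslice. If a 64 KiB I/O is submitted while
 * some bytes remain, remaining_this_timeslice goes negative and the deficit
 * is carried into the following timeslices, so the long-term average still
 * respects the configured limit.
 */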

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * queue their I/O awaiting retry here. This makes it possible to retry sending
 * I/O to one bdev after I/O from another bdev on the same io_device completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t		nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel	*shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t		ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)
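
/*
 * Sketch of how nomem_io and nomem_threshold are meant to interact (the exact
 * threshold arithmetic lives in the completion path, so treat the numbers here
 * as an assumption for illustration): if the module reports NOMEM while, say,
 * 100 I/O are outstanding on the shared channel, the failed I/O is parked on
 * nomem_io and nomem_threshold is set a handful of I/O (on the order of
 * NOMEM_THRESHOLD_COUNT) below the current io_outstanding. Completions drain
 * io_outstanding; once it drops to the threshold, the queued I/O are
 * resubmitted in order.
 */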

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to the underlying dev module through this channel
	 * and waiting for completion.
	 */
	uint64_t		io_outstanding;

	/*
	 * List of spdk_bdev_io directly associated with a call to the public bdev API.
	 * It does not include any spdk_bdev_io that are generated via splitting.
	 */
	bdev_io_tailq_t		io_submitted;

	/*
	 * List of spdk_bdev_io that are currently queued because they write to a locked
	 * LBA range.
	 */
	bdev_io_tailq_t		io_locked;

	uint32_t		flags;

	struct spdk_histogram_data *histogram;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

	bdev_io_tailq_t		queued_resets;

	lba_range_tailq_t	locked_ranges;
};

struct media_event_entry {
	struct spdk_bdev_media_event	event;
	TAILQ_ENTRY(media_event_entry)	tailq;
};

#define MEDIA_EVENT_POOL_SIZE 64

struct spdk_bdev_desc {
	struct spdk_bdev		*bdev;
	struct spdk_thread		*thread;
	struct {
		bool open_with_ext;
		union {
			spdk_bdev_remove_cb_t remove_fn;
			spdk_bdev_event_cb_t event_fn;
		};
		void *ctx;
	}				callback;
	bool				closed;
	bool				write;
	pthread_mutex_t			mutex;
	uint32_t			refs;
	TAILQ_HEAD(, media_event_entry)	pending_media_events;
	TAILQ_HEAD(, media_event_entry)	free_media_events;
	struct media_event_entry	*media_events_buffer;
	TAILQ_ENTRY(spdk_bdev_desc)	link;

	uint64_t		timeout_in_sec;
	spdk_bdev_io_timeout_cb	cb_fn;
	void			*cb_arg;
	struct spdk_poller	*io_timeout_poller;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
static void bdev_write_zero_buffer_next(void *_bdev_io);

static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
static void bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);

static int
bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			  struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
			  uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg);
static int
bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   struct iovec *iov, int iovcnt, void *md_buf,
			   uint64_t offset_blocks, uint64_t num_blocks,
			   spdk_bdev_io_completion_cb cb, void *cb_arg);

static int
bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		    uint64_t offset, uint64_t length,
		    lock_range_cb cb_fn, void *cb_arg);

static int
bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		      uint64_t offset, uint64_t length,
		      lock_range_cb cb_fn, void *cb_arg);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}
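
/*
 * A minimal sketch of how an application is expected to adjust these options
 * before calling spdk_bdev_initialize() (the values are illustrative only):
 *
 *   struct spdk_bdev_opts opts;
 *
 *   spdk_bdev_get_opts(&opts);
 *   opts.bdev_io_pool_size = 128 * 1024;
 *   if (spdk_bdev_set_opts(&opts) != 0) {
 *       // rejected: pool size below bdev_io_cache_size * (thread count + 1)
 *   }
 *
 * spdk_bdev_set_opts() validates the combination against the current thread
 * count, as shown in the function below.
 */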

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization.  A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

/*
 * Will implement the whitelist in the future
 */
static inline bool
bdev_in_examine_whitelist(struct spdk_bdev *bdev)
{
	return false;
}

static inline bool
bdev_ok_to_examine(struct spdk_bdev *bdev)
{
	if (g_bdev_opts.bdev_auto_examine) {
		return true;
	} else {
		return bdev_in_examine_whitelist(bdev);
	}
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return iovs[0].iov_base != NULL;
}

static bool
_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
{
	int i;
	uintptr_t iov_base;

	if (spdk_likely(alignment == 1)) {
		return true;
	}

	for (i = 0; i < iovcnt; i++) {
		iov_base = (uintptr_t)iovs[i].iov_base;
		if ((iov_base & (alignment - 1)) != 0) {
			return false;
		}
	}

	return true;
}

static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is write path, copy data from original buffer to bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
_bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	/* save original md_buf */
	bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf;
	/* set bounce md_buf */
	bdev_io->u.bdev.md_buf = md_buf;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		memcpy(md_buf, bdev_io->internal.orig_md_buf, len);
	}
}
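
/*
 * Summary of the bounce-buffer flow implemented by the helpers above and by
 * _bdev_io_unset_bounce_buf() further down (a descriptive sketch of the code,
 * not new behavior):
 *
 *   WRITE:  caller iovs --_copy_iovs_to_buf()--> bounce buf --> module
 *   READ:   module --> bounce buf --_copy_buf_to_iovs()--> caller iovs
 *                      (performed at completion, and only on success)
 *
 * The original iovs/md_buf are stashed in bdev_io->internal.orig_* so the
 * completion path can restore them and release the bounce buffer.
 */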

static void
bdev_io_get_buf_complete(struct spdk_bdev_io *bdev_io, void *buf, bool status)
{
	struct spdk_io_channel *ch = spdk_bdev_io_get_io_channel(bdev_io);

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io->internal.get_aux_buf_cb(ch, bdev_io, buf);
		bdev_io->internal.get_aux_buf_cb = NULL;
	} else {
		assert(bdev_io->internal.get_buf_cb != NULL);
		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(ch, bdev_io, status);
		bdev_io->internal.get_buf_cb = NULL;
	}
}

static void
_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	bool buf_allocated;
	uint64_t md_len, alignment;
	void *aligned_buf;

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io_get_buf_complete(bdev_io, buf, true);
		return;
	}

	alignment = spdk_bdev_get_buf_align(bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
	aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

	if (buf_allocated) {
		_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
	} else {
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
	}

	if (spdk_bdev_is_md_separate(bdev)) {
		aligned_buf = (char *)aligned_buf + len;
		md_len = bdev_io->u.bdev.num_blocks * bdev->md_len;

		assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);

		if (bdev_io->u.bdev.md_buf != NULL) {
			_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
		} else {
			spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
		}
	}
	bdev_io_get_buf_complete(bdev_io, buf, true);
}

static void
_bdev_io_put_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t buf_len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t md_len, alignment;

	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
	alignment = spdk_bdev_get_buf_align(bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		_bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
	}
}

static void
bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	assert(bdev_io->internal.buf != NULL);
	_bdev_io_put_buf(bdev_io, bdev_io->internal.buf, bdev_io->internal.buf_len);
	bdev_io->internal.buf = NULL;
}

void
spdk_bdev_io_put_aux_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(buf != NULL);
	_bdev_io_put_buf(bdev_io, buf, len);
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
		assert(bdev_io->internal.orig_md_buf == NULL);
		return;
	}

	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs,
				  bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base,
				  bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bouncing buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;

	/* do the same for metadata buffer */
	if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
		assert(spdk_bdev_is_md_separate(bdev_io->bdev));

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
		    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
			       bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
		}

		bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
		bdev_io->internal.orig_md_buf = NULL;
	}

	/* We want to free the bounce buffer here since we know we're done with it (as opposed
	 * to waiting for the conditional free of internal.buf in spdk_bdev_free_io()).
	 */
	bdev_io_put_buf(bdev_io);
}

static void
bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment, md_len;
	void *buf;

	alignment = spdk_bdev_get_buf_align(bdev);
	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;

	if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
			    len + alignment);
		bdev_io_get_buf_complete(bdev_io, NULL, false);
		return;
	}

	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;

	if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);
	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		_bdev_io_set_buf(bdev_io, buf, len);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	uint64_t alignment;

	assert(cb != NULL);
	bdev_io->internal.get_buf_cb = cb;

	alignment = spdk_bdev_get_buf_align(bdev);

	if (_is_buf_allocated(bdev_io->u.bdev.iovs) &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
		return;
	}

	bdev_io_get_buf(bdev_io, len);
}

void
spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(cb != NULL);
	assert(bdev_io->internal.get_aux_buf_cb == NULL);
	bdev_io->internal.get_aux_buf_cb = cb;
	bdev_io_get_buf(bdev_io, len);
}

static int
bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}
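
/*
 * A minimal sketch of how a bdev module typically uses spdk_bdev_io_get_buf()
 * from its submit_request() read path (the module-side names below are
 * hypothetical and only illustrate the callback contract):
 *
 *   static void
 *   my_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
 *                      bool success)
 *   {
 *       if (!success) {
 *           spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
 *           return;
 *       }
 *       // bdev_io->u.bdev.iovs now points at an aligned buffer; start the read
 *   }
 *
 *   spdk_bdev_io_get_buf(bdev_io, my_read_get_buf_cb,
 *                        bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *
 * The callback may run inline (buffer already present and aligned) or later,
 * once a buffer from the small/large pool becomes available.
 */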

static void
bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_qos_limit");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_options");
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_named_bool(w, "bdev_auto_examine", g_bdev_opts.bdev_auto_examine);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	pthread_mutex_lock(&g_bdev_mgr.mutex);

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		bdev_qos_config_json(bdev, w);
	}

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	spdk_json_write_array_end(w);
}

static int
bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}
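
/*
 * For reference, the subsystem entry written by spdk_bdev_subsystem_config_json()
 * above starts roughly like this (the values shown are the defaults defined at
 * the top of this file; per-module and per-bdev entries follow in the same array):
 *
 *   [
 *     {
 *       "method": "bdev_set_options",
 *       "params": {
 *         "bdev_io_pool_size": 65535,
 *         "bdev_io_cache_size": 256,
 *         "bdev_auto_examine": true
 *       }
 *     },
 *     ...
 *   ]
 */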

static void
bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	bdev_init_complete(0);
}

static void
bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	bdev_init_complete(-1);
}

static int
bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to prevent other modules from completion of modules_init
			 * Send message to defer application shutdown until resources are cleaned up */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create,
				bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		return;
	}

	bdev_module_action_complete();
}

static void
bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (g_bdev_mgr.bdev_io_pool) {
		if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
			SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
				    g_bdev_opts.bdev_io_pool_size);
		}

		spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	}

	if (g_bdev_mgr.buf_small_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
			SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
				    BUF_SMALL_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	}

	if (g_bdev_mgr.buf_large_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
			SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
				    BUF_LARGE_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	}

	spdk_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
	pthread_mutex_destroy(&g_bdev_mgr.mutex);
}

static void
bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* FIXME: Handling initialization failures is broken now,
	 * so we won't even try cleaning up after successfully
	 * initialized modules. if module_init_complete is false,
	 * just call spdk_bdev_mgr_unregister_cb
	 */
	if (!g_bdev_mgr.module_init_complete) {
		bdev_mgr_unregister_cb(NULL);
		return;
	}

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL);
	} else {
		bdev_module_finish_iter(NULL);
	}
}

static void
bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down.  The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in the reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim underlying bdev properly, we may face the
	 * case of bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister last bdev on the list then.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	bdev_finish_unregister_bdevs_iter(NULL, 0);
}

struct spdk_bdev_io *
bdev_channel_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf != NULL) {
		bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static bool
bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		if (bdev_io->u.bdev.zcopy.start) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static bool
bdev_is_read_io(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* Bit 1 (0x2) set for read operation */
		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
			return true;
		} else {
			return false;
		}
	case SPDK_BDEV_IO_TYPE_READ:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Populate to read from disk */
		if (bdev_io->u.bdev.zcopy.populate) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static uint64_t
bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Track the data in the start phase only */
		if (bdev_io->u.bdev.zcopy.start) {
			return bdev_io->u.bdev.num_blocks * bdev->blocklen;
		} else {
			return 0;
		}
	default:
		return 0;
	}
}
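
/*
 * The io_wait_queue drained in spdk_bdev_free_io() above backs the public
 * spdk_bdev_queue_io_wait() API. A minimal sketch of the caller-side retry
 * pattern when an I/O submission fails with -ENOMEM (the entry, callback and
 * context names are hypothetical):
 *
 *   struct spdk_bdev_io_wait_entry entry;
 *
 *   entry.bdev = bdev;
 *   entry.cb_fn = retry_my_io;   // re-issues the same read/write call
 *   entry.cb_arg = my_ctx;
 *   spdk_bdev_queue_io_wait(bdev, io_ch, &entry);
 *
 * The callback fires once a bdev_io is returned to this thread's cache,
 * i.e. when another I/O completes and spdk_bdev_free_io() runs here.
 */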

static bool
bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
		return true;
	} else {
		return false;
	}
}

static bool
bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static bool
bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static void
bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice--;
}

static void
bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io);
}

static void
bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_set_ops(struct spdk_bdev_qos *qos)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].queue_io = NULL;
			qos->rate_limits[i].update_quota = NULL;
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_r_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_w_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota;
			break;
		default:
			break;
		}
	}
}

static inline void
bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
		bdev_ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev_io->internal.in_submit_request = true;
		bdev->fn_table->submit_request(ch, bdev_io);
		bdev_io->internal.in_submit_request = false;
	} else {
		TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
	}
}

static int
bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL;
	int i, submitted_ios = 0;

	TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
		if (bdev_qos_io_to_limit(bdev_io) == true) {
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].queue_io) {
					continue;
				}

				if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
								 bdev_io) == true) {
					return submitted_ios;
				}
			}
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].update_quota) {
					continue;
				}

				qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
			}
		}

		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		bdev_io_do_submit(ch, bdev_io);
		submitted_ios++;
	}

	return submitted_ios;
}

static void
bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

static bool
bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O.  Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}
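
/*
 * Worked example for the boundary check below: with optimal_io_boundary = 64
 * blocks, an I/O at offset_blocks = 60 with num_blocks = 16 spans stripe 0
 * (blocks 60-63) and stripe 1 (blocks 64-75), so bdev_io_should_split()
 * returns true and _bdev_io_split() will issue two child I/O of 4 and 12
 * blocks.  An I/O that stays inside one stripe is passed through unmodified.
 */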

static bool
bdev_io_should_split(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}

static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}

static void
bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_bdev_io_split(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes;
	struct iovec *parent_iov, *iov;
	uint64_t parent_iov_offset, iov_len;
	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
	void *md_buf = NULL;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
	parent_iovcnt = bdev_io->u.bdev.iovcnt;

	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
		if (parent_iov_offset < parent_iov->iov_len) {
			break;
		}
		parent_iov_offset -= parent_iov->iov_len;
	}

	child_iovcnt = 0;
	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
		to_next_boundary = spdk_min(remaining, to_next_boundary);
		to_next_boundary_bytes = to_next_boundary * blocklen;
		iov = &bdev_io->child_iov[child_iovcnt];
		iovcnt = 0;

		if (bdev_io->u.bdev.md_buf) {
			assert((parent_iov_offset % blocklen) > 0);
			md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) *
				 spdk_bdev_get_md_size(bdev_io->bdev);
		}

		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
			to_next_boundary_bytes -= iov_len;

			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;

			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
				parent_iov_offset += iov_len;
			} else {
				parent_iovpos++;
				parent_iov_offset = 0;
			}
			child_iovcnt++;
			iovcnt++;
		}

		if (to_next_boundary_bytes > 0) {
			/* We had to stop this child I/O early because we ran out of
			 * child_iov space.  Ensure the iovs are aligned with the block
			 * size and then adjust to_next_boundary before starting the
			 * child I/O.
			 */
			assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV);
			to_last_block_bytes = to_next_boundary_bytes % blocklen;
			if (to_last_block_bytes != 0) {
				uint32_t child_iovpos = child_iovcnt - 1;
				/* don't decrease child_iovcnt so the loop will naturally end */

				to_last_block_bytes = blocklen - to_last_block_bytes;
				to_next_boundary_bytes += to_last_block_bytes;
				while (to_last_block_bytes > 0 && iovcnt > 0) {
					iov_len = spdk_min(to_last_block_bytes,
							   bdev_io->child_iov[child_iovpos].iov_len);
					bdev_io->child_iov[child_iovpos].iov_len -= iov_len;
					if (bdev_io->child_iov[child_iovpos].iov_len == 0) {
						child_iovpos--;
						if (--iovcnt == 0) {
							return;
						}
					}
					to_last_block_bytes -= iov_len;
				}

				assert(to_last_block_bytes == 0);
			}
			to_next_boundary -= to_next_boundary_bytes / blocklen;
		}

		bdev_io->u.bdev.split_outstanding++;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			rc = bdev_readv_blocks_with_md(bdev_io->internal.desc,
						       spdk_io_channel_from_ctx(bdev_io->internal.ch),
						       iov, iovcnt, md_buf, current_offset,
						       to_next_boundary,
						       bdev_io_split_done, bdev_io);
		} else {
			rc = bdev_writev_blocks_with_md(bdev_io->internal.desc,
							spdk_io_channel_from_ctx(bdev_io->internal.ch),
							iov, iovcnt, md_buf, current_offset,
							to_next_boundary,
							bdev_io_split_done, bdev_io);
		}

		if (rc == 0) {
			current_offset += to_next_boundary;
			remaining -= to_next_boundary;
			bdev_io->u.bdev.split_current_offset_blocks = current_offset;
			bdev_io->u.bdev.split_remaining_num_blocks = remaining;
		} else {
			bdev_io->u.bdev.split_outstanding--;
			if (rc == -ENOMEM) {
				if (bdev_io->u.bdev.split_outstanding == 0) {
					/* No I/O is outstanding. Hence we should wait here. */
					bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split);
				}
			} else {
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0,
							      (uintptr_t)bdev_io, 0);
					TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link);
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
			}

			return;
		}
	}
}

static void
bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
	}
	parent_io->u.bdev.split_outstanding--;
	if (parent_io->u.bdev.split_outstanding != 0) {
		return;
	}

	/*
	 * Parent I/O finishes when all blocks are consumed.
	 */
	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		assert(parent_io->internal.cb != bdev_io_split_done);
		spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0,
				      (uintptr_t)parent_io, 0);
		TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link);
		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
				       parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process.  This function will complete the parent I/O if the
	 * splitting is done.
1947 */ 1948 _bdev_io_split(parent_io); 1949 } 1950 1951 static void 1952 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success); 1953 1954 static void 1955 bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1956 { 1957 assert(bdev_io_type_can_split(bdev_io->type)); 1958 1959 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1960 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1961 bdev_io->u.bdev.split_outstanding = 0; 1962 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1963 1964 if (_is_buf_allocated(bdev_io->u.bdev.iovs)) { 1965 _bdev_io_split(bdev_io); 1966 } else { 1967 assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ); 1968 spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb, 1969 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1970 } 1971 } 1972 1973 static void 1974 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 1975 { 1976 if (!success) { 1977 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1978 return; 1979 } 1980 1981 bdev_io_split(ch, bdev_io); 1982 } 1983 1984 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't 1985 * be inlined, at least on some compilers. 1986 */ 1987 static inline void 1988 _bdev_io_submit(void *ctx) 1989 { 1990 struct spdk_bdev_io *bdev_io = ctx; 1991 struct spdk_bdev *bdev = bdev_io->bdev; 1992 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1993 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1994 uint64_t tsc; 1995 1996 tsc = spdk_get_ticks(); 1997 bdev_io->internal.submit_tsc = tsc; 1998 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1999 2000 if (spdk_likely(bdev_ch->flags == 0)) { 2001 bdev_io_do_submit(bdev_ch, bdev_io); 2002 return; 2003 } 2004 2005 bdev_ch->io_outstanding++; 2006 shared_resource->io_outstanding++; 2007 bdev_io->internal.in_submit_request = true; 2008 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 2009 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2010 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 2011 bdev_ch->io_outstanding--; 2012 shared_resource->io_outstanding--; 2013 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 2014 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 2015 } else { 2016 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 2017 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2018 } 2019 bdev_io->internal.in_submit_request = false; 2020 } 2021 2022 bool 2023 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2); 2024 2025 bool 2026 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2) 2027 { 2028 if (range1->length == 0 || range2->length == 0) { 2029 return false; 2030 } 2031 2032 if (range1->offset + range1->length <= range2->offset) { 2033 return false; 2034 } 2035 2036 if (range2->offset + range2->length <= range1->offset) { 2037 return false; 2038 } 2039 2040 return true; 2041 } 2042 2043 static bool 2044 bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range) 2045 { 2046 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2047 struct lba_range r; 2048 2049 switch (bdev_io->type) { 2050 case SPDK_BDEV_IO_TYPE_NVME_IO: 2051 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 2052 /* Don't try to decode the NVMe command - just assume worst-case and that 2053 * it overlaps a 
locked range. 2054 */ 2055 return true; 2056 case SPDK_BDEV_IO_TYPE_WRITE: 2057 case SPDK_BDEV_IO_TYPE_UNMAP: 2058 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2059 case SPDK_BDEV_IO_TYPE_ZCOPY: 2060 r.offset = bdev_io->u.bdev.offset_blocks; 2061 r.length = bdev_io->u.bdev.num_blocks; 2062 if (!bdev_lba_range_overlapped(range, &r)) { 2063 /* This I/O doesn't overlap the specified LBA range. */ 2064 return false; 2065 } else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) { 2066 /* This I/O overlaps, but the I/O is on the same channel that locked this 2067 * range, and the caller_ctx is the same as the locked_ctx. This means 2068 * that this I/O is associated with the lock, and is allowed to execute. 2069 */ 2070 return false; 2071 } else { 2072 return true; 2073 } 2074 default: 2075 return false; 2076 } 2077 } 2078 2079 void 2080 bdev_io_submit(struct spdk_bdev_io *bdev_io) 2081 { 2082 struct spdk_bdev *bdev = bdev_io->bdev; 2083 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 2084 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2085 2086 assert(thread != NULL); 2087 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2088 2089 if (!TAILQ_EMPTY(&ch->locked_ranges)) { 2090 struct lba_range *range; 2091 2092 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 2093 if (bdev_io_range_is_locked(bdev_io, range)) { 2094 TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link); 2095 return; 2096 } 2097 } 2098 } 2099 2100 /* Add the bdev_io to io_submitted only if it is the original 2101 * submission from the bdev user. When a bdev_io is split, 2102 * it comes back through this code path, so we need to make sure 2103 * we don't try to add it a second time. 2104 */ 2105 if (bdev_io->internal.cb != bdev_io_split_done) { 2106 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 2107 } 2108 2109 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bdev_io)) { 2110 bdev_io->internal.submit_tsc = spdk_get_ticks(); 2111 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 2112 (uintptr_t)bdev_io, bdev_io->type); 2113 bdev_io_split(NULL, bdev_io); 2114 return; 2115 } 2116 2117 if (ch->flags & BDEV_CH_QOS_ENABLED) { 2118 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 2119 _bdev_io_submit(bdev_io); 2120 } else { 2121 bdev_io->internal.io_submit_ch = ch; 2122 bdev_io->internal.ch = bdev->internal.qos->ch; 2123 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 2124 } 2125 } else { 2126 _bdev_io_submit(bdev_io); 2127 } 2128 } 2129 2130 static void 2131 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 2132 { 2133 struct spdk_bdev *bdev = bdev_io->bdev; 2134 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2135 struct spdk_io_channel *ch = bdev_ch->channel; 2136 2137 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2138 2139 bdev_io->internal.in_submit_request = true; 2140 bdev->fn_table->submit_request(ch, bdev_io); 2141 bdev_io->internal.in_submit_request = false; 2142 } 2143 2144 void 2145 bdev_io_init(struct spdk_bdev_io *bdev_io, 2146 struct spdk_bdev *bdev, void *cb_arg, 2147 spdk_bdev_io_completion_cb cb) 2148 { 2149 bdev_io->bdev = bdev; 2150 bdev_io->internal.caller_ctx = cb_arg; 2151 bdev_io->internal.cb = cb; 2152 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2153 bdev_io->internal.in_submit_request = false; 2154 bdev_io->internal.buf = NULL; 2155 bdev_io->internal.io_submit_ch = NULL; 2156 
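/* The orig_* fields below track the caller's original buffers for cases where the
 * bdev layer substitutes its own (e.g. a bounce buffer when alignment requirements
 * are not met); nothing has been substituted yet at init time. */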
bdev_io->internal.orig_iovs = NULL; 2157 bdev_io->internal.orig_iovcnt = 0; 2158 bdev_io->internal.orig_md_buf = NULL; 2159 bdev_io->internal.error.nvme.cdw0 = 0; 2160 bdev_io->num_retries = 0; 2161 bdev_io->internal.get_buf_cb = NULL; 2162 bdev_io->internal.get_aux_buf_cb = NULL; 2163 } 2164 2165 static bool 2166 bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2167 { 2168 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 2169 } 2170 2171 bool 2172 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2173 { 2174 bool supported; 2175 2176 supported = bdev_io_type_supported(bdev, io_type); 2177 2178 if (!supported) { 2179 switch (io_type) { 2180 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2181 /* The bdev layer will emulate write zeroes as long as write is supported. */ 2182 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2183 break; 2184 case SPDK_BDEV_IO_TYPE_ZCOPY: 2185 /* Zero copy can be emulated with regular read and write */ 2186 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 2187 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2188 break; 2189 default: 2190 break; 2191 } 2192 } 2193 2194 return supported; 2195 } 2196 2197 int 2198 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 2199 { 2200 if (bdev->fn_table->dump_info_json) { 2201 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 2202 } 2203 2204 return 0; 2205 } 2206 2207 static void 2208 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2209 { 2210 uint32_t max_per_timeslice = 0; 2211 int i; 2212 2213 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2214 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2215 qos->rate_limits[i].max_per_timeslice = 0; 2216 continue; 2217 } 2218 2219 max_per_timeslice = qos->rate_limits[i].limit * 2220 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2221 2222 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2223 qos->rate_limits[i].min_per_timeslice); 2224 2225 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2226 } 2227 2228 bdev_qos_set_ops(qos); 2229 } 2230 2231 static int 2232 bdev_channel_poll_qos(void *arg) 2233 { 2234 struct spdk_bdev_qos *qos = arg; 2235 uint64_t now = spdk_get_ticks(); 2236 int i; 2237 2238 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2239 /* We received our callback earlier than expected - return 2240 * immediately and wait to do accounting until at least one 2241 * timeslice has actually expired. This should never happen 2242 * with a well-behaved timer implementation. 2243 */ 2244 return 0; 2245 } 2246 2247 /* Reset for next round of rate limiting */ 2248 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2249 /* We may have allowed the IOs or bytes to slightly overrun in the last 2250 * timeslice. remaining_this_timeslice is signed, so if it's negative 2251 * here, we'll account for the overrun so that the next timeslice will 2252 * be appropriately reduced. 
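 * For example, if max_per_timeslice is 100 and the previous slice ended with
 * remaining_this_timeslice at -20, the refill loop below brings it up to 80, so
 * the overrun is paid back out of the next slice's budget.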
2253 */ 2254 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2255 qos->rate_limits[i].remaining_this_timeslice = 0; 2256 } 2257 } 2258 2259 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2260 qos->last_timeslice += qos->timeslice_size; 2261 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2262 qos->rate_limits[i].remaining_this_timeslice += 2263 qos->rate_limits[i].max_per_timeslice; 2264 } 2265 } 2266 2267 return bdev_qos_io_submit(qos->ch, qos); 2268 } 2269 2270 static void 2271 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2272 { 2273 struct spdk_bdev_shared_resource *shared_resource; 2274 struct lba_range *range; 2275 2276 while (!TAILQ_EMPTY(&ch->locked_ranges)) { 2277 range = TAILQ_FIRST(&ch->locked_ranges); 2278 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 2279 free(range); 2280 } 2281 2282 spdk_put_io_channel(ch->channel); 2283 2284 shared_resource = ch->shared_resource; 2285 2286 assert(TAILQ_EMPTY(&ch->io_locked)); 2287 assert(TAILQ_EMPTY(&ch->io_submitted)); 2288 assert(ch->io_outstanding == 0); 2289 assert(shared_resource->ref > 0); 2290 shared_resource->ref--; 2291 if (shared_resource->ref == 0) { 2292 assert(shared_resource->io_outstanding == 0); 2293 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2294 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2295 free(shared_resource); 2296 } 2297 } 2298 2299 /* Caller must hold bdev->internal.mutex. */ 2300 static void 2301 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2302 { 2303 struct spdk_bdev_qos *qos = bdev->internal.qos; 2304 int i; 2305 2306 /* Rate limiting on this bdev enabled */ 2307 if (qos) { 2308 if (qos->ch == NULL) { 2309 struct spdk_io_channel *io_ch; 2310 2311 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2312 bdev->name, spdk_get_thread()); 2313 2314 /* No qos channel has been selected, so set one up */ 2315 2316 /* Take another reference to ch */ 2317 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2318 assert(io_ch != NULL); 2319 qos->ch = ch; 2320 2321 qos->thread = spdk_io_channel_get_thread(io_ch); 2322 2323 TAILQ_INIT(&qos->queued); 2324 2325 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2326 if (bdev_qos_is_iops_rate_limit(i) == true) { 2327 qos->rate_limits[i].min_per_timeslice = 2328 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2329 } else { 2330 qos->rate_limits[i].min_per_timeslice = 2331 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2332 } 2333 2334 if (qos->rate_limits[i].limit == 0) { 2335 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2336 } 2337 } 2338 bdev_qos_update_max_quota_per_timeslice(qos); 2339 qos->timeslice_size = 2340 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2341 qos->last_timeslice = spdk_get_ticks(); 2342 qos->poller = SPDK_POLLER_REGISTER(bdev_channel_poll_qos, 2343 qos, 2344 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2345 } 2346 2347 ch->flags |= BDEV_CH_QOS_ENABLED; 2348 } 2349 } 2350 2351 struct poll_timeout_ctx { 2352 struct spdk_bdev_desc *desc; 2353 uint64_t timeout_in_sec; 2354 spdk_bdev_io_timeout_cb cb_fn; 2355 void *cb_arg; 2356 }; 2357 2358 static void 2359 bdev_desc_free(struct spdk_bdev_desc *desc) 2360 { 2361 pthread_mutex_destroy(&desc->mutex); 2362 free(desc->media_events_buffer); 2363 free(desc); 2364 } 2365 2366 static void 2367 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2368 { 2369 struct poll_timeout_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 2370 struct spdk_bdev_desc *desc = ctx->desc; 2371 2372 free(ctx); 2373 2374 pthread_mutex_lock(&desc->mutex); 2375 desc->refs--; 2376 if (desc->closed == true && desc->refs == 0) { 2377 pthread_mutex_unlock(&desc->mutex); 2378 bdev_desc_free(desc); 2379 return; 2380 } 2381 pthread_mutex_unlock(&desc->mutex); 2382 } 2383 2384 static void 2385 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2386 { 2387 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2388 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2389 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2390 struct spdk_bdev_desc *desc = ctx->desc; 2391 struct spdk_bdev_io *bdev_io; 2392 uint64_t now; 2393 2394 pthread_mutex_lock(&desc->mutex); 2395 if (desc->closed == true) { 2396 pthread_mutex_unlock(&desc->mutex); 2397 spdk_for_each_channel_continue(i, -1); 2398 return; 2399 } 2400 pthread_mutex_unlock(&desc->mutex); 2401 2402 now = spdk_get_ticks(); 2403 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2404 /* I/O are added to this TAILQ as they are submitted. 2405 * So once we find an I/O that has not timed out, we can immediately exit the loop. */ 2406 if (now < (bdev_io->internal.submit_tsc + 2407 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2408 goto end; 2409 } 2410 2411 if (bdev_io->internal.desc == desc) { 2412 ctx->cb_fn(ctx->cb_arg, bdev_io); 2413 } 2414 } 2415 2416 end: 2417 spdk_for_each_channel_continue(i, 0); 2418 } 2419 2420 static int 2421 bdev_poll_timeout_io(void *arg) 2422 { 2423 struct spdk_bdev_desc *desc = arg; 2424 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2425 struct poll_timeout_ctx *ctx; 2426 2427 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2428 if (!ctx) { 2429 SPDK_ERRLOG("failed to allocate memory\n"); 2430 return 1; 2431 } 2432 ctx->desc = desc; 2433 ctx->cb_arg = desc->cb_arg; 2434 ctx->cb_fn = desc->cb_fn; 2435 ctx->timeout_in_sec = desc->timeout_in_sec; 2436 2437 /* Take a ref on the descriptor in case it gets closed while we are checking 2438 * all of the channels. 
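 * The reference is dropped again in bdev_channel_poll_timeout_io_done(); if the
 * descriptor was closed in the meantime and this was the last reference, that
 * completion callback frees the descriptor with bdev_desc_free().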
2439 */ 2440 pthread_mutex_lock(&desc->mutex); 2441 desc->refs++; 2442 pthread_mutex_unlock(&desc->mutex); 2443 2444 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2445 bdev_channel_poll_timeout_io, 2446 ctx, 2447 bdev_channel_poll_timeout_io_done); 2448 2449 return 1; 2450 } 2451 2452 int 2453 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2454 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2455 { 2456 assert(desc->thread == spdk_get_thread()); 2457 2458 spdk_poller_unregister(&desc->io_timeout_poller); 2459 2460 if (timeout_in_sec) { 2461 assert(cb_fn != NULL); 2462 desc->io_timeout_poller = SPDK_POLLER_REGISTER(bdev_poll_timeout_io, 2463 desc, 2464 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2465 1000); 2466 if (desc->io_timeout_poller == NULL) { 2467 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2468 return -1; 2469 } 2470 } 2471 2472 desc->cb_fn = cb_fn; 2473 desc->cb_arg = cb_arg; 2474 desc->timeout_in_sec = timeout_in_sec; 2475 2476 return 0; 2477 } 2478 2479 static int 2480 bdev_channel_create(void *io_device, void *ctx_buf) 2481 { 2482 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2483 struct spdk_bdev_channel *ch = ctx_buf; 2484 struct spdk_io_channel *mgmt_io_ch; 2485 struct spdk_bdev_mgmt_channel *mgmt_ch; 2486 struct spdk_bdev_shared_resource *shared_resource; 2487 struct lba_range *range; 2488 2489 ch->bdev = bdev; 2490 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2491 if (!ch->channel) { 2492 return -1; 2493 } 2494 2495 assert(ch->histogram == NULL); 2496 if (bdev->internal.histogram_enabled) { 2497 ch->histogram = spdk_histogram_data_alloc(); 2498 if (ch->histogram == NULL) { 2499 SPDK_ERRLOG("Could not allocate histogram\n"); 2500 } 2501 } 2502 2503 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2504 if (!mgmt_io_ch) { 2505 spdk_put_io_channel(ch->channel); 2506 return -1; 2507 } 2508 2509 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2510 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2511 if (shared_resource->shared_ch == ch->channel) { 2512 spdk_put_io_channel(mgmt_io_ch); 2513 shared_resource->ref++; 2514 break; 2515 } 2516 } 2517 2518 if (shared_resource == NULL) { 2519 shared_resource = calloc(1, sizeof(*shared_resource)); 2520 if (shared_resource == NULL) { 2521 spdk_put_io_channel(ch->channel); 2522 spdk_put_io_channel(mgmt_io_ch); 2523 return -1; 2524 } 2525 2526 shared_resource->mgmt_ch = mgmt_ch; 2527 shared_resource->io_outstanding = 0; 2528 TAILQ_INIT(&shared_resource->nomem_io); 2529 shared_resource->nomem_threshold = 0; 2530 shared_resource->shared_ch = ch->channel; 2531 shared_resource->ref = 1; 2532 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2533 } 2534 2535 memset(&ch->stat, 0, sizeof(ch->stat)); 2536 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2537 ch->io_outstanding = 0; 2538 TAILQ_INIT(&ch->queued_resets); 2539 TAILQ_INIT(&ch->locked_ranges); 2540 ch->flags = 0; 2541 ch->shared_resource = shared_resource; 2542 2543 TAILQ_INIT(&ch->io_submitted); 2544 TAILQ_INIT(&ch->io_locked); 2545 2546 #ifdef SPDK_CONFIG_VTUNE 2547 { 2548 char *name; 2549 __itt_init_ittlib(NULL, 0); 2550 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2551 if (!name) { 2552 bdev_channel_destroy_resource(ch); 2553 return -1; 2554 } 2555 ch->handle = __itt_string_handle_create(name); 2556 free(name); 2557 ch->start_tsc = spdk_get_ticks(); 2558 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2559 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 
2560 } 2561 #endif 2562 2563 pthread_mutex_lock(&bdev->internal.mutex); 2564 bdev_enable_qos(bdev, ch); 2565 2566 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 2567 struct lba_range *new_range; 2568 2569 new_range = calloc(1, sizeof(*new_range)); 2570 if (new_range == NULL) { 2571 pthread_mutex_unlock(&bdev->internal.mutex); 2572 bdev_channel_destroy_resource(ch); 2573 return -1; 2574 } 2575 new_range->length = range->length; 2576 new_range->offset = range->offset; 2577 new_range->locked_ctx = range->locked_ctx; 2578 TAILQ_INSERT_TAIL(&ch->locked_ranges, new_range, tailq); 2579 } 2580 2581 pthread_mutex_unlock(&bdev->internal.mutex); 2582 2583 return 0; 2584 } 2585 2586 /* 2587 * Abort I/O that are waiting on a data buffer. These types of I/O are 2588 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2589 */ 2590 static void 2591 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2592 { 2593 bdev_io_stailq_t tmp; 2594 struct spdk_bdev_io *bdev_io; 2595 2596 STAILQ_INIT(&tmp); 2597 2598 while (!STAILQ_EMPTY(queue)) { 2599 bdev_io = STAILQ_FIRST(queue); 2600 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2601 if (bdev_io->internal.ch == ch) { 2602 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2603 } else { 2604 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2605 } 2606 } 2607 2608 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2609 } 2610 2611 /* 2612 * Abort I/O that are queued waiting for submission. These types of I/O are 2613 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2614 */ 2615 static void 2616 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2617 { 2618 struct spdk_bdev_io *bdev_io, *tmp; 2619 2620 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2621 if (bdev_io->internal.ch == ch) { 2622 TAILQ_REMOVE(queue, bdev_io, internal.link); 2623 /* 2624 * spdk_bdev_io_complete() assumes that the completed I/O had 2625 * been submitted to the bdev module. Since in this case it 2626 * hadn't, bump io_outstanding to account for the decrement 2627 * that spdk_bdev_io_complete() will do. 2628 */ 2629 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2630 ch->io_outstanding++; 2631 ch->shared_resource->io_outstanding++; 2632 } 2633 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2634 } 2635 } 2636 } 2637 2638 static void 2639 bdev_qos_channel_destroy(void *cb_arg) 2640 { 2641 struct spdk_bdev_qos *qos = cb_arg; 2642 2643 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2644 spdk_poller_unregister(&qos->poller); 2645 2646 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2647 2648 free(qos); 2649 } 2650 2651 static int 2652 bdev_qos_destroy(struct spdk_bdev *bdev) 2653 { 2654 int i; 2655 2656 /* 2657 * Cleanly shutting down the QoS poller is tricky, because 2658 * during the asynchronous operation the user could open 2659 * a new descriptor and create a new channel, spawning 2660 * a new QoS poller. 2661 * 2662 * The strategy is to create a new QoS structure here and swap it 2663 * in. The shutdown path then continues to refer to the old one 2664 * until it completes and then releases it. 
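 * The old structure is then released either directly below (if a QoS thread was
 * never assigned) or on the QoS thread via bdev_qos_channel_destroy(), which also
 * unregisters the QoS poller and drops the QoS channel reference.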
2665 */ 2666 struct spdk_bdev_qos *new_qos, *old_qos; 2667 2668 old_qos = bdev->internal.qos; 2669 2670 new_qos = calloc(1, sizeof(*new_qos)); 2671 if (!new_qos) { 2672 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2673 return -ENOMEM; 2674 } 2675 2676 /* Copy the old QoS data into the newly allocated structure */ 2677 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2678 2679 /* Zero out the key parts of the QoS structure */ 2680 new_qos->ch = NULL; 2681 new_qos->thread = NULL; 2682 new_qos->poller = NULL; 2683 TAILQ_INIT(&new_qos->queued); 2684 /* 2685 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2686 * It will be used later for the new QoS structure. 2687 */ 2688 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2689 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2690 new_qos->rate_limits[i].min_per_timeslice = 0; 2691 new_qos->rate_limits[i].max_per_timeslice = 0; 2692 } 2693 2694 bdev->internal.qos = new_qos; 2695 2696 if (old_qos->thread == NULL) { 2697 free(old_qos); 2698 } else { 2699 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 2700 } 2701 2702 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2703 * been destroyed yet. The destruction path will end up waiting for the final 2704 * channel to be put before it releases resources. */ 2705 2706 return 0; 2707 } 2708 2709 static void 2710 bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2711 { 2712 total->bytes_read += add->bytes_read; 2713 total->num_read_ops += add->num_read_ops; 2714 total->bytes_written += add->bytes_written; 2715 total->num_write_ops += add->num_write_ops; 2716 total->bytes_unmapped += add->bytes_unmapped; 2717 total->num_unmap_ops += add->num_unmap_ops; 2718 total->read_latency_ticks += add->read_latency_ticks; 2719 total->write_latency_ticks += add->write_latency_ticks; 2720 total->unmap_latency_ticks += add->unmap_latency_ticks; 2721 } 2722 2723 static void 2724 bdev_channel_destroy(void *io_device, void *ctx_buf) 2725 { 2726 struct spdk_bdev_channel *ch = ctx_buf; 2727 struct spdk_bdev_mgmt_channel *mgmt_ch; 2728 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2729 2730 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2731 spdk_get_thread()); 2732 2733 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
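 * bdev_io_stat_add() folds the per-channel counters into bdev->internal.stat
 * under the bdev's internal mutex, immediately below.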
*/ 2734 pthread_mutex_lock(&ch->bdev->internal.mutex); 2735 bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2736 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2737 2738 mgmt_ch = shared_resource->mgmt_ch; 2739 2740 bdev_abort_queued_io(&ch->queued_resets, ch); 2741 bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2742 bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2743 bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2744 2745 if (ch->histogram) { 2746 spdk_histogram_data_free(ch->histogram); 2747 } 2748 2749 bdev_channel_destroy_resource(ch); 2750 } 2751 2752 int 2753 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2754 { 2755 struct spdk_bdev_alias *tmp; 2756 2757 if (alias == NULL) { 2758 SPDK_ERRLOG("Empty alias passed\n"); 2759 return -EINVAL; 2760 } 2761 2762 if (spdk_bdev_get_by_name(alias)) { 2763 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2764 return -EEXIST; 2765 } 2766 2767 tmp = calloc(1, sizeof(*tmp)); 2768 if (tmp == NULL) { 2769 SPDK_ERRLOG("Unable to allocate alias\n"); 2770 return -ENOMEM; 2771 } 2772 2773 tmp->alias = strdup(alias); 2774 if (tmp->alias == NULL) { 2775 free(tmp); 2776 SPDK_ERRLOG("Unable to allocate alias\n"); 2777 return -ENOMEM; 2778 } 2779 2780 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2781 2782 return 0; 2783 } 2784 2785 int 2786 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2787 { 2788 struct spdk_bdev_alias *tmp; 2789 2790 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2791 if (strcmp(alias, tmp->alias) == 0) { 2792 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2793 free(tmp->alias); 2794 free(tmp); 2795 return 0; 2796 } 2797 } 2798 2799 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2800 2801 return -ENOENT; 2802 } 2803 2804 void 2805 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2806 { 2807 struct spdk_bdev_alias *p, *tmp; 2808 2809 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2810 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2811 free(p->alias); 2812 free(p); 2813 } 2814 } 2815 2816 struct spdk_io_channel * 2817 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2818 { 2819 return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc))); 2820 } 2821 2822 const char * 2823 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2824 { 2825 return bdev->name; 2826 } 2827 2828 const char * 2829 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2830 { 2831 return bdev->product_name; 2832 } 2833 2834 const struct spdk_bdev_aliases_list * 2835 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2836 { 2837 return &bdev->aliases; 2838 } 2839 2840 uint32_t 2841 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2842 { 2843 return bdev->blocklen; 2844 } 2845 2846 uint32_t 2847 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 2848 { 2849 return bdev->write_unit_size; 2850 } 2851 2852 uint64_t 2853 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2854 { 2855 return bdev->blockcnt; 2856 } 2857 2858 const char * 2859 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2860 { 2861 return qos_rpc_type[type]; 2862 } 2863 2864 void 2865 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2866 { 2867 int i; 2868 2869 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2870 2871 pthread_mutex_lock(&bdev->internal.mutex); 2872 if (bdev->internal.qos) { 2873 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2874 if (bdev->internal.qos->rate_limits[i].limit != 2875 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2876 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2877 if (bdev_qos_is_iops_rate_limit(i) == false) { 2878 /* Change from Byte to Megabyte which is user visible. */ 2879 limits[i] = limits[i] / 1024 / 1024; 2880 } 2881 } 2882 } 2883 } 2884 pthread_mutex_unlock(&bdev->internal.mutex); 2885 } 2886 2887 size_t 2888 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2889 { 2890 return 1 << bdev->required_alignment; 2891 } 2892 2893 uint32_t 2894 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2895 { 2896 return bdev->optimal_io_boundary; 2897 } 2898 2899 bool 2900 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2901 { 2902 return bdev->write_cache; 2903 } 2904 2905 const struct spdk_uuid * 2906 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2907 { 2908 return &bdev->uuid; 2909 } 2910 2911 uint16_t 2912 spdk_bdev_get_acwu(const struct spdk_bdev *bdev) 2913 { 2914 return bdev->acwu; 2915 } 2916 2917 uint32_t 2918 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 2919 { 2920 return bdev->md_len; 2921 } 2922 2923 bool 2924 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 2925 { 2926 return (bdev->md_len != 0) && bdev->md_interleave; 2927 } 2928 2929 bool 2930 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 2931 { 2932 return (bdev->md_len != 0) && !bdev->md_interleave; 2933 } 2934 2935 bool 2936 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 2937 { 2938 return bdev->zoned; 2939 } 2940 2941 uint32_t 2942 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 2943 { 2944 if (spdk_bdev_is_md_interleaved(bdev)) { 2945 return bdev->blocklen - bdev->md_len; 2946 } else { 2947 return bdev->blocklen; 2948 } 2949 } 2950 2951 static uint32_t 2952 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 2953 { 2954 if (!spdk_bdev_is_md_interleaved(bdev)) { 2955 return bdev->blocklen + bdev->md_len; 2956 } else { 2957 return bdev->blocklen; 2958 } 2959 } 2960 2961 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 2962 { 2963 if (bdev->md_len != 0) { 2964 return bdev->dif_type; 2965 } else { 2966 return SPDK_DIF_DISABLE; 2967 } 2968 } 2969 2970 bool 2971 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 2972 { 2973 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 2974 return bdev->dif_is_head_of_md; 2975 } else { 2976 return false; 2977 } 2978 } 2979 2980 bool 2981 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 2982 enum spdk_dif_check_type check_type) 2983 { 2984 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 2985 return false; 2986 } 2987 2988 switch (check_type) { 2989 case SPDK_DIF_CHECK_TYPE_REFTAG: 2990 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 2991 case SPDK_DIF_CHECK_TYPE_APPTAG: 2992 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 2993 case SPDK_DIF_CHECK_TYPE_GUARD: 2994 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 2995 default: 2996 return false; 2997 } 2998 } 2999 3000 uint64_t 3001 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 3002 { 3003 return bdev->internal.measured_queue_depth; 3004 } 3005 3006 uint64_t 3007 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 3008 { 3009 return bdev->internal.period; 3010 } 3011 3012 uint64_t 3013 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 3014 { 3015 return bdev->internal.weighted_io_time; 3016 } 3017 3018 uint64_t 3019 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 3020 { 3021 return 
bdev->internal.io_time; 3022 } 3023 3024 static void 3025 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 3026 { 3027 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3028 3029 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 3030 3031 if (bdev->internal.measured_queue_depth) { 3032 bdev->internal.io_time += bdev->internal.period; 3033 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 3034 } 3035 } 3036 3037 static void 3038 _calculate_measured_qd(struct spdk_io_channel_iter *i) 3039 { 3040 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3041 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 3042 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 3043 3044 bdev->internal.temporary_queue_depth += ch->io_outstanding; 3045 spdk_for_each_channel_continue(i, 0); 3046 } 3047 3048 static int 3049 bdev_calculate_measured_queue_depth(void *ctx) 3050 { 3051 struct spdk_bdev *bdev = ctx; 3052 bdev->internal.temporary_queue_depth = 0; 3053 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 3054 _calculate_measured_qd_cpl); 3055 return 0; 3056 } 3057 3058 void 3059 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 3060 { 3061 bdev->internal.period = period; 3062 3063 if (bdev->internal.qd_poller != NULL) { 3064 spdk_poller_unregister(&bdev->internal.qd_poller); 3065 bdev->internal.measured_queue_depth = UINT64_MAX; 3066 } 3067 3068 if (period != 0) { 3069 bdev->internal.qd_poller = SPDK_POLLER_REGISTER(bdev_calculate_measured_queue_depth, bdev, 3070 period); 3071 } 3072 } 3073 3074 static void 3075 _resize_notify(void *arg) 3076 { 3077 struct spdk_bdev_desc *desc = arg; 3078 3079 pthread_mutex_lock(&desc->mutex); 3080 desc->refs--; 3081 if (!desc->closed) { 3082 pthread_mutex_unlock(&desc->mutex); 3083 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 3084 desc->bdev, 3085 desc->callback.ctx); 3086 return; 3087 } else if (0 == desc->refs) { 3088 /* This descriptor was closed after this resize_notify message was sent. 3089 * spdk_bdev_close() could not free the descriptor since this message was 3090 * in flight, so we free it now using bdev_desc_free(). 3091 */ 3092 pthread_mutex_unlock(&desc->mutex); 3093 bdev_desc_free(desc); 3094 return; 3095 } 3096 pthread_mutex_unlock(&desc->mutex); 3097 } 3098 3099 int 3100 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 3101 { 3102 struct spdk_bdev_desc *desc; 3103 int ret; 3104 3105 pthread_mutex_lock(&bdev->internal.mutex); 3106 3107 /* bdev has open descriptors */ 3108 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 3109 bdev->blockcnt > size) { 3110 ret = -EBUSY; 3111 } else { 3112 bdev->blockcnt = size; 3113 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 3114 pthread_mutex_lock(&desc->mutex); 3115 if (desc->callback.open_with_ext && !desc->closed) { 3116 desc->refs++; 3117 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 3118 } 3119 pthread_mutex_unlock(&desc->mutex); 3120 } 3121 ret = 0; 3122 } 3123 3124 pthread_mutex_unlock(&bdev->internal.mutex); 3125 3126 return ret; 3127 } 3128 3129 /* 3130 * Convert I/O offset and length from bytes to blocks. 3131 * 3132 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
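 * For example, with a 512-byte block size, offset_bytes == 4096 and
 * num_bytes == 1024 yield offset_blocks == 8 and num_blocks == 2 with a return
 * value of 0, while offset_bytes == 4100 would make the call return non-zero.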
3133 */ 3134 static uint64_t 3135 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 3136 uint64_t num_bytes, uint64_t *num_blocks) 3137 { 3138 uint32_t block_size = bdev->blocklen; 3139 uint8_t shift_cnt; 3140 3141 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 3142 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 3143 shift_cnt = spdk_u32log2(block_size); 3144 *offset_blocks = offset_bytes >> shift_cnt; 3145 *num_blocks = num_bytes >> shift_cnt; 3146 return (offset_bytes - (*offset_blocks << shift_cnt)) | 3147 (num_bytes - (*num_blocks << shift_cnt)); 3148 } else { 3149 *offset_blocks = offset_bytes / block_size; 3150 *num_blocks = num_bytes / block_size; 3151 return (offset_bytes % block_size) | (num_bytes % block_size); 3152 } 3153 } 3154 3155 static bool 3156 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 3157 { 3158 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 3159 * has been an overflow and hence the offset has been wrapped around */ 3160 if (offset_blocks + num_blocks < offset_blocks) { 3161 return false; 3162 } 3163 3164 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 3165 if (offset_blocks + num_blocks > bdev->blockcnt) { 3166 return false; 3167 } 3168 3169 return true; 3170 } 3171 3172 static bool 3173 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 3174 { 3175 return _is_buf_allocated(iovs) == (md_buf != NULL); 3176 } 3177 3178 static int 3179 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 3180 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3181 spdk_bdev_io_completion_cb cb, void *cb_arg) 3182 { 3183 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3184 struct spdk_bdev_io *bdev_io; 3185 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3186 3187 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3188 return -EINVAL; 3189 } 3190 3191 bdev_io = bdev_channel_get_io(channel); 3192 if (!bdev_io) { 3193 return -ENOMEM; 3194 } 3195 3196 bdev_io->internal.ch = channel; 3197 bdev_io->internal.desc = desc; 3198 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3199 bdev_io->u.bdev.iovs = &bdev_io->iov; 3200 bdev_io->u.bdev.iovs[0].iov_base = buf; 3201 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3202 bdev_io->u.bdev.iovcnt = 1; 3203 bdev_io->u.bdev.md_buf = md_buf; 3204 bdev_io->u.bdev.num_blocks = num_blocks; 3205 bdev_io->u.bdev.offset_blocks = offset_blocks; 3206 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3207 3208 bdev_io_submit(bdev_io); 3209 return 0; 3210 } 3211 3212 int 3213 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3214 void *buf, uint64_t offset, uint64_t nbytes, 3215 spdk_bdev_io_completion_cb cb, void *cb_arg) 3216 { 3217 uint64_t offset_blocks, num_blocks; 3218 3219 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3220 nbytes, &num_blocks) != 0) { 3221 return -EINVAL; 3222 } 3223 3224 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3225 } 3226 3227 int 3228 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3229 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3230 spdk_bdev_io_completion_cb cb, void *cb_arg) 3231 { 3232 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 3233 } 3234 
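/* Illustrative usage sketch (not part of this file): a caller that has already
 * opened a descriptor and obtained an I/O channel might issue a single-block
 * read roughly as follows, treating -ENOMEM as a retryable condition. The names
 * bdev, desc, io_ch and read_done_cb are placeholders supplied by the caller.
 *
 *   uint32_t blocklen = spdk_bdev_get_block_size(bdev);
 *   void *buf = spdk_dma_zmalloc(blocklen, spdk_bdev_get_buf_align(bdev), NULL);
 *   int rc = spdk_bdev_read_blocks(desc, io_ch, buf, 0, 1, read_done_cb, buf);
 *   if (rc == -ENOMEM) {
 *       // No spdk_bdev_io was available; resubmit later, e.g. from an
 *       // spdk_bdev_io_wait_entry callback registered with spdk_bdev_queue_io_wait().
 *   }
 *
 * read_done_cb() is a caller-supplied spdk_bdev_io_completion_cb that should call
 * spdk_bdev_free_io() on the completed bdev_io.
 */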
3235 int 3236 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3237 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3238 spdk_bdev_io_completion_cb cb, void *cb_arg) 3239 { 3240 struct iovec iov = { 3241 .iov_base = buf, 3242 }; 3243 3244 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3245 return -EINVAL; 3246 } 3247 3248 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3249 return -EINVAL; 3250 } 3251 3252 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3253 cb, cb_arg); 3254 } 3255 3256 int 3257 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3258 struct iovec *iov, int iovcnt, 3259 uint64_t offset, uint64_t nbytes, 3260 spdk_bdev_io_completion_cb cb, void *cb_arg) 3261 { 3262 uint64_t offset_blocks, num_blocks; 3263 3264 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3265 nbytes, &num_blocks) != 0) { 3266 return -EINVAL; 3267 } 3268 3269 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3270 } 3271 3272 static int 3273 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3274 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3275 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3276 { 3277 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3278 struct spdk_bdev_io *bdev_io; 3279 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3280 3281 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3282 return -EINVAL; 3283 } 3284 3285 bdev_io = bdev_channel_get_io(channel); 3286 if (!bdev_io) { 3287 return -ENOMEM; 3288 } 3289 3290 bdev_io->internal.ch = channel; 3291 bdev_io->internal.desc = desc; 3292 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3293 bdev_io->u.bdev.iovs = iov; 3294 bdev_io->u.bdev.iovcnt = iovcnt; 3295 bdev_io->u.bdev.md_buf = md_buf; 3296 bdev_io->u.bdev.num_blocks = num_blocks; 3297 bdev_io->u.bdev.offset_blocks = offset_blocks; 3298 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3299 3300 bdev_io_submit(bdev_io); 3301 return 0; 3302 } 3303 3304 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3305 struct iovec *iov, int iovcnt, 3306 uint64_t offset_blocks, uint64_t num_blocks, 3307 spdk_bdev_io_completion_cb cb, void *cb_arg) 3308 { 3309 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3310 num_blocks, cb, cb_arg); 3311 } 3312 3313 int 3314 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3315 struct iovec *iov, int iovcnt, void *md_buf, 3316 uint64_t offset_blocks, uint64_t num_blocks, 3317 spdk_bdev_io_completion_cb cb, void *cb_arg) 3318 { 3319 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3320 return -EINVAL; 3321 } 3322 3323 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3324 return -EINVAL; 3325 } 3326 3327 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3328 num_blocks, cb, cb_arg); 3329 } 3330 3331 static int 3332 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3333 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3334 spdk_bdev_io_completion_cb cb, void *cb_arg) 3335 { 3336 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3337 struct spdk_bdev_io *bdev_io; 3338 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3339 3340 if (!desc->write) { 3341 return 
-EBADF; 3342 } 3343 3344 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3345 return -EINVAL; 3346 } 3347 3348 bdev_io = bdev_channel_get_io(channel); 3349 if (!bdev_io) { 3350 return -ENOMEM; 3351 } 3352 3353 bdev_io->internal.ch = channel; 3354 bdev_io->internal.desc = desc; 3355 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3356 bdev_io->u.bdev.iovs = &bdev_io->iov; 3357 bdev_io->u.bdev.iovs[0].iov_base = buf; 3358 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3359 bdev_io->u.bdev.iovcnt = 1; 3360 bdev_io->u.bdev.md_buf = md_buf; 3361 bdev_io->u.bdev.num_blocks = num_blocks; 3362 bdev_io->u.bdev.offset_blocks = offset_blocks; 3363 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3364 3365 bdev_io_submit(bdev_io); 3366 return 0; 3367 } 3368 3369 int 3370 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3371 void *buf, uint64_t offset, uint64_t nbytes, 3372 spdk_bdev_io_completion_cb cb, void *cb_arg) 3373 { 3374 uint64_t offset_blocks, num_blocks; 3375 3376 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3377 nbytes, &num_blocks) != 0) { 3378 return -EINVAL; 3379 } 3380 3381 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3382 } 3383 3384 int 3385 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3386 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3387 spdk_bdev_io_completion_cb cb, void *cb_arg) 3388 { 3389 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3390 cb, cb_arg); 3391 } 3392 3393 int 3394 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3395 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3396 spdk_bdev_io_completion_cb cb, void *cb_arg) 3397 { 3398 struct iovec iov = { 3399 .iov_base = buf, 3400 }; 3401 3402 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3403 return -EINVAL; 3404 } 3405 3406 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3407 return -EINVAL; 3408 } 3409 3410 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3411 cb, cb_arg); 3412 } 3413 3414 static int 3415 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3416 struct iovec *iov, int iovcnt, void *md_buf, 3417 uint64_t offset_blocks, uint64_t num_blocks, 3418 spdk_bdev_io_completion_cb cb, void *cb_arg) 3419 { 3420 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3421 struct spdk_bdev_io *bdev_io; 3422 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3423 3424 if (!desc->write) { 3425 return -EBADF; 3426 } 3427 3428 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3429 return -EINVAL; 3430 } 3431 3432 bdev_io = bdev_channel_get_io(channel); 3433 if (!bdev_io) { 3434 return -ENOMEM; 3435 } 3436 3437 bdev_io->internal.ch = channel; 3438 bdev_io->internal.desc = desc; 3439 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3440 bdev_io->u.bdev.iovs = iov; 3441 bdev_io->u.bdev.iovcnt = iovcnt; 3442 bdev_io->u.bdev.md_buf = md_buf; 3443 bdev_io->u.bdev.num_blocks = num_blocks; 3444 bdev_io->u.bdev.offset_blocks = offset_blocks; 3445 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3446 3447 bdev_io_submit(bdev_io); 3448 return 0; 3449 } 3450 3451 int 3452 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3453 struct iovec *iov, int iovcnt, 3454 uint64_t offset, uint64_t len, 3455 spdk_bdev_io_completion_cb cb, void *cb_arg) 3456 { 3457 uint64_t 
offset_blocks, num_blocks; 3458 3459 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3460 len, &num_blocks) != 0) { 3461 return -EINVAL; 3462 } 3463 3464 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3465 } 3466 3467 int 3468 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3469 struct iovec *iov, int iovcnt, 3470 uint64_t offset_blocks, uint64_t num_blocks, 3471 spdk_bdev_io_completion_cb cb, void *cb_arg) 3472 { 3473 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3474 num_blocks, cb, cb_arg); 3475 } 3476 3477 int 3478 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3479 struct iovec *iov, int iovcnt, void *md_buf, 3480 uint64_t offset_blocks, uint64_t num_blocks, 3481 spdk_bdev_io_completion_cb cb, void *cb_arg) 3482 { 3483 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3484 return -EINVAL; 3485 } 3486 3487 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3488 return -EINVAL; 3489 } 3490 3491 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3492 num_blocks, cb, cb_arg); 3493 } 3494 3495 static void 3496 bdev_compare_do_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3497 { 3498 struct spdk_bdev_io *parent_io = cb_arg; 3499 uint8_t *read_buf = bdev_io->u.bdev.iovs[0].iov_base; 3500 int i, rc = 0; 3501 3502 if (!success) { 3503 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3504 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3505 spdk_bdev_free_io(bdev_io); 3506 return; 3507 } 3508 3509 for (i = 0; i < parent_io->u.bdev.iovcnt; i++) { 3510 rc = memcmp(read_buf, 3511 parent_io->u.bdev.iovs[i].iov_base, 3512 parent_io->u.bdev.iovs[i].iov_len); 3513 if (rc) { 3514 break; 3515 } 3516 read_buf += parent_io->u.bdev.iovs[i].iov_len; 3517 } 3518 3519 spdk_bdev_free_io(bdev_io); 3520 3521 if (rc == 0) { 3522 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3523 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3524 } else { 3525 parent_io->internal.status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 3526 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3527 } 3528 } 3529 3530 static void 3531 bdev_compare_do_read(void *_bdev_io) 3532 { 3533 struct spdk_bdev_io *bdev_io = _bdev_io; 3534 int rc; 3535 3536 rc = spdk_bdev_read_blocks(bdev_io->internal.desc, 3537 spdk_io_channel_from_ctx(bdev_io->internal.ch), NULL, 3538 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3539 bdev_compare_do_read_done, bdev_io); 3540 3541 if (rc == -ENOMEM) { 3542 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_do_read); 3543 } else if (rc != 0) { 3544 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3545 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3546 } 3547 } 3548 3549 static int 3550 bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3551 struct iovec *iov, int iovcnt, void *md_buf, 3552 uint64_t offset_blocks, uint64_t num_blocks, 3553 spdk_bdev_io_completion_cb cb, void *cb_arg) 3554 { 3555 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3556 struct spdk_bdev_io *bdev_io; 3557 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3558 3559 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3560 return -EINVAL; 3561 } 3562 3563 bdev_io = bdev_channel_get_io(channel); 3564 if (!bdev_io) { 
3565 return -ENOMEM; 3566 } 3567 3568 bdev_io->internal.ch = channel; 3569 bdev_io->internal.desc = desc; 3570 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3571 bdev_io->u.bdev.iovs = iov; 3572 bdev_io->u.bdev.iovcnt = iovcnt; 3573 bdev_io->u.bdev.md_buf = md_buf; 3574 bdev_io->u.bdev.num_blocks = num_blocks; 3575 bdev_io->u.bdev.offset_blocks = offset_blocks; 3576 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3577 3578 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3579 bdev_io_submit(bdev_io); 3580 return 0; 3581 } 3582 3583 bdev_compare_do_read(bdev_io); 3584 3585 return 0; 3586 } 3587 3588 int 3589 spdk_bdev_comparev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3590 struct iovec *iov, int iovcnt, 3591 uint64_t offset_blocks, uint64_t num_blocks, 3592 spdk_bdev_io_completion_cb cb, void *cb_arg) 3593 { 3594 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3595 num_blocks, cb, cb_arg); 3596 } 3597 3598 int 3599 spdk_bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3600 struct iovec *iov, int iovcnt, void *md_buf, 3601 uint64_t offset_blocks, uint64_t num_blocks, 3602 spdk_bdev_io_completion_cb cb, void *cb_arg) 3603 { 3604 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3605 return -EINVAL; 3606 } 3607 3608 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3609 return -EINVAL; 3610 } 3611 3612 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3613 num_blocks, cb, cb_arg); 3614 } 3615 3616 static int 3617 bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3618 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3619 spdk_bdev_io_completion_cb cb, void *cb_arg) 3620 { 3621 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3622 struct spdk_bdev_io *bdev_io; 3623 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3624 3625 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3626 return -EINVAL; 3627 } 3628 3629 bdev_io = bdev_channel_get_io(channel); 3630 if (!bdev_io) { 3631 return -ENOMEM; 3632 } 3633 3634 bdev_io->internal.ch = channel; 3635 bdev_io->internal.desc = desc; 3636 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3637 bdev_io->u.bdev.iovs = &bdev_io->iov; 3638 bdev_io->u.bdev.iovs[0].iov_base = buf; 3639 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3640 bdev_io->u.bdev.iovcnt = 1; 3641 bdev_io->u.bdev.md_buf = md_buf; 3642 bdev_io->u.bdev.num_blocks = num_blocks; 3643 bdev_io->u.bdev.offset_blocks = offset_blocks; 3644 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3645 3646 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3647 bdev_io_submit(bdev_io); 3648 return 0; 3649 } 3650 3651 bdev_compare_do_read(bdev_io); 3652 3653 return 0; 3654 } 3655 3656 int 3657 spdk_bdev_compare_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3658 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3659 spdk_bdev_io_completion_cb cb, void *cb_arg) 3660 { 3661 return bdev_compare_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3662 cb, cb_arg); 3663 } 3664 3665 int 3666 spdk_bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3667 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3668 spdk_bdev_io_completion_cb cb, void *cb_arg) 3669 { 3670 struct iovec iov = { 3671 .iov_base = buf, 3672 }; 3673 3674 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3675 return 
-EINVAL; 3676 } 3677 3678 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3679 return -EINVAL; 3680 } 3681 3682 return bdev_compare_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3683 cb, cb_arg); 3684 } 3685 3686 static void 3687 bdev_comparev_and_writev_blocks_unlocked(void *ctx, int unlock_status) 3688 { 3689 struct spdk_bdev_io *bdev_io = ctx; 3690 3691 if (unlock_status) { 3692 SPDK_ERRLOG("LBA range unlock failed\n"); 3693 } 3694 3695 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS ? true : 3696 false, bdev_io->internal.caller_ctx); 3697 } 3698 3699 static void 3700 bdev_comparev_and_writev_blocks_unlock(struct spdk_bdev_io *bdev_io, int status) 3701 { 3702 bdev_io->internal.status = status; 3703 3704 bdev_unlock_lba_range(bdev_io->internal.desc, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3705 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3706 bdev_comparev_and_writev_blocks_unlocked, bdev_io); 3707 } 3708 3709 static void 3710 bdev_compare_and_write_do_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3711 { 3712 struct spdk_bdev_io *parent_io = cb_arg; 3713 3714 if (!success) { 3715 SPDK_ERRLOG("Compare and write operation failed\n"); 3716 } 3717 3718 spdk_bdev_free_io(bdev_io); 3719 3720 bdev_comparev_and_writev_blocks_unlock(parent_io, 3721 success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED); 3722 } 3723 3724 static void 3725 bdev_compare_and_write_do_write(void *_bdev_io) 3726 { 3727 struct spdk_bdev_io *bdev_io = _bdev_io; 3728 int rc; 3729 3730 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 3731 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3732 bdev_io->u.bdev.fused_iovs, bdev_io->u.bdev.fused_iovcnt, 3733 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3734 bdev_compare_and_write_do_write_done, bdev_io); 3735 3736 3737 if (rc == -ENOMEM) { 3738 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_write); 3739 } else if (rc != 0) { 3740 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 3741 } 3742 } 3743 3744 static void 3745 bdev_compare_and_write_do_compare_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3746 { 3747 struct spdk_bdev_io *parent_io = cb_arg; 3748 3749 spdk_bdev_free_io(bdev_io); 3750 3751 if (!success) { 3752 bdev_comparev_and_writev_blocks_unlock(parent_io, SPDK_BDEV_IO_STATUS_MISCOMPARE); 3753 return; 3754 } 3755 3756 bdev_compare_and_write_do_write(parent_io); 3757 } 3758 3759 static void 3760 bdev_compare_and_write_do_compare(void *_bdev_io) 3761 { 3762 struct spdk_bdev_io *bdev_io = _bdev_io; 3763 int rc; 3764 3765 rc = spdk_bdev_comparev_blocks(bdev_io->internal.desc, 3766 spdk_io_channel_from_ctx(bdev_io->internal.ch), bdev_io->u.bdev.iovs, 3767 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3768 bdev_compare_and_write_do_compare_done, bdev_io); 3769 3770 if (rc == -ENOMEM) { 3771 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_compare); 3772 } else if (rc != 0) { 3773 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED); 3774 } 3775 } 3776 3777 static void 3778 bdev_comparev_and_writev_blocks_locked(void *ctx, int status) 3779 { 3780 struct spdk_bdev_io *bdev_io = ctx; 3781 3782 if (status) { 3783 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED; 3784 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3785 } 3786 3787 bdev_compare_and_write_do_compare(bdev_io); 
3788 } 3789 3790 int 3791 spdk_bdev_comparev_and_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3792 struct iovec *compare_iov, int compare_iovcnt, 3793 struct iovec *write_iov, int write_iovcnt, 3794 uint64_t offset_blocks, uint64_t num_blocks, 3795 spdk_bdev_io_completion_cb cb, void *cb_arg) 3796 { 3797 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3798 struct spdk_bdev_io *bdev_io; 3799 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3800 3801 if (!desc->write) { 3802 return -EBADF; 3803 } 3804 3805 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3806 return -EINVAL; 3807 } 3808 3809 if (num_blocks > bdev->acwu) { 3810 return -EINVAL; 3811 } 3812 3813 bdev_io = bdev_channel_get_io(channel); 3814 if (!bdev_io) { 3815 return -ENOMEM; 3816 } 3817 3818 bdev_io->internal.ch = channel; 3819 bdev_io->internal.desc = desc; 3820 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE; 3821 bdev_io->u.bdev.iovs = compare_iov; 3822 bdev_io->u.bdev.iovcnt = compare_iovcnt; 3823 bdev_io->u.bdev.fused_iovs = write_iov; 3824 bdev_io->u.bdev.fused_iovcnt = write_iovcnt; 3825 bdev_io->u.bdev.md_buf = NULL; 3826 bdev_io->u.bdev.num_blocks = num_blocks; 3827 bdev_io->u.bdev.offset_blocks = offset_blocks; 3828 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3829 3830 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE)) { 3831 bdev_io_submit(bdev_io); 3832 return 0; 3833 } 3834 3835 return bdev_lock_lba_range(desc, ch, offset_blocks, num_blocks, 3836 bdev_comparev_and_writev_blocks_locked, bdev_io); 3837 } 3838 3839 static void 3840 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 3841 { 3842 if (!success) { 3843 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3844 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 3845 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3846 return; 3847 } 3848 3849 if (bdev_io->u.bdev.zcopy.populate) { 3850 /* Read the real data into the buffer */ 3851 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3852 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3853 bdev_io_submit(bdev_io); 3854 return; 3855 } 3856 3857 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. 
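 * Completing it via spdk_bdev_io_complete() would decrement io_outstanding
 * counters that were never incremented for this bdev_io, so report the result
 * straight to the caller; with populate not requested there is nothing to read
 * into the freshly allocated buffer anyway.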
*/ 3858 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3859 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3860 } 3861 3862 int 3863 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3864 uint64_t offset_blocks, uint64_t num_blocks, 3865 bool populate, 3866 spdk_bdev_io_completion_cb cb, void *cb_arg) 3867 { 3868 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3869 struct spdk_bdev_io *bdev_io; 3870 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3871 3872 if (!desc->write) { 3873 return -EBADF; 3874 } 3875 3876 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3877 return -EINVAL; 3878 } 3879 3880 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3881 return -ENOTSUP; 3882 } 3883 3884 bdev_io = bdev_channel_get_io(channel); 3885 if (!bdev_io) { 3886 return -ENOMEM; 3887 } 3888 3889 bdev_io->internal.ch = channel; 3890 bdev_io->internal.desc = desc; 3891 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3892 bdev_io->u.bdev.num_blocks = num_blocks; 3893 bdev_io->u.bdev.offset_blocks = offset_blocks; 3894 bdev_io->u.bdev.iovs = NULL; 3895 bdev_io->u.bdev.iovcnt = 0; 3896 bdev_io->u.bdev.md_buf = NULL; 3897 bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0; 3898 bdev_io->u.bdev.zcopy.commit = 0; 3899 bdev_io->u.bdev.zcopy.start = 1; 3900 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3901 3902 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3903 bdev_io_submit(bdev_io); 3904 } else { 3905 /* Emulate zcopy by allocating a buffer */ 3906 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 3907 bdev_io->u.bdev.num_blocks * bdev->blocklen); 3908 } 3909 3910 return 0; 3911 } 3912 3913 int 3914 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 3915 spdk_bdev_io_completion_cb cb, void *cb_arg) 3916 { 3917 struct spdk_bdev *bdev = bdev_io->bdev; 3918 3919 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 3920 /* This can happen if the zcopy was emulated in start */ 3921 if (bdev_io->u.bdev.zcopy.start != 1) { 3922 return -EINVAL; 3923 } 3924 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3925 } 3926 3927 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 3928 return -EINVAL; 3929 } 3930 3931 bdev_io->u.bdev.zcopy.commit = commit ? 1 : 0; 3932 bdev_io->u.bdev.zcopy.start = 0; 3933 bdev_io->internal.caller_ctx = cb_arg; 3934 bdev_io->internal.cb = cb; 3935 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3936 3937 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3938 bdev_io_submit(bdev_io); 3939 return 0; 3940 } 3941 3942 if (!bdev_io->u.bdev.zcopy.commit) { 3943 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. 
*/ 3944 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3945 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 3946 return 0; 3947 } 3948 3949 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3950 bdev_io_submit(bdev_io); 3951 3952 return 0; 3953 } 3954 3955 int 3956 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3957 uint64_t offset, uint64_t len, 3958 spdk_bdev_io_completion_cb cb, void *cb_arg) 3959 { 3960 uint64_t offset_blocks, num_blocks; 3961 3962 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3963 len, &num_blocks) != 0) { 3964 return -EINVAL; 3965 } 3966 3967 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3968 } 3969 3970 int 3971 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3972 uint64_t offset_blocks, uint64_t num_blocks, 3973 spdk_bdev_io_completion_cb cb, void *cb_arg) 3974 { 3975 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3976 struct spdk_bdev_io *bdev_io; 3977 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3978 3979 if (!desc->write) { 3980 return -EBADF; 3981 } 3982 3983 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3984 return -EINVAL; 3985 } 3986 3987 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 3988 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 3989 return -ENOTSUP; 3990 } 3991 3992 bdev_io = bdev_channel_get_io(channel); 3993 3994 if (!bdev_io) { 3995 return -ENOMEM; 3996 } 3997 3998 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 3999 bdev_io->internal.ch = channel; 4000 bdev_io->internal.desc = desc; 4001 bdev_io->u.bdev.offset_blocks = offset_blocks; 4002 bdev_io->u.bdev.num_blocks = num_blocks; 4003 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4004 4005 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 4006 bdev_io_submit(bdev_io); 4007 return 0; 4008 } 4009 4010 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 4011 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 4012 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 4013 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 4014 bdev_write_zero_buffer_next(bdev_io); 4015 4016 return 0; 4017 } 4018 4019 int 4020 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4021 uint64_t offset, uint64_t nbytes, 4022 spdk_bdev_io_completion_cb cb, void *cb_arg) 4023 { 4024 uint64_t offset_blocks, num_blocks; 4025 4026 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4027 nbytes, &num_blocks) != 0) { 4028 return -EINVAL; 4029 } 4030 4031 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4032 } 4033 4034 int 4035 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4036 uint64_t offset_blocks, uint64_t num_blocks, 4037 spdk_bdev_io_completion_cb cb, void *cb_arg) 4038 { 4039 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4040 struct spdk_bdev_io *bdev_io; 4041 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4042 4043 if (!desc->write) { 4044 return -EBADF; 4045 } 4046 4047 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4048 return -EINVAL; 4049 } 4050 4051 if (num_blocks == 0) { 4052 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 4053 return -EINVAL; 4054 } 4055 4056 bdev_io = bdev_channel_get_io(channel); 4057 if (!bdev_io) { 4058 return -ENOMEM; 4059 } 4060 4061 bdev_io->internal.ch = 
channel; 4062 bdev_io->internal.desc = desc; 4063 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 4064 4065 bdev_io->u.bdev.iovs = &bdev_io->iov; 4066 bdev_io->u.bdev.iovs[0].iov_base = NULL; 4067 bdev_io->u.bdev.iovs[0].iov_len = 0; 4068 bdev_io->u.bdev.iovcnt = 1; 4069 4070 bdev_io->u.bdev.offset_blocks = offset_blocks; 4071 bdev_io->u.bdev.num_blocks = num_blocks; 4072 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4073 4074 bdev_io_submit(bdev_io); 4075 return 0; 4076 } 4077 4078 int 4079 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4080 uint64_t offset, uint64_t length, 4081 spdk_bdev_io_completion_cb cb, void *cb_arg) 4082 { 4083 uint64_t offset_blocks, num_blocks; 4084 4085 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4086 length, &num_blocks) != 0) { 4087 return -EINVAL; 4088 } 4089 4090 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4091 } 4092 4093 int 4094 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4095 uint64_t offset_blocks, uint64_t num_blocks, 4096 spdk_bdev_io_completion_cb cb, void *cb_arg) 4097 { 4098 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4099 struct spdk_bdev_io *bdev_io; 4100 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4101 4102 if (!desc->write) { 4103 return -EBADF; 4104 } 4105 4106 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4107 return -EINVAL; 4108 } 4109 4110 bdev_io = bdev_channel_get_io(channel); 4111 if (!bdev_io) { 4112 return -ENOMEM; 4113 } 4114 4115 bdev_io->internal.ch = channel; 4116 bdev_io->internal.desc = desc; 4117 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 4118 bdev_io->u.bdev.iovs = NULL; 4119 bdev_io->u.bdev.iovcnt = 0; 4120 bdev_io->u.bdev.offset_blocks = offset_blocks; 4121 bdev_io->u.bdev.num_blocks = num_blocks; 4122 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4123 4124 bdev_io_submit(bdev_io); 4125 return 0; 4126 } 4127 4128 static void 4129 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 4130 { 4131 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 4132 struct spdk_bdev_io *bdev_io; 4133 4134 bdev_io = TAILQ_FIRST(&ch->queued_resets); 4135 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 4136 bdev_io_submit_reset(bdev_io); 4137 } 4138 4139 static void 4140 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 4141 { 4142 struct spdk_io_channel *ch; 4143 struct spdk_bdev_channel *channel; 4144 struct spdk_bdev_mgmt_channel *mgmt_channel; 4145 struct spdk_bdev_shared_resource *shared_resource; 4146 bdev_io_tailq_t tmp_queued; 4147 4148 TAILQ_INIT(&tmp_queued); 4149 4150 ch = spdk_io_channel_iter_get_channel(i); 4151 channel = spdk_io_channel_get_ctx(ch); 4152 shared_resource = channel->shared_resource; 4153 mgmt_channel = shared_resource->mgmt_ch; 4154 4155 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 4156 4157 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 4158 /* The QoS object is always valid and readable while 4159 * the channel flag is set, so the lock here should not 4160 * be necessary. We're not in the fast path though, so 4161 * just take it anyway. 
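		 * If this channel currently owns the QoS object, the whole QoS queue
		 * is swapped into tmp_queued here and aborted below together with
		 * the nomem_io and buffer-wait queues.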
*/ 4162 pthread_mutex_lock(&channel->bdev->internal.mutex); 4163 if (channel->bdev->internal.qos->ch == channel) { 4164 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 4165 } 4166 pthread_mutex_unlock(&channel->bdev->internal.mutex); 4167 } 4168 4169 bdev_abort_queued_io(&shared_resource->nomem_io, channel); 4170 bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 4171 bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 4172 bdev_abort_queued_io(&tmp_queued, channel); 4173 4174 spdk_for_each_channel_continue(i, 0); 4175 } 4176 4177 static void 4178 bdev_start_reset(void *ctx) 4179 { 4180 struct spdk_bdev_channel *ch = ctx; 4181 4182 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 4183 ch, bdev_reset_dev); 4184 } 4185 4186 static void 4187 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 4188 { 4189 struct spdk_bdev *bdev = ch->bdev; 4190 4191 assert(!TAILQ_EMPTY(&ch->queued_resets)); 4192 4193 pthread_mutex_lock(&bdev->internal.mutex); 4194 if (bdev->internal.reset_in_progress == NULL) { 4195 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 4196 /* 4197 * Take a channel reference for the target bdev for the life of this 4198 * reset. This guards against the channel getting destroyed while 4199 * spdk_for_each_channel() calls related to this reset IO are in 4200 * progress. We will release the reference when this reset is 4201 * completed. 4202 */ 4203 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 4204 bdev_start_reset(ch); 4205 } 4206 pthread_mutex_unlock(&bdev->internal.mutex); 4207 } 4208 4209 int 4210 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4211 spdk_bdev_io_completion_cb cb, void *cb_arg) 4212 { 4213 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4214 struct spdk_bdev_io *bdev_io; 4215 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4216 4217 bdev_io = bdev_channel_get_io(channel); 4218 if (!bdev_io) { 4219 return -ENOMEM; 4220 } 4221 4222 bdev_io->internal.ch = channel; 4223 bdev_io->internal.desc = desc; 4224 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4225 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 4226 bdev_io->u.reset.ch_ref = NULL; 4227 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4228 4229 pthread_mutex_lock(&bdev->internal.mutex); 4230 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 4231 pthread_mutex_unlock(&bdev->internal.mutex); 4232 4233 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 4234 internal.ch_link); 4235 4236 bdev_channel_start_reset(channel); 4237 4238 return 0; 4239 } 4240 4241 void 4242 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4243 struct spdk_bdev_io_stat *stat) 4244 { 4245 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4246 4247 *stat = channel->stat; 4248 } 4249 4250 static void 4251 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 4252 { 4253 void *io_device = spdk_io_channel_iter_get_io_device(i); 4254 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4255 4256 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 4257 bdev_iostat_ctx->cb_arg, 0); 4258 free(bdev_iostat_ctx); 4259 } 4260 4261 static void 4262 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 4263 { 4264 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4265 struct spdk_io_channel 
*ch = spdk_io_channel_iter_get_channel(i); 4266 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4267 4268 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 4269 spdk_for_each_channel_continue(i, 0); 4270 } 4271 4272 void 4273 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 4274 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 4275 { 4276 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 4277 4278 assert(bdev != NULL); 4279 assert(stat != NULL); 4280 assert(cb != NULL); 4281 4282 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 4283 if (bdev_iostat_ctx == NULL) { 4284 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 4285 cb(bdev, stat, cb_arg, -ENOMEM); 4286 return; 4287 } 4288 4289 bdev_iostat_ctx->stat = stat; 4290 bdev_iostat_ctx->cb = cb; 4291 bdev_iostat_ctx->cb_arg = cb_arg; 4292 4293 /* Start with the statistics from previously deleted channels. */ 4294 pthread_mutex_lock(&bdev->internal.mutex); 4295 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 4296 pthread_mutex_unlock(&bdev->internal.mutex); 4297 4298 /* Then iterate and add the statistics from each existing channel. */ 4299 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4300 bdev_get_each_channel_stat, 4301 bdev_iostat_ctx, 4302 bdev_get_device_stat_done); 4303 } 4304 4305 int 4306 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4307 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4308 spdk_bdev_io_completion_cb cb, void *cb_arg) 4309 { 4310 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4311 struct spdk_bdev_io *bdev_io; 4312 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4313 4314 if (!desc->write) { 4315 return -EBADF; 4316 } 4317 4318 bdev_io = bdev_channel_get_io(channel); 4319 if (!bdev_io) { 4320 return -ENOMEM; 4321 } 4322 4323 bdev_io->internal.ch = channel; 4324 bdev_io->internal.desc = desc; 4325 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 4326 bdev_io->u.nvme_passthru.cmd = *cmd; 4327 bdev_io->u.nvme_passthru.buf = buf; 4328 bdev_io->u.nvme_passthru.nbytes = nbytes; 4329 bdev_io->u.nvme_passthru.md_buf = NULL; 4330 bdev_io->u.nvme_passthru.md_len = 0; 4331 4332 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4333 4334 bdev_io_submit(bdev_io); 4335 return 0; 4336 } 4337 4338 int 4339 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4340 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4341 spdk_bdev_io_completion_cb cb, void *cb_arg) 4342 { 4343 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4344 struct spdk_bdev_io *bdev_io; 4345 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4346 4347 if (!desc->write) { 4348 /* 4349 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4350 * to easily determine if the command is a read or write, but for now just 4351 * do not allow io_passthru with a read-only descriptor. 
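		 * In other words, NVMe I/O passthru always requires a descriptor
		 * opened with write == true, even for read-type commands.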
4352 */ 4353 return -EBADF; 4354 } 4355 4356 bdev_io = bdev_channel_get_io(channel); 4357 if (!bdev_io) { 4358 return -ENOMEM; 4359 } 4360 4361 bdev_io->internal.ch = channel; 4362 bdev_io->internal.desc = desc; 4363 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 4364 bdev_io->u.nvme_passthru.cmd = *cmd; 4365 bdev_io->u.nvme_passthru.buf = buf; 4366 bdev_io->u.nvme_passthru.nbytes = nbytes; 4367 bdev_io->u.nvme_passthru.md_buf = NULL; 4368 bdev_io->u.nvme_passthru.md_len = 0; 4369 4370 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4371 4372 bdev_io_submit(bdev_io); 4373 return 0; 4374 } 4375 4376 int 4377 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4378 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 4379 spdk_bdev_io_completion_cb cb, void *cb_arg) 4380 { 4381 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4382 struct spdk_bdev_io *bdev_io; 4383 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4384 4385 if (!desc->write) { 4386 /* 4387 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4388 * to easily determine if the command is a read or write, but for now just 4389 * do not allow io_passthru with a read-only descriptor. 4390 */ 4391 return -EBADF; 4392 } 4393 4394 bdev_io = bdev_channel_get_io(channel); 4395 if (!bdev_io) { 4396 return -ENOMEM; 4397 } 4398 4399 bdev_io->internal.ch = channel; 4400 bdev_io->internal.desc = desc; 4401 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 4402 bdev_io->u.nvme_passthru.cmd = *cmd; 4403 bdev_io->u.nvme_passthru.buf = buf; 4404 bdev_io->u.nvme_passthru.nbytes = nbytes; 4405 bdev_io->u.nvme_passthru.md_buf = md_buf; 4406 bdev_io->u.nvme_passthru.md_len = md_len; 4407 4408 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4409 4410 bdev_io_submit(bdev_io); 4411 return 0; 4412 } 4413 4414 int 4415 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4416 struct spdk_bdev_io_wait_entry *entry) 4417 { 4418 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4419 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 4420 4421 if (bdev != entry->bdev) { 4422 SPDK_ERRLOG("bdevs do not match\n"); 4423 return -EINVAL; 4424 } 4425 4426 if (mgmt_ch->per_thread_cache_count > 0) { 4427 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 4428 return -EINVAL; 4429 } 4430 4431 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 4432 return 0; 4433 } 4434 4435 static void 4436 bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 4437 { 4438 struct spdk_bdev *bdev = bdev_ch->bdev; 4439 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4440 struct spdk_bdev_io *bdev_io; 4441 4442 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 4443 /* 4444 * Allow some more I/O to complete before retrying the nomem_io queue. 4445 * Some drivers (such as nvme) cannot immediately take a new I/O in 4446 * the context of a completion, because the resources for the I/O are 4447 * not released until control returns to the bdev poller. Also, we 4448 * may require several small I/O to complete before a larger I/O 4449 * (that requires splitting) can be submitted. 
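		 * The nomem_threshold itself is recalculated in spdk_bdev_io_complete()
		 * each time an I/O completes with SPDK_BDEV_IO_STATUS_NOMEM.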
4450 */ 4451 return; 4452 } 4453 4454 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 4455 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 4456 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 4457 bdev_io->internal.ch->io_outstanding++; 4458 shared_resource->io_outstanding++; 4459 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4460 bdev_io->internal.error.nvme.cdw0 = 0; 4461 bdev_io->num_retries++; 4462 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 4463 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4464 break; 4465 } 4466 } 4467 } 4468 4469 static inline void 4470 bdev_io_complete(void *ctx) 4471 { 4472 struct spdk_bdev_io *bdev_io = ctx; 4473 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4474 uint64_t tsc, tsc_diff; 4475 4476 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 4477 /* 4478 * Send the completion to the thread that originally submitted the I/O, 4479 * which may not be the current thread in the case of QoS. 4480 */ 4481 if (bdev_io->internal.io_submit_ch) { 4482 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4483 bdev_io->internal.io_submit_ch = NULL; 4484 } 4485 4486 /* 4487 * Defer completion to avoid potential infinite recursion if the 4488 * user's completion callback issues a new I/O. 4489 */ 4490 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4491 bdev_io_complete, bdev_io); 4492 return; 4493 } 4494 4495 tsc = spdk_get_ticks(); 4496 tsc_diff = tsc - bdev_io->internal.submit_tsc; 4497 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 4498 /* When a bdev_io is split, the children bdev_io are not added 4499 * to the io_submitted list. So don't try to remove them in that 4500 * case. 
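	 * Split children are recognized here by their completion callback,
	 * bdev_io_split_done.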
4501 */ 4502 if (bdev_io->internal.cb != bdev_io_split_done) { 4503 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link); 4504 } 4505 4506 if (bdev_io->internal.ch->histogram) { 4507 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 4508 } 4509 4510 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4511 switch (bdev_io->type) { 4512 case SPDK_BDEV_IO_TYPE_READ: 4513 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4514 bdev_io->internal.ch->stat.num_read_ops++; 4515 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4516 break; 4517 case SPDK_BDEV_IO_TYPE_WRITE: 4518 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4519 bdev_io->internal.ch->stat.num_write_ops++; 4520 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4521 break; 4522 case SPDK_BDEV_IO_TYPE_UNMAP: 4523 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4524 bdev_io->internal.ch->stat.num_unmap_ops++; 4525 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 4526 break; 4527 case SPDK_BDEV_IO_TYPE_ZCOPY: 4528 /* Track the data in the start phase only */ 4529 if (bdev_io->u.bdev.zcopy.start) { 4530 if (bdev_io->u.bdev.zcopy.populate) { 4531 bdev_io->internal.ch->stat.bytes_read += 4532 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4533 bdev_io->internal.ch->stat.num_read_ops++; 4534 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4535 } else { 4536 bdev_io->internal.ch->stat.bytes_written += 4537 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4538 bdev_io->internal.ch->stat.num_write_ops++; 4539 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4540 } 4541 } 4542 break; 4543 default: 4544 break; 4545 } 4546 } 4547 4548 #ifdef SPDK_CONFIG_VTUNE 4549 uint64_t now_tsc = spdk_get_ticks(); 4550 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 4551 uint64_t data[5]; 4552 4553 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 4554 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 4555 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 4556 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 4557 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
4558 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 4559 4560 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 4561 __itt_metadata_u64, 5, data); 4562 4563 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 4564 bdev_io->internal.ch->start_tsc = now_tsc; 4565 } 4566 #endif 4567 4568 assert(bdev_io->internal.cb != NULL); 4569 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 4570 4571 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 4572 bdev_io->internal.caller_ctx); 4573 } 4574 4575 static void 4576 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 4577 { 4578 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4579 4580 if (bdev_io->u.reset.ch_ref != NULL) { 4581 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 4582 bdev_io->u.reset.ch_ref = NULL; 4583 } 4584 4585 bdev_io_complete(bdev_io); 4586 } 4587 4588 static void 4589 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 4590 { 4591 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4592 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4593 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4594 struct spdk_bdev_io *queued_reset; 4595 4596 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 4597 while (!TAILQ_EMPTY(&ch->queued_resets)) { 4598 queued_reset = TAILQ_FIRST(&ch->queued_resets); 4599 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 4600 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 4601 } 4602 4603 spdk_for_each_channel_continue(i, 0); 4604 } 4605 4606 void 4607 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 4608 { 4609 struct spdk_bdev *bdev = bdev_io->bdev; 4610 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4611 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4612 4613 bdev_io->internal.status = status; 4614 4615 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 4616 bool unlock_channels = false; 4617 4618 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 4619 SPDK_ERRLOG("NOMEM returned for reset\n"); 4620 } 4621 pthread_mutex_lock(&bdev->internal.mutex); 4622 if (bdev_io == bdev->internal.reset_in_progress) { 4623 bdev->internal.reset_in_progress = NULL; 4624 unlock_channels = true; 4625 } 4626 pthread_mutex_unlock(&bdev->internal.mutex); 4627 4628 if (unlock_channels) { 4629 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 4630 bdev_io, bdev_reset_complete); 4631 return; 4632 } 4633 } else { 4634 _bdev_io_unset_bounce_buf(bdev_io); 4635 4636 assert(bdev_ch->io_outstanding > 0); 4637 assert(shared_resource->io_outstanding > 0); 4638 bdev_ch->io_outstanding--; 4639 shared_resource->io_outstanding--; 4640 4641 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 4642 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 4643 /* 4644 * Wait for some of the outstanding I/O to complete before we 4645 * retry any of the nomem_io. Normally we will wait for 4646 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 4647 * depth channels we will instead wait for half to complete. 
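			 * Concretely, nomem_threshold is set to the larger of
			 * io_outstanding / 2 and io_outstanding - NOMEM_THRESHOLD_COUNT.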
4648 */ 4649 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 4650 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 4651 return; 4652 } 4653 4654 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 4655 bdev_ch_retry_io(bdev_ch); 4656 } 4657 } 4658 4659 bdev_io_complete(bdev_io); 4660 } 4661 4662 void 4663 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 4664 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 4665 { 4666 if (sc == SPDK_SCSI_STATUS_GOOD) { 4667 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4668 } else { 4669 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 4670 bdev_io->internal.error.scsi.sc = sc; 4671 bdev_io->internal.error.scsi.sk = sk; 4672 bdev_io->internal.error.scsi.asc = asc; 4673 bdev_io->internal.error.scsi.ascq = ascq; 4674 } 4675 4676 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 4677 } 4678 4679 void 4680 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 4681 int *sc, int *sk, int *asc, int *ascq) 4682 { 4683 assert(sc != NULL); 4684 assert(sk != NULL); 4685 assert(asc != NULL); 4686 assert(ascq != NULL); 4687 4688 switch (bdev_io->internal.status) { 4689 case SPDK_BDEV_IO_STATUS_SUCCESS: 4690 *sc = SPDK_SCSI_STATUS_GOOD; 4691 *sk = SPDK_SCSI_SENSE_NO_SENSE; 4692 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 4693 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 4694 break; 4695 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 4696 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 4697 break; 4698 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 4699 *sc = bdev_io->internal.error.scsi.sc; 4700 *sk = bdev_io->internal.error.scsi.sk; 4701 *asc = bdev_io->internal.error.scsi.asc; 4702 *ascq = bdev_io->internal.error.scsi.ascq; 4703 break; 4704 default: 4705 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 4706 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 4707 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 4708 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 4709 break; 4710 } 4711 } 4712 4713 void 4714 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 4715 { 4716 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 4717 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4718 } else { 4719 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 4720 } 4721 4722 bdev_io->internal.error.nvme.cdw0 = cdw0; 4723 bdev_io->internal.error.nvme.sct = sct; 4724 bdev_io->internal.error.nvme.sc = sc; 4725 4726 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 4727 } 4728 4729 void 4730 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 4731 { 4732 assert(sct != NULL); 4733 assert(sc != NULL); 4734 assert(cdw0 != NULL); 4735 4736 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 4737 *sct = bdev_io->internal.error.nvme.sct; 4738 *sc = bdev_io->internal.error.nvme.sc; 4739 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4740 *sct = SPDK_NVME_SCT_GENERIC; 4741 *sc = SPDK_NVME_SC_SUCCESS; 4742 } else { 4743 *sct = SPDK_NVME_SCT_GENERIC; 4744 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 4745 } 4746 4747 *cdw0 = bdev_io->internal.error.nvme.cdw0; 4748 } 4749 4750 void 4751 spdk_bdev_io_get_nvme_fused_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, 4752 int *first_sct, int *first_sc, int *second_sct, int *second_sc) 4753 { 4754 assert(first_sct != NULL); 4755 assert(first_sc != NULL); 4756 assert(second_sct != 
NULL); 4757 assert(second_sc != NULL); 4758 assert(cdw0 != NULL); 4759 4760 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 4761 if (bdev_io->internal.error.nvme.sct == SPDK_NVME_SCT_MEDIA_ERROR && 4762 bdev_io->internal.error.nvme.sc == SPDK_NVME_SC_COMPARE_FAILURE) { 4763 *first_sct = bdev_io->internal.error.nvme.sct; 4764 *first_sc = bdev_io->internal.error.nvme.sc; 4765 *second_sct = SPDK_NVME_SCT_GENERIC; 4766 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 4767 } else { 4768 *first_sct = SPDK_NVME_SCT_GENERIC; 4769 *first_sc = SPDK_NVME_SC_SUCCESS; 4770 *second_sct = bdev_io->internal.error.nvme.sct; 4771 *second_sc = bdev_io->internal.error.nvme.sc; 4772 } 4773 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4774 *first_sct = SPDK_NVME_SCT_GENERIC; 4775 *first_sc = SPDK_NVME_SC_SUCCESS; 4776 *second_sct = SPDK_NVME_SCT_GENERIC; 4777 *second_sc = SPDK_NVME_SC_SUCCESS; 4778 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED) { 4779 *first_sct = SPDK_NVME_SCT_GENERIC; 4780 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 4781 *second_sct = SPDK_NVME_SCT_GENERIC; 4782 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 4783 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_MISCOMPARE) { 4784 *first_sct = SPDK_NVME_SCT_MEDIA_ERROR; 4785 *first_sc = SPDK_NVME_SC_COMPARE_FAILURE; 4786 *second_sct = SPDK_NVME_SCT_GENERIC; 4787 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 4788 } else { 4789 *first_sct = SPDK_NVME_SCT_GENERIC; 4790 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 4791 *second_sct = SPDK_NVME_SCT_GENERIC; 4792 *second_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 4793 } 4794 4795 *cdw0 = bdev_io->internal.error.nvme.cdw0; 4796 } 4797 4798 struct spdk_thread * 4799 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 4800 { 4801 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 4802 } 4803 4804 struct spdk_io_channel * 4805 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 4806 { 4807 return bdev_io->internal.ch->channel; 4808 } 4809 4810 static void 4811 bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 4812 { 4813 uint64_t min_qos_set; 4814 int i; 4815 4816 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4817 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4818 break; 4819 } 4820 } 4821 4822 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 4823 SPDK_ERRLOG("Invalid rate limits set.\n"); 4824 return; 4825 } 4826 4827 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4828 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4829 continue; 4830 } 4831 4832 if (bdev_qos_is_iops_rate_limit(i) == true) { 4833 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 4834 } else { 4835 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 4836 } 4837 4838 if (limits[i] == 0 || limits[i] % min_qos_set) { 4839 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 4840 limits[i], bdev->name, min_qos_set); 4841 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 4842 return; 4843 } 4844 } 4845 4846 if (!bdev->internal.qos) { 4847 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 4848 if (!bdev->internal.qos) { 4849 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 4850 return; 4851 } 4852 } 4853 4854 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4855 bdev->internal.qos->rate_limits[i].limit = limits[i]; 4856 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 4857 bdev->name, i, limits[i]); 
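		/* Limits left at SPDK_BDEV_QOS_LIMIT_NOT_DEFINED are stored unchanged,
		 * leaving that rate limit type disabled. */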
4858 } 4859 4860 return; 4861 } 4862 4863 static void 4864 bdev_qos_config(struct spdk_bdev *bdev) 4865 { 4866 struct spdk_conf_section *sp = NULL; 4867 const char *val = NULL; 4868 int i = 0, j = 0; 4869 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 4870 bool config_qos = false; 4871 4872 sp = spdk_conf_find_section(NULL, "QoS"); 4873 if (!sp) { 4874 return; 4875 } 4876 4877 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 4878 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 4879 4880 i = 0; 4881 while (true) { 4882 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 4883 if (!val) { 4884 break; 4885 } 4886 4887 if (strcmp(bdev->name, val) != 0) { 4888 i++; 4889 continue; 4890 } 4891 4892 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 4893 if (val) { 4894 if (bdev_qos_is_iops_rate_limit(j) == true) { 4895 limits[j] = strtoull(val, NULL, 10); 4896 } else { 4897 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 4898 } 4899 config_qos = true; 4900 } 4901 4902 break; 4903 } 4904 4905 j++; 4906 } 4907 4908 if (config_qos == true) { 4909 bdev_qos_config_limit(bdev, limits); 4910 } 4911 4912 return; 4913 } 4914 4915 static int 4916 bdev_init(struct spdk_bdev *bdev) 4917 { 4918 char *bdev_name; 4919 4920 assert(bdev->module != NULL); 4921 4922 if (!bdev->name) { 4923 SPDK_ERRLOG("Bdev name is NULL\n"); 4924 return -EINVAL; 4925 } 4926 4927 if (!strlen(bdev->name)) { 4928 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 4929 return -EINVAL; 4930 } 4931 4932 if (spdk_bdev_get_by_name(bdev->name)) { 4933 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 4934 return -EEXIST; 4935 } 4936 4937 /* Users often register their own I/O devices using the bdev name. In 4938 * order to avoid conflicts, prepend bdev_. */ 4939 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 4940 if (!bdev_name) { 4941 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 4942 return -ENOMEM; 4943 } 4944 4945 bdev->internal.status = SPDK_BDEV_STATUS_READY; 4946 bdev->internal.measured_queue_depth = UINT64_MAX; 4947 bdev->internal.claim_module = NULL; 4948 bdev->internal.qd_poller = NULL; 4949 bdev->internal.qos = NULL; 4950 4951 /* If the user didn't specify a uuid, generate one. */ 4952 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 4953 spdk_uuid_generate(&bdev->uuid); 4954 } 4955 4956 if (spdk_bdev_get_buf_align(bdev) > 1) { 4957 if (bdev->split_on_optimal_io_boundary) { 4958 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 4959 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 4960 } else { 4961 bdev->split_on_optimal_io_boundary = true; 4962 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 4963 } 4964 } 4965 4966 /* If the user didn't specify a write unit size, set it to one. 
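	 * A write_unit_size of one block means no additional write size or
	 * alignment requirement is advertised to users of this bdev.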
*/ 4967 if (bdev->write_unit_size == 0) { 4968 bdev->write_unit_size = 1; 4969 } 4970 4971 /* Set ACWU value to 1 if bdev module did not set it (does not support it natively) */ 4972 if (bdev->acwu == 0) { 4973 bdev->acwu = 1; 4974 } 4975 4976 TAILQ_INIT(&bdev->internal.open_descs); 4977 TAILQ_INIT(&bdev->internal.locked_ranges); 4978 TAILQ_INIT(&bdev->internal.pending_locked_ranges); 4979 4980 TAILQ_INIT(&bdev->aliases); 4981 4982 bdev->internal.reset_in_progress = NULL; 4983 4984 bdev_qos_config(bdev); 4985 4986 spdk_io_device_register(__bdev_to_io_dev(bdev), 4987 bdev_channel_create, bdev_channel_destroy, 4988 sizeof(struct spdk_bdev_channel), 4989 bdev_name); 4990 4991 free(bdev_name); 4992 4993 pthread_mutex_init(&bdev->internal.mutex, NULL); 4994 return 0; 4995 } 4996 4997 static void 4998 bdev_destroy_cb(void *io_device) 4999 { 5000 int rc; 5001 struct spdk_bdev *bdev; 5002 spdk_bdev_unregister_cb cb_fn; 5003 void *cb_arg; 5004 5005 bdev = __bdev_from_io_dev(io_device); 5006 cb_fn = bdev->internal.unregister_cb; 5007 cb_arg = bdev->internal.unregister_ctx; 5008 5009 rc = bdev->fn_table->destruct(bdev->ctxt); 5010 if (rc < 0) { 5011 SPDK_ERRLOG("destruct failed\n"); 5012 } 5013 if (rc <= 0 && cb_fn != NULL) { 5014 cb_fn(cb_arg, rc); 5015 } 5016 } 5017 5018 5019 static void 5020 bdev_fini(struct spdk_bdev *bdev) 5021 { 5022 pthread_mutex_destroy(&bdev->internal.mutex); 5023 5024 free(bdev->internal.qos); 5025 5026 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 5027 } 5028 5029 static void 5030 bdev_start(struct spdk_bdev *bdev) 5031 { 5032 struct spdk_bdev_module *module; 5033 uint32_t action; 5034 5035 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 5036 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 5037 5038 /* Examine configuration before initializing I/O */ 5039 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5040 if (module->examine_config && bdev_ok_to_examine(bdev)) { 5041 action = module->internal.action_in_progress; 5042 module->internal.action_in_progress++; 5043 module->examine_config(bdev); 5044 if (action != module->internal.action_in_progress) { 5045 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 5046 module->name); 5047 } 5048 } 5049 } 5050 5051 if (bdev->internal.claim_module && bdev_ok_to_examine(bdev)) { 5052 if (bdev->internal.claim_module->examine_disk) { 5053 bdev->internal.claim_module->internal.action_in_progress++; 5054 bdev->internal.claim_module->examine_disk(bdev); 5055 } 5056 return; 5057 } 5058 5059 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5060 if (module->examine_disk && bdev_ok_to_examine(bdev)) { 5061 module->internal.action_in_progress++; 5062 module->examine_disk(bdev); 5063 } 5064 } 5065 } 5066 5067 int 5068 spdk_bdev_register(struct spdk_bdev *bdev) 5069 { 5070 int rc = bdev_init(bdev); 5071 5072 if (rc == 0) { 5073 bdev_start(bdev); 5074 } 5075 5076 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 5077 return rc; 5078 } 5079 5080 int 5081 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 5082 { 5083 SPDK_ERRLOG("This function is deprecated. 
Use spdk_bdev_register() instead.\n"); 5084 return spdk_bdev_register(vbdev); 5085 } 5086 5087 void 5088 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 5089 { 5090 if (bdev->internal.unregister_cb != NULL) { 5091 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 5092 } 5093 } 5094 5095 static void 5096 _remove_notify(void *arg) 5097 { 5098 struct spdk_bdev_desc *desc = arg; 5099 5100 pthread_mutex_lock(&desc->mutex); 5101 desc->refs--; 5102 5103 if (!desc->closed) { 5104 pthread_mutex_unlock(&desc->mutex); 5105 if (desc->callback.open_with_ext) { 5106 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 5107 } else { 5108 desc->callback.remove_fn(desc->callback.ctx); 5109 } 5110 return; 5111 } else if (0 == desc->refs) { 5112 /* This descriptor was closed after this remove_notify message was sent. 5113 * spdk_bdev_close() could not free the descriptor since this message was 5114 * in flight, so we free it now using bdev_desc_free(). 5115 */ 5116 pthread_mutex_unlock(&desc->mutex); 5117 bdev_desc_free(desc); 5118 return; 5119 } 5120 pthread_mutex_unlock(&desc->mutex); 5121 } 5122 5123 /* Must be called while holding bdev->internal.mutex. 5124 * returns: 0 - bdev removed and ready to be destructed. 5125 * -EBUSY - bdev can't be destructed yet. */ 5126 static int 5127 bdev_unregister_unsafe(struct spdk_bdev *bdev) 5128 { 5129 struct spdk_bdev_desc *desc, *tmp; 5130 int rc = 0; 5131 5132 /* Notify each descriptor about hotremoval */ 5133 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 5134 rc = -EBUSY; 5135 pthread_mutex_lock(&desc->mutex); 5136 /* 5137 * Defer invocation of the event_cb to a separate message that will 5138 * run later on its thread. This ensures this context unwinds and 5139 * we don't recursively unregister this bdev again if the event_cb 5140 * immediately closes its descriptor. 5141 */ 5142 desc->refs++; 5143 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 5144 pthread_mutex_unlock(&desc->mutex); 5145 } 5146 5147 /* If there are no descriptors, proceed removing the bdev */ 5148 if (rc == 0) { 5149 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 5150 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 5151 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 5152 } 5153 5154 return rc; 5155 } 5156 5157 void 5158 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 5159 { 5160 struct spdk_thread *thread; 5161 int rc; 5162 5163 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 5164 5165 thread = spdk_get_thread(); 5166 if (!thread) { 5167 /* The user called this from a non-SPDK thread. */ 5168 if (cb_fn != NULL) { 5169 cb_fn(cb_arg, -ENOTSUP); 5170 } 5171 return; 5172 } 5173 5174 pthread_mutex_lock(&g_bdev_mgr.mutex); 5175 pthread_mutex_lock(&bdev->internal.mutex); 5176 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5177 pthread_mutex_unlock(&bdev->internal.mutex); 5178 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5179 if (cb_fn) { 5180 cb_fn(cb_arg, -EBUSY); 5181 } 5182 return; 5183 } 5184 5185 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 5186 bdev->internal.unregister_cb = cb_fn; 5187 bdev->internal.unregister_ctx = cb_arg; 5188 5189 /* Call under lock. 
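	 * bdev_unregister_unsafe() returns 0 only when no descriptors are open;
	 * otherwise every open descriptor is notified of the removal and the
	 * final spdk_bdev_close() finishes the unregister.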
*/ 5190 rc = bdev_unregister_unsafe(bdev); 5191 pthread_mutex_unlock(&bdev->internal.mutex); 5192 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5193 5194 if (rc == 0) { 5195 bdev_fini(bdev); 5196 } 5197 } 5198 5199 static void 5200 bdev_dummy_event_cb(void *remove_ctx) 5201 { 5202 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev remove event received with no remove callback specified"); 5203 } 5204 5205 static int 5206 bdev_start_qos(struct spdk_bdev *bdev) 5207 { 5208 struct set_qos_limit_ctx *ctx; 5209 5210 /* Enable QoS */ 5211 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 5212 ctx = calloc(1, sizeof(*ctx)); 5213 if (ctx == NULL) { 5214 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 5215 return -ENOMEM; 5216 } 5217 ctx->bdev = bdev; 5218 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5219 bdev_enable_qos_msg, ctx, 5220 bdev_enable_qos_done); 5221 } 5222 5223 return 0; 5224 } 5225 5226 static int 5227 bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 5228 { 5229 struct spdk_thread *thread; 5230 int rc = 0; 5231 5232 thread = spdk_get_thread(); 5233 if (!thread) { 5234 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 5235 return -ENOTSUP; 5236 } 5237 5238 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5239 spdk_get_thread()); 5240 5241 desc->bdev = bdev; 5242 desc->thread = thread; 5243 desc->write = write; 5244 5245 pthread_mutex_lock(&bdev->internal.mutex); 5246 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5247 pthread_mutex_unlock(&bdev->internal.mutex); 5248 return -ENODEV; 5249 } 5250 5251 if (write && bdev->internal.claim_module) { 5252 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 5253 bdev->name, bdev->internal.claim_module->name); 5254 pthread_mutex_unlock(&bdev->internal.mutex); 5255 return -EPERM; 5256 } 5257 5258 rc = bdev_start_qos(bdev); 5259 if (rc != 0) { 5260 SPDK_ERRLOG("Failed to start QoS on bdev %s\n", bdev->name); 5261 pthread_mutex_unlock(&bdev->internal.mutex); 5262 return rc; 5263 } 5264 5265 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 5266 5267 pthread_mutex_unlock(&bdev->internal.mutex); 5268 5269 return 0; 5270 } 5271 5272 int 5273 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 5274 void *remove_ctx, struct spdk_bdev_desc **_desc) 5275 { 5276 struct spdk_bdev_desc *desc; 5277 int rc; 5278 5279 desc = calloc(1, sizeof(*desc)); 5280 if (desc == NULL) { 5281 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5282 return -ENOMEM; 5283 } 5284 5285 if (remove_cb == NULL) { 5286 remove_cb = bdev_dummy_event_cb; 5287 } 5288 5289 TAILQ_INIT(&desc->pending_media_events); 5290 TAILQ_INIT(&desc->free_media_events); 5291 5292 desc->callback.open_with_ext = false; 5293 desc->callback.remove_fn = remove_cb; 5294 desc->callback.ctx = remove_ctx; 5295 pthread_mutex_init(&desc->mutex, NULL); 5296 5297 pthread_mutex_lock(&g_bdev_mgr.mutex); 5298 5299 rc = bdev_open(bdev, write, desc); 5300 if (rc != 0) { 5301 bdev_desc_free(desc); 5302 desc = NULL; 5303 } 5304 5305 *_desc = desc; 5306 5307 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5308 5309 return rc; 5310 } 5311 5312 int 5313 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 5314 void *event_ctx, struct spdk_bdev_desc **_desc) 5315 { 5316 struct spdk_bdev_desc *desc; 5317 struct spdk_bdev *bdev; 5318 unsigned int event_id; 5319 int rc; 5320 5321 if (event_cb == NULL) { 5322 SPDK_ERRLOG("Missing event 
callback function\n"); 5323 return -EINVAL; 5324 } 5325 5326 pthread_mutex_lock(&g_bdev_mgr.mutex); 5327 5328 bdev = spdk_bdev_get_by_name(bdev_name); 5329 5330 if (bdev == NULL) { 5331 SPDK_ERRLOG("Failed to find bdev with name: %s\n", bdev_name); 5332 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5333 return -EINVAL; 5334 } 5335 5336 desc = calloc(1, sizeof(*desc)); 5337 if (desc == NULL) { 5338 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5339 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5340 return -ENOMEM; 5341 } 5342 5343 TAILQ_INIT(&desc->pending_media_events); 5344 TAILQ_INIT(&desc->free_media_events); 5345 5346 desc->callback.open_with_ext = true; 5347 desc->callback.event_fn = event_cb; 5348 desc->callback.ctx = event_ctx; 5349 pthread_mutex_init(&desc->mutex, NULL); 5350 5351 if (bdev->media_events) { 5352 desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE, 5353 sizeof(*desc->media_events_buffer)); 5354 if (desc->media_events_buffer == NULL) { 5355 SPDK_ERRLOG("Failed to initialize media event pool\n"); 5356 bdev_desc_free(desc); 5357 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5358 return -ENOMEM; 5359 } 5360 5361 for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) { 5362 TAILQ_INSERT_TAIL(&desc->free_media_events, 5363 &desc->media_events_buffer[event_id], tailq); 5364 } 5365 } 5366 5367 rc = bdev_open(bdev, write, desc); 5368 if (rc != 0) { 5369 bdev_desc_free(desc); 5370 desc = NULL; 5371 } 5372 5373 *_desc = desc; 5374 5375 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5376 5377 return rc; 5378 } 5379 5380 void 5381 spdk_bdev_close(struct spdk_bdev_desc *desc) 5382 { 5383 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 5384 int rc; 5385 5386 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5387 spdk_get_thread()); 5388 5389 assert(desc->thread == spdk_get_thread()); 5390 5391 spdk_poller_unregister(&desc->io_timeout_poller); 5392 5393 pthread_mutex_lock(&bdev->internal.mutex); 5394 pthread_mutex_lock(&desc->mutex); 5395 5396 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 5397 5398 desc->closed = true; 5399 5400 if (0 == desc->refs) { 5401 pthread_mutex_unlock(&desc->mutex); 5402 bdev_desc_free(desc); 5403 } else { 5404 pthread_mutex_unlock(&desc->mutex); 5405 } 5406 5407 /* If no more descriptors, kill QoS channel */ 5408 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5409 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 5410 bdev->name, spdk_get_thread()); 5411 5412 if (bdev_qos_destroy(bdev)) { 5413 /* There isn't anything we can do to recover here. Just let the 5414 * old QoS poller keep running. The QoS handling won't change 5415 * cores when the user allocates a new channel, but it won't break. */ 5416 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 5417 } 5418 } 5419 5420 spdk_bdev_set_qd_sampling_period(bdev, 0); 5421 5422 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5423 rc = bdev_unregister_unsafe(bdev); 5424 pthread_mutex_unlock(&bdev->internal.mutex); 5425 5426 if (rc == 0) { 5427 bdev_fini(bdev); 5428 } 5429 } else { 5430 pthread_mutex_unlock(&bdev->internal.mutex); 5431 } 5432 } 5433 5434 int 5435 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 5436 struct spdk_bdev_module *module) 5437 { 5438 if (bdev->internal.claim_module != NULL) { 5439 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 5440 bdev->internal.claim_module->name); 5441 return -EPERM; 5442 } 5443 5444 if (desc && !desc->write) { 5445 desc->write = true; 5446 } 5447 5448 bdev->internal.claim_module = module; 5449 return 0; 5450 } 5451 5452 void 5453 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 5454 { 5455 assert(bdev->internal.claim_module != NULL); 5456 bdev->internal.claim_module = NULL; 5457 } 5458 5459 struct spdk_bdev * 5460 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 5461 { 5462 assert(desc != NULL); 5463 return desc->bdev; 5464 } 5465 5466 void 5467 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 5468 { 5469 struct iovec *iovs; 5470 int iovcnt; 5471 5472 if (bdev_io == NULL) { 5473 return; 5474 } 5475 5476 switch (bdev_io->type) { 5477 case SPDK_BDEV_IO_TYPE_READ: 5478 case SPDK_BDEV_IO_TYPE_WRITE: 5479 case SPDK_BDEV_IO_TYPE_ZCOPY: 5480 iovs = bdev_io->u.bdev.iovs; 5481 iovcnt = bdev_io->u.bdev.iovcnt; 5482 break; 5483 default: 5484 iovs = NULL; 5485 iovcnt = 0; 5486 break; 5487 } 5488 5489 if (iovp) { 5490 *iovp = iovs; 5491 } 5492 if (iovcntp) { 5493 *iovcntp = iovcnt; 5494 } 5495 } 5496 5497 void * 5498 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 5499 { 5500 if (bdev_io == NULL) { 5501 return NULL; 5502 } 5503 5504 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 5505 return NULL; 5506 } 5507 5508 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 5509 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 5510 return bdev_io->u.bdev.md_buf; 5511 } 5512 5513 return NULL; 5514 } 5515 5516 void 5517 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 5518 { 5519 5520 if (spdk_bdev_module_list_find(bdev_module->name)) { 5521 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 5522 assert(false); 5523 } 5524 5525 /* 5526 * Modules with examine callbacks must be initialized first, so they are 5527 * ready to handle examine callbacks from later modules that will 5528 * register physical bdevs. 
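	 * That is why modules providing examine callbacks are inserted at the
	 * head of the module list and all other modules at the tail.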
5529 */ 5530 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 5531 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5532 } else { 5533 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5534 } 5535 } 5536 5537 struct spdk_bdev_module * 5538 spdk_bdev_module_list_find(const char *name) 5539 { 5540 struct spdk_bdev_module *bdev_module; 5541 5542 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5543 if (strcmp(name, bdev_module->name) == 0) { 5544 break; 5545 } 5546 } 5547 5548 return bdev_module; 5549 } 5550 5551 static void 5552 bdev_write_zero_buffer_next(void *_bdev_io) 5553 { 5554 struct spdk_bdev_io *bdev_io = _bdev_io; 5555 uint64_t num_bytes, num_blocks; 5556 void *md_buf = NULL; 5557 int rc; 5558 5559 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 5560 bdev_io->u.bdev.split_remaining_num_blocks, 5561 ZERO_BUFFER_SIZE); 5562 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 5563 5564 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 5565 md_buf = (char *)g_bdev_mgr.zero_buffer + 5566 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 5567 } 5568 5569 rc = bdev_write_blocks_with_md(bdev_io->internal.desc, 5570 spdk_io_channel_from_ctx(bdev_io->internal.ch), 5571 g_bdev_mgr.zero_buffer, md_buf, 5572 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 5573 bdev_write_zero_buffer_done, bdev_io); 5574 if (rc == 0) { 5575 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 5576 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 5577 } else if (rc == -ENOMEM) { 5578 bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next); 5579 } else { 5580 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5581 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 5582 } 5583 } 5584 5585 static void 5586 bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 5587 { 5588 struct spdk_bdev_io *parent_io = cb_arg; 5589 5590 spdk_bdev_free_io(bdev_io); 5591 5592 if (!success) { 5593 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5594 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 5595 return; 5596 } 5597 5598 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 5599 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5600 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 5601 return; 5602 } 5603 5604 bdev_write_zero_buffer_next(parent_io); 5605 } 5606 5607 static void 5608 bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 5609 { 5610 pthread_mutex_lock(&ctx->bdev->internal.mutex); 5611 ctx->bdev->internal.qos_mod_in_progress = false; 5612 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 5613 5614 if (ctx->cb_fn) { 5615 ctx->cb_fn(ctx->cb_arg, status); 5616 } 5617 free(ctx); 5618 } 5619 5620 static void 5621 bdev_disable_qos_done(void *cb_arg) 5622 { 5623 struct set_qos_limit_ctx *ctx = cb_arg; 5624 struct spdk_bdev *bdev = ctx->bdev; 5625 struct spdk_bdev_io *bdev_io; 5626 struct spdk_bdev_qos *qos; 5627 5628 pthread_mutex_lock(&bdev->internal.mutex); 5629 qos = bdev->internal.qos; 5630 bdev->internal.qos = NULL; 5631 pthread_mutex_unlock(&bdev->internal.mutex); 5632 5633 while (!TAILQ_EMPTY(&qos->queued)) { 5634 /* Send queued I/O back to their original thread for resubmission. 
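		 * Once the queue has been drained, the QoS channel reference and
		 * poller are released below and the qos object itself is freed.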
*/ 5635 bdev_io = TAILQ_FIRST(&qos->queued); 5636 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 5637 5638 if (bdev_io->internal.io_submit_ch) { 5639 /* 5640 * Channel was changed when sending it to the QoS thread - change it back 5641 * before sending it back to the original thread. 5642 */ 5643 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 5644 bdev_io->internal.io_submit_ch = NULL; 5645 } 5646 5647 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 5648 _bdev_io_submit, bdev_io); 5649 } 5650 5651 if (qos->thread != NULL) { 5652 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 5653 spdk_poller_unregister(&qos->poller); 5654 } 5655 5656 free(qos); 5657 5658 bdev_set_qos_limit_done(ctx, 0); 5659 } 5660 5661 static void 5662 bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 5663 { 5664 void *io_device = spdk_io_channel_iter_get_io_device(i); 5665 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5666 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5667 struct spdk_thread *thread; 5668 5669 pthread_mutex_lock(&bdev->internal.mutex); 5670 thread = bdev->internal.qos->thread; 5671 pthread_mutex_unlock(&bdev->internal.mutex); 5672 5673 if (thread != NULL) { 5674 spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx); 5675 } else { 5676 bdev_disable_qos_done(ctx); 5677 } 5678 } 5679 5680 static void 5681 bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 5682 { 5683 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5684 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5685 5686 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 5687 5688 spdk_for_each_channel_continue(i, 0); 5689 } 5690 5691 static void 5692 bdev_update_qos_rate_limit_msg(void *cb_arg) 5693 { 5694 struct set_qos_limit_ctx *ctx = cb_arg; 5695 struct spdk_bdev *bdev = ctx->bdev; 5696 5697 pthread_mutex_lock(&bdev->internal.mutex); 5698 bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 5699 pthread_mutex_unlock(&bdev->internal.mutex); 5700 5701 bdev_set_qos_limit_done(ctx, 0); 5702 } 5703 5704 static void 5705 bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 5706 { 5707 void *io_device = spdk_io_channel_iter_get_io_device(i); 5708 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5709 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5710 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5711 5712 pthread_mutex_lock(&bdev->internal.mutex); 5713 bdev_enable_qos(bdev, bdev_ch); 5714 pthread_mutex_unlock(&bdev->internal.mutex); 5715 spdk_for_each_channel_continue(i, 0); 5716 } 5717 5718 static void 5719 bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 5720 { 5721 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5722 5723 bdev_set_qos_limit_done(ctx, status); 5724 } 5725 5726 static void 5727 bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 5728 { 5729 int i; 5730 5731 assert(bdev->internal.qos != NULL); 5732 5733 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5734 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5735 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5736 5737 if (limits[i] == 0) { 5738 bdev->internal.qos->rate_limits[i].limit = 5739 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 5740 } 5741 } 5742 } 5743 } 5744 5745 void 5746 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 5747 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 5748 { 5749 struct set_qos_limit_ctx *ctx; 5750 uint32_t 

void
spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
{
	struct set_qos_limit_ctx *ctx;
	uint32_t limit_set_complement;
	uint64_t min_limit_per_sec;
	int i;
	bool disable_rate_limit = true;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			continue;
		}

		if (limits[i] > 0) {
			disable_rate_limit = false;
		}

		if (bdev_qos_is_iops_rate_limit(i) == true) {
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
		} else {
			/* Change from megabyte to byte rate limit */
			limits[i] = limits[i] * 1024 * 1024;
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
		}

		limit_set_complement = limits[i] % min_limit_per_sec;
		if (limit_set_complement) {
			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
				    limits[i], min_limit_per_sec);
			limits[i] += min_limit_per_sec - limit_set_complement;
			SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]);
		}
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->bdev = bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos_mod_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}
	bdev->internal.qos_mod_in_progress = true;

	if (disable_rate_limit == true && bdev->internal.qos) {
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
			     bdev->internal.qos->rate_limits[i].limit !=
			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
				disable_rate_limit = false;
				break;
			}
		}
	}

	if (disable_rate_limit == false) {
		if (bdev->internal.qos == NULL) {
			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
			if (!bdev->internal.qos) {
				pthread_mutex_unlock(&bdev->internal.mutex);
				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
				bdev_set_qos_limit_done(ctx, -ENOMEM);
				return;
			}
		}

		if (bdev->internal.qos->thread == NULL) {
			/* Enabling */
			bdev_set_qos_rate_limits(bdev, limits);

			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      bdev_enable_qos_msg, ctx,
					      bdev_enable_qos_done);
		} else {
			/* Updating */
			bdev_set_qos_rate_limits(bdev, limits);

			spdk_thread_send_msg(bdev->internal.qos->thread,
					     bdev_update_qos_rate_limit_msg, ctx);
		}
	} else {
		if (bdev->internal.qos != NULL) {
			bdev_set_qos_rate_limits(bdev, limits);

			/* Disabling */
			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      bdev_disable_qos_msg, ctx,
					      bdev_disable_qos_msg_done);
		} else {
			pthread_mutex_unlock(&bdev->internal.mutex);
			bdev_set_qos_limit_done(ctx, 0);
			return;
		}
	}

	pthread_mutex_unlock(&bdev->internal.mutex);
}
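
/* Context for enabling or disabling the per-channel histogram of a bdev. */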
struct spdk_bdev_histogram_ctx {
	spdk_bdev_histogram_status_cb cb_fn;
	void *cb_arg;
	struct spdk_bdev *bdev;
	int status;
};

static void
bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	pthread_mutex_lock(&ctx->bdev->internal.mutex);
	ctx->bdev->internal.histogram_in_progress = false;
	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
	ctx->cb_fn(ctx->cb_arg, ctx->status);
	free(ctx);
}

static void
bdev_histogram_disable_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);

	if (ch->histogram != NULL) {
		spdk_histogram_data_free(ch->histogram);
		ch->histogram = NULL;
	}
	spdk_for_each_channel_continue(i, 0);
}

static void
bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	if (status != 0) {
		ctx->status = status;
		ctx->bdev->internal.histogram_enabled = false;
		spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx,
				      bdev_histogram_disable_channel_cb);
	} else {
		pthread_mutex_lock(&ctx->bdev->internal.mutex);
		ctx->bdev->internal.histogram_in_progress = false;
		pthread_mutex_unlock(&ctx->bdev->internal.mutex);
		ctx->cb_fn(ctx->cb_arg, ctx->status);
		free(ctx);
	}
}

static void
bdev_histogram_enable_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	int status = 0;

	if (ch->histogram == NULL) {
		ch->histogram = spdk_histogram_data_alloc();
		if (ch->histogram == NULL) {
			status = -ENOMEM;
		}
	}

	spdk_for_each_channel_continue(i, status);
}
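
/*
 * Illustrative sketch of the histogram API (the callback names are
 * placeholders): enable per-channel histograms on a bdev, then later merge
 * them into a caller-provided spdk_histogram_data.
 *
 *	spdk_bdev_histogram_enable(bdev, histogram_status_cb, cb_arg, true);
 *	...
 *	struct spdk_histogram_data *histogram = spdk_histogram_data_alloc();
 *
 *	spdk_bdev_histogram_get(bdev, histogram, histogram_data_cb, cb_arg);
 *
 * histogram_data_cb() receives the merged data; the caller frees it with
 * spdk_histogram_data_free() when done.
 */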

void
spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn,
			   void *cb_arg, bool enable)
{
	struct spdk_bdev_histogram_ctx *ctx;

	ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bdev = bdev;
	ctx->status = 0;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.histogram_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}

	bdev->internal.histogram_in_progress = true;
	pthread_mutex_unlock(&bdev->internal.mutex);

	bdev->internal.histogram_enabled = enable;

	if (enable) {
		/* Allocate histogram for each channel */
		spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx,
				      bdev_histogram_enable_channel_cb);
	} else {
		spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx,
				      bdev_histogram_disable_channel_cb);
	}
}

struct spdk_bdev_histogram_data_ctx {
	spdk_bdev_histogram_data_cb cb_fn;
	void *cb_arg;
	struct spdk_bdev *bdev;
	/** merged histogram data from all channels */
	struct spdk_histogram_data *histogram;
};

static void
bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cb_fn(ctx->cb_arg, status, ctx->histogram);
	free(ctx);
}

static void
bdev_histogram_get_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	int status = 0;

	if (ch->histogram == NULL) {
		status = -EFAULT;
	} else {
		spdk_histogram_data_merge(ctx->histogram, ch->histogram);
	}

	spdk_for_each_channel_continue(i, status);
}

void
spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram,
			spdk_bdev_histogram_data_cb cb_fn,
			void *cb_arg)
{
	struct spdk_bdev_histogram_data_ctx *ctx;

	ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM, NULL);
		return;
	}

	ctx->bdev = bdev;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	ctx->histogram = histogram;

	spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx,
			      bdev_histogram_get_channel_cb);
}
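
/*
 * Media event helpers.  A bdev module reports media events with
 * spdk_bdev_push_media_events() and then calls
 * spdk_bdev_notify_media_management(), which delivers
 * SPDK_BDEV_EVENT_MEDIA_MANAGEMENT to the descriptor owning the event buffer.
 * The application drains the events from its event callback, e.g.
 * (illustrative sketch; `desc` is an open descriptor and handle_event() is a
 * placeholder):
 *
 *	struct spdk_bdev_media_event events[8];
 *	size_t num_events, i;
 *
 *	num_events = spdk_bdev_get_media_events(desc, events, SPDK_COUNTOF(events));
 *	for (i = 0; i < num_events; i++) {
 *		handle_event(&events[i]);
 *	}
 */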
size_t
spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events,
			   size_t max_events)
{
	struct media_event_entry *entry;
	size_t num_events = 0;

	for (; num_events < max_events; ++num_events) {
		entry = TAILQ_FIRST(&desc->pending_media_events);
		if (entry == NULL) {
			break;
		}

		events[num_events] = entry->event;
		TAILQ_REMOVE(&desc->pending_media_events, entry, tailq);
		TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq);
	}

	return num_events;
}

int
spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events,
			    size_t num_events)
{
	struct spdk_bdev_desc *desc;
	struct media_event_entry *entry;
	size_t event_id;
	int rc = 0;

	assert(bdev->media_events);

	pthread_mutex_lock(&bdev->internal.mutex);
	TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) {
		if (desc->write) {
			break;
		}
	}

	if (desc == NULL || desc->media_events_buffer == NULL) {
		rc = -ENODEV;
		goto out;
	}

	for (event_id = 0; event_id < num_events; ++event_id) {
		entry = TAILQ_FIRST(&desc->free_media_events);
		if (entry == NULL) {
			break;
		}

		TAILQ_REMOVE(&desc->free_media_events, entry, tailq);
		TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq);
		entry->event = events[event_id];
	}

	rc = event_id;
out:
	pthread_mutex_unlock(&bdev->internal.mutex);
	return rc;
}

void
spdk_bdev_notify_media_management(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *desc;

	pthread_mutex_lock(&bdev->internal.mutex);
	TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) {
		if (!TAILQ_EMPTY(&desc->pending_media_events)) {
			desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev,
						desc->callback.ctx);
		}
	}
	pthread_mutex_unlock(&bdev->internal.mutex);
}

struct locked_lba_range_ctx {
	struct lba_range range;
	struct spdk_bdev *bdev;
	struct lba_range *current_range;
	struct lba_range *owner_range;
	struct spdk_poller *poller;
	lock_range_cb cb_fn;
	void *cb_arg;
};

static void
bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status)
{
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cb_fn(ctx->cb_arg, -ENOMEM);
	free(ctx);
}

static void
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i);

static void
bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status)
{
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_bdev *bdev = ctx->bdev;

	if (status == -ENOMEM) {
		/* One of the channels could not allocate a range object.
		 * We have to go back and clean up any ranges that were
		 * allocated successfully before we return error status to
		 * the caller.  We can reuse the unlock function to do that
		 * clean up.
		 */
		spdk_for_each_channel(__bdev_to_io_dev(bdev),
				      bdev_unlock_lba_range_get_channel, ctx,
				      bdev_lock_error_cleanup_cb);
		return;
	}

	/* All channels have locked this range and no I/O overlapping the range
	 * is outstanding.  Set the owner_ch for the range object on the
	 * locking channel, so that this channel will know that it is allowed
	 * to write to this range.
	 */
	ctx->owner_range->owner_ch = ctx->range.owner_ch;
	ctx->cb_fn(ctx->cb_arg, status);

	/* Don't free the ctx here.  Its range is still in the bdev's global list
	 * of locked ranges and will be removed and freed when this range
	 * is later unlocked.
	 */
}

static int
bdev_lock_lba_range_check_io(void *_i)
{
	struct spdk_io_channel_iter *i = _i;
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct lba_range *range = ctx->current_range;
	struct spdk_bdev_io *bdev_io;

	spdk_poller_unregister(&ctx->poller);

	/* The range is now in locked_ranges, so no new I/O can be submitted to it.
	 * But we need to wait until any outstanding I/O overlapping with this range
	 * has completed.
	 */
	TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
		if (bdev_io_range_is_locked(bdev_io, range)) {
			ctx->poller = SPDK_POLLER_REGISTER(bdev_lock_lba_range_check_io, i, 100);
			return 1;
		}
	}

	spdk_for_each_channel_continue(i, 0);
	return 1;
}

static void
bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct lba_range *range;

	TAILQ_FOREACH(range, &ch->locked_ranges, tailq) {
		if (range->length == ctx->range.length &&
		    range->offset == ctx->range.offset &&
		    range->locked_ctx == ctx->range.locked_ctx) {
			/* This range already exists on this channel, so don't add
			 * it again.  This can happen when a new channel is created
			 * while the for_each_channel operation is in progress.
			 * Do not check for outstanding I/O in that case, since the
			 * range was locked before any I/O could be submitted to the
			 * new channel.
			 */
			spdk_for_each_channel_continue(i, 0);
			return;
		}
	}

	range = calloc(1, sizeof(*range));
	if (range == NULL) {
		spdk_for_each_channel_continue(i, -ENOMEM);
		return;
	}

	range->length = ctx->range.length;
	range->offset = ctx->range.offset;
	range->locked_ctx = ctx->range.locked_ctx;
	ctx->current_range = range;
	if (ctx->range.owner_ch == ch) {
		/* This is the range object for the channel that will hold
		 * the lock.  Store it in the ctx object so that we can easily
		 * set its owner_ch after the lock is finally acquired.
		 */
		ctx->owner_range = range;
	}
	TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq);
	bdev_lock_lba_range_check_io(i);
}

static void
bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx)
{
	assert(spdk_get_thread() == ctx->range.owner_ch->channel->thread);

	/* We will add a copy of this range to each channel now. */
	spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx,
			      bdev_lock_lba_range_cb);
}

static bool
bdev_lba_range_overlaps_tailq(struct lba_range *range, lba_range_tailq_t *tailq)
{
	struct lba_range *r;

	TAILQ_FOREACH(r, tailq, tailq) {
		if (bdev_lba_range_overlapped(range, r)) {
			return true;
		}
	}
	return false;
}
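
/*
 * Lock an LBA range so that only the owning channel may write to it; other I/O
 * that overlaps the range is held until the range is released with
 * bdev_unlock_lba_range() using the exact same offset, length, channel and
 * cb_arg.  Illustrative sketch of the internal usage (callbacks are
 * placeholders):
 *
 *	bdev_lock_lba_range(desc, io_ch, offset, length, range_locked_cb, ctx);
 *	... exclusive access to [offset, offset + length) once range_locked_cb fires ...
 *	bdev_unlock_lba_range(desc, io_ch, offset, length, range_unlocked_cb, ctx);
 */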

static int
bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		    uint64_t offset, uint64_t length,
		    lock_range_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct locked_lba_range_ctx *ctx;

	if (cb_arg == NULL) {
		SPDK_ERRLOG("cb_arg must not be NULL\n");
		return -EINVAL;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	ctx->range.offset = offset;
	ctx->range.length = length;
	ctx->range.owner_ch = ch;
	ctx->range.locked_ctx = cb_arg;
	ctx->bdev = bdev;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev_lba_range_overlaps_tailq(&ctx->range, &bdev->internal.locked_ranges)) {
		/* There is an active lock overlapping with this range.
		 * Put it on the pending list until this range no
		 * longer overlaps with another.
		 */
		TAILQ_INSERT_TAIL(&bdev->internal.pending_locked_ranges, &ctx->range, tailq);
	} else {
		TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, &ctx->range, tailq);
		bdev_lock_lba_range_ctx(bdev, ctx);
	}
	pthread_mutex_unlock(&bdev->internal.mutex);
	return 0;
}

static void
bdev_lock_lba_range_ctx_msg(void *_ctx)
{
	struct locked_lba_range_ctx *ctx = _ctx;

	bdev_lock_lba_range_ctx(ctx->bdev, ctx);
}
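
/*
 * Called after every channel has dropped the unlocked range.  Any pending lock
 * request that overlapped the released range and no longer conflicts with a
 * still-locked range is promoted to locked_ranges, and its locking sequence is
 * started on the owner channel's thread.
 */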
static void
bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status)
{
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct locked_lba_range_ctx *pending_ctx;
	struct spdk_bdev_channel *ch = ctx->range.owner_ch;
	struct spdk_bdev *bdev = ch->bdev;
	struct lba_range *range, *tmp;

	pthread_mutex_lock(&bdev->internal.mutex);
	/* Check if there are any pending locked ranges that overlap with the range
	 * that was just unlocked.  If there are, check that each one no longer
	 * overlaps with any other locked range before calling bdev_lock_lba_range_ctx,
	 * which will start the lock process.
	 */
	TAILQ_FOREACH_SAFE(range, &bdev->internal.pending_locked_ranges, tailq, tmp) {
		if (bdev_lba_range_overlapped(range, &ctx->range) &&
		    !bdev_lba_range_overlaps_tailq(range, &bdev->internal.locked_ranges)) {
			TAILQ_REMOVE(&bdev->internal.pending_locked_ranges, range, tailq);
			pending_ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range);
			TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, range, tailq);
			spdk_thread_send_msg(pending_ctx->range.owner_ch->channel->thread,
					     bdev_lock_lba_range_ctx_msg, pending_ctx);
		}
	}
	pthread_mutex_unlock(&bdev->internal.mutex);

	ctx->cb_fn(ctx->cb_arg, status);
	free(ctx);
}

static void
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	TAILQ_HEAD(, spdk_bdev_io) io_locked;
	struct spdk_bdev_io *bdev_io;
	struct lba_range *range;

	TAILQ_FOREACH(range, &ch->locked_ranges, tailq) {
		if (ctx->range.offset == range->offset &&
		    ctx->range.length == range->length &&
		    ctx->range.locked_ctx == range->locked_ctx) {
			TAILQ_REMOVE(&ch->locked_ranges, range, tailq);
			free(range);
			break;
		}
	}

	/* Note: we should almost always be able to assert that the range specified
	 * was found.  But there are some very rare corner cases where a new channel
	 * gets created simultaneously with a range unlock, where this function
	 * would execute on that new channel and wouldn't have the range.
	 * We also use this to clean up range allocations when a later allocation
	 * fails in the locking path.
	 * So we can't actually assert() here.
	 */

	/* Swap the locked I/O into a temporary list, and then try to submit it again.
	 * We could hyper-optimize this to only resubmit locked I/O that overlaps
	 * with the range that was just unlocked, but this isn't a performance path so
	 * we go for simplicity here.
	 */
	TAILQ_INIT(&io_locked);
	TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link);
	while (!TAILQ_EMPTY(&io_locked)) {
		bdev_io = TAILQ_FIRST(&io_locked);
		TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link);
		bdev_io_submit(bdev_io);
	}

	spdk_for_each_channel_continue(i, 0);
}
6385 */ 6386 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6387 if (range->offset == offset && range->length == length && 6388 range->owner_ch == ch && range->locked_ctx == cb_arg) { 6389 range_found = true; 6390 break; 6391 } 6392 } 6393 6394 if (!range_found) { 6395 return -EINVAL; 6396 } 6397 6398 pthread_mutex_lock(&bdev->internal.mutex); 6399 /* We confirmed that this channel has locked the specified range. To 6400 * start the unlock the process, we find the range in the bdev's locked_ranges 6401 * and remove it. This ensures new channels don't inherit the locked range. 6402 * Then we will send a message to each channel (including the one specified 6403 * here) to remove the range from its per-channel list. 6404 */ 6405 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 6406 if (range->offset == offset && range->length == length && 6407 range->locked_ctx == cb_arg) { 6408 break; 6409 } 6410 } 6411 if (range == NULL) { 6412 assert(false); 6413 pthread_mutex_unlock(&bdev->internal.mutex); 6414 return -EINVAL; 6415 } 6416 TAILQ_REMOVE(&bdev->internal.locked_ranges, range, tailq); 6417 ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6418 pthread_mutex_unlock(&bdev->internal.mutex); 6419 6420 ctx->cb_fn = cb_fn; 6421 ctx->cb_arg = cb_arg; 6422 6423 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx, 6424 bdev_unlock_lba_range_cb); 6425 return 0; 6426 } 6427 6428 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 6429 6430 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 6431 { 6432 spdk_trace_register_owner(OWNER_BDEV, 'b'); 6433 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 6434 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 6435 OBJECT_BDEV_IO, 1, 0, "type: "); 6436 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 6437 OBJECT_BDEV_IO, 0, 0, ""); 6438 } 6439