1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. All rights reserved. 5 * Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/thread.h" 42 #include "spdk/likely.h" 43 #include "spdk/queue.h" 44 #include "spdk/nvme_spec.h" 45 #include "spdk/scsi_spec.h" 46 #include "spdk/notify.h" 47 #include "spdk/util.h" 48 #include "spdk/trace.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 #include "spdk/string.h" 53 54 #include "bdev_internal.h" 55 56 #ifdef SPDK_CONFIG_VTUNE 57 #include "ittnotify.h" 58 #include "ittnotify_types.h" 59 int __itt_init_ittlib(const char *, __itt_group_id); 60 #endif 61 62 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024 - 1) 63 #define SPDK_BDEV_IO_CACHE_SIZE 256 64 #define SPDK_BDEV_AUTO_EXAMINE true 65 #define BUF_SMALL_POOL_SIZE 8191 66 #define BUF_LARGE_POOL_SIZE 1023 67 #define NOMEM_THRESHOLD_COUNT 8 68 #define ZERO_BUFFER_SIZE 0x100000 69 70 #define OWNER_BDEV 0x2 71 72 #define OBJECT_BDEV_IO 0x2 73 74 #define TRACE_GROUP_BDEV 0x3 75 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 76 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 77 78 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 79 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 80 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 81 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 1000 82 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (1024 * 1024) 83 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 84 #define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC 1000 85 86 #define SPDK_BDEV_POOL_ALIGNMENT 512 87 88 static const char *qos_conf_type[] = {"Limit_IOPS", 89 "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS" 90 }; 91 static const char *qos_rpc_type[] = {"rw_ios_per_sec", 92 "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec" 93 }; 94 95 TAILQ_HEAD(spdk_bdev_list, 
spdk_bdev); 96 97 struct spdk_bdev_mgr { 98 struct spdk_mempool *bdev_io_pool; 99 100 struct spdk_mempool *buf_small_pool; 101 struct spdk_mempool *buf_large_pool; 102 103 void *zero_buffer; 104 105 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 106 107 struct spdk_bdev_list bdevs; 108 109 bool init_complete; 110 bool module_init_complete; 111 112 pthread_mutex_t mutex; 113 114 #ifdef SPDK_CONFIG_VTUNE 115 __itt_domain *domain; 116 #endif 117 }; 118 119 static struct spdk_bdev_mgr g_bdev_mgr = { 120 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 121 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 122 .init_complete = false, 123 .module_init_complete = false, 124 .mutex = PTHREAD_MUTEX_INITIALIZER, 125 }; 126 127 typedef void (*lock_range_cb)(void *ctx, int status); 128 129 struct lba_range { 130 uint64_t offset; 131 uint64_t length; 132 void *locked_ctx; 133 struct spdk_bdev_channel *owner_ch; 134 TAILQ_ENTRY(lba_range) tailq; 135 }; 136 137 static struct spdk_bdev_opts g_bdev_opts = { 138 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 139 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 140 .bdev_auto_examine = SPDK_BDEV_AUTO_EXAMINE, 141 }; 142 143 static spdk_bdev_init_cb g_init_cb_fn = NULL; 144 static void *g_init_cb_arg = NULL; 145 146 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 147 static void *g_fini_cb_arg = NULL; 148 static struct spdk_thread *g_fini_thread = NULL; 149 150 struct spdk_bdev_qos_limit { 151 /** IOs or bytes allowed per second (i.e., 1s). */ 152 uint64_t limit; 153 154 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 155 * For remaining bytes, allowed to run negative if an I/O is submitted when 156 * some bytes are remaining, but the I/O is bigger than that amount. The 157 * excess will be deducted from the next timeslice. 158 */ 159 int64_t remaining_this_timeslice; 160 161 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 162 uint32_t min_per_timeslice; 163 164 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 165 uint32_t max_per_timeslice; 166 167 /** Function to check whether to queue the IO. */ 168 bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 169 170 /** Function to update for the submitted IO. */ 171 void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 172 }; 173 174 struct spdk_bdev_qos { 175 /** Types of structure of rate limits. */ 176 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 177 178 /** The channel that all I/O are funneled through. */ 179 struct spdk_bdev_channel *ch; 180 181 /** The thread on which the poller is running. */ 182 struct spdk_thread *thread; 183 184 /** Queue of I/O waiting to be issued. */ 185 bdev_io_tailq_t queued; 186 187 /** Size of a timeslice in tsc ticks. */ 188 uint64_t timeslice_size; 189 190 /** Timestamp of start of last timeslice. */ 191 uint64_t last_timeslice; 192 193 /** Poller that processes queued I/O commands each time slice. */ 194 struct spdk_poller *poller; 195 }; 196 197 struct spdk_bdev_mgmt_channel { 198 bdev_io_stailq_t need_buf_small; 199 bdev_io_stailq_t need_buf_large; 200 201 /* 202 * Each thread keeps a cache of bdev_io - this allows 203 * bdev threads which are *not* DPDK threads to still 204 * benefit from a per-thread bdev_io cache. Without 205 * this, non-DPDK threads fetching from the mempool 206 * incur a cmpxchg on get and put. 
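	 * The cache is pre-populated with bdev_io_cache_size entries when the
	 * management channel is created, and overflow on free goes back to the
	 * global bdev_io_pool (see bdev_channel_get_io() and spdk_bdev_free_io()).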
207 */ 208 bdev_io_stailq_t per_thread_cache; 209 uint32_t per_thread_cache_count; 210 uint32_t bdev_io_cache_size; 211 212 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 213 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 214 }; 215 216 /* 217 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 218 * will queue here their IO that awaits retry. It makes it possible to retry sending 219 * IO to one bdev after IO from other bdev completes. 220 */ 221 struct spdk_bdev_shared_resource { 222 /* The bdev management channel */ 223 struct spdk_bdev_mgmt_channel *mgmt_ch; 224 225 /* 226 * Count of I/O submitted to bdev module and waiting for completion. 227 * Incremented before submit_request() is called on an spdk_bdev_io. 228 */ 229 uint64_t io_outstanding; 230 231 /* 232 * Queue of IO awaiting retry because of a previous NOMEM status returned 233 * on this channel. 234 */ 235 bdev_io_tailq_t nomem_io; 236 237 /* 238 * Threshold which io_outstanding must drop to before retrying nomem_io. 239 */ 240 uint64_t nomem_threshold; 241 242 /* I/O channel allocated by a bdev module */ 243 struct spdk_io_channel *shared_ch; 244 245 /* Refcount of bdev channels using this resource */ 246 uint32_t ref; 247 248 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 249 }; 250 251 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 252 #define BDEV_CH_QOS_ENABLED (1 << 1) 253 254 struct spdk_bdev_channel { 255 struct spdk_bdev *bdev; 256 257 /* The channel for the underlying device */ 258 struct spdk_io_channel *channel; 259 260 /* Per io_device per thread data */ 261 struct spdk_bdev_shared_resource *shared_resource; 262 263 struct spdk_bdev_io_stat stat; 264 265 /* 266 * Count of I/O submitted to the underlying dev module through this channel 267 * and waiting for completion. 268 */ 269 uint64_t io_outstanding; 270 271 /* 272 * List of all submitted I/Os including I/O that are generated via splitting. 273 */ 274 bdev_io_tailq_t io_submitted; 275 276 /* 277 * List of spdk_bdev_io that are currently queued because they write to a locked 278 * LBA range. 
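	 * They stay on this list until the range is unlocked, at which point they
	 * can be resubmitted (see bdev_lock_lba_range()/bdev_unlock_lba_range()).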
279 */ 280 bdev_io_tailq_t io_locked; 281 282 uint32_t flags; 283 284 struct spdk_histogram_data *histogram; 285 286 #ifdef SPDK_CONFIG_VTUNE 287 uint64_t start_tsc; 288 uint64_t interval_tsc; 289 __itt_string_handle *handle; 290 struct spdk_bdev_io_stat prev_stat; 291 #endif 292 293 bdev_io_tailq_t queued_resets; 294 295 lba_range_tailq_t locked_ranges; 296 }; 297 298 struct media_event_entry { 299 struct spdk_bdev_media_event event; 300 TAILQ_ENTRY(media_event_entry) tailq; 301 }; 302 303 #define MEDIA_EVENT_POOL_SIZE 64 304 305 struct spdk_bdev_desc { 306 struct spdk_bdev *bdev; 307 struct spdk_thread *thread; 308 struct { 309 bool open_with_ext; 310 union { 311 spdk_bdev_remove_cb_t remove_fn; 312 spdk_bdev_event_cb_t event_fn; 313 }; 314 void *ctx; 315 } callback; 316 bool closed; 317 bool write; 318 pthread_mutex_t mutex; 319 uint32_t refs; 320 TAILQ_HEAD(, media_event_entry) pending_media_events; 321 TAILQ_HEAD(, media_event_entry) free_media_events; 322 struct media_event_entry *media_events_buffer; 323 TAILQ_ENTRY(spdk_bdev_desc) link; 324 325 uint64_t timeout_in_sec; 326 spdk_bdev_io_timeout_cb cb_fn; 327 void *cb_arg; 328 struct spdk_poller *io_timeout_poller; 329 }; 330 331 struct spdk_bdev_iostat_ctx { 332 struct spdk_bdev_io_stat *stat; 333 spdk_bdev_get_device_stat_cb cb; 334 void *cb_arg; 335 }; 336 337 struct set_qos_limit_ctx { 338 void (*cb_fn)(void *cb_arg, int status); 339 void *cb_arg; 340 struct spdk_bdev *bdev; 341 }; 342 343 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 344 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 345 346 static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 347 static void bdev_write_zero_buffer_next(void *_bdev_io); 348 349 static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i); 350 static void bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status); 351 352 static int 353 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 354 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 355 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg); 356 static int 357 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 358 struct iovec *iov, int iovcnt, void *md_buf, 359 uint64_t offset_blocks, uint64_t num_blocks, 360 spdk_bdev_io_completion_cb cb, void *cb_arg); 361 362 static int 363 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 364 uint64_t offset, uint64_t length, 365 lock_range_cb cb_fn, void *cb_arg); 366 367 static int 368 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 369 uint64_t offset, uint64_t length, 370 lock_range_cb cb_fn, void *cb_arg); 371 372 static inline void bdev_io_complete(void *ctx); 373 374 static bool bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort); 375 static bool bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort); 376 377 void 378 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 379 { 380 *opts = g_bdev_opts; 381 } 382 383 int 384 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 385 { 386 uint32_t min_pool_size; 387 388 /* 389 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 390 * initialization. A second mgmt_ch will be created on the same thread when the application starts 391 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
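	 *
	 * For example (illustrative numbers only): with bdev_io_cache_size = 256
	 * and spdk_thread_get_count() returning 4, min_pool_size works out to
	 * 256 * (4 + 1) = 1280, so any bdev_io_pool_size below 1280 is rejected.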
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

/*
 * Will implement the whitelist in the future.
 */
static inline bool
bdev_in_examine_whitelist(struct spdk_bdev *bdev)
{
	return false;
}

static inline bool
bdev_ok_to_examine(struct spdk_bdev *bdev)
{
	if (g_bdev_opts.bdev_auto_examine) {
		return true;
	} else {
		return bdev_in_examine_whitelist(bdev);
	}
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return
iovs[0].iov_base != NULL; 550 } 551 552 static bool 553 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 554 { 555 int i; 556 uintptr_t iov_base; 557 558 if (spdk_likely(alignment == 1)) { 559 return true; 560 } 561 562 for (i = 0; i < iovcnt; i++) { 563 iov_base = (uintptr_t)iovs[i].iov_base; 564 if ((iov_base & (alignment - 1)) != 0) { 565 return false; 566 } 567 } 568 569 return true; 570 } 571 572 static void 573 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 574 { 575 int i; 576 size_t len; 577 578 for (i = 0; i < iovcnt; i++) { 579 len = spdk_min(iovs[i].iov_len, buf_len); 580 memcpy(buf, iovs[i].iov_base, len); 581 buf += len; 582 buf_len -= len; 583 } 584 } 585 586 static void 587 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 588 { 589 int i; 590 size_t len; 591 592 for (i = 0; i < iovcnt; i++) { 593 len = spdk_min(iovs[i].iov_len, buf_len); 594 memcpy(iovs[i].iov_base, buf, len); 595 buf += len; 596 buf_len -= len; 597 } 598 } 599 600 static void 601 _bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 602 { 603 /* save original iovec */ 604 bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs; 605 bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt; 606 /* set bounce iov */ 607 bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov; 608 bdev_io->u.bdev.iovcnt = 1; 609 /* set bounce buffer for this operation */ 610 bdev_io->u.bdev.iovs[0].iov_base = buf; 611 bdev_io->u.bdev.iovs[0].iov_len = len; 612 /* if this is write path, copy data from original buffer to bounce buffer */ 613 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 614 _copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt); 615 } 616 } 617 618 static void 619 _bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len) 620 { 621 /* save original md_buf */ 622 bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf; 623 /* set bounce md_buf */ 624 bdev_io->u.bdev.md_buf = md_buf; 625 626 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 627 memcpy(md_buf, bdev_io->internal.orig_md_buf, len); 628 } 629 } 630 631 static void 632 bdev_io_get_buf_complete(struct spdk_bdev_io *bdev_io, void *buf, bool status) 633 { 634 struct spdk_io_channel *ch = spdk_bdev_io_get_io_channel(bdev_io); 635 636 if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) { 637 bdev_io->internal.get_aux_buf_cb(ch, bdev_io, buf); 638 bdev_io->internal.get_aux_buf_cb = NULL; 639 } else { 640 assert(bdev_io->internal.get_buf_cb != NULL); 641 bdev_io->internal.buf = buf; 642 bdev_io->internal.get_buf_cb(ch, bdev_io, status); 643 bdev_io->internal.get_buf_cb = NULL; 644 } 645 } 646 647 static void 648 _bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len) 649 { 650 struct spdk_bdev *bdev = bdev_io->bdev; 651 bool buf_allocated; 652 uint64_t md_len, alignment; 653 void *aligned_buf; 654 655 if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) { 656 bdev_io_get_buf_complete(bdev_io, buf, true); 657 return; 658 } 659 660 alignment = spdk_bdev_get_buf_align(bdev); 661 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 662 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 663 664 if (buf_allocated) { 665 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 666 } else { 667 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 668 } 669 670 if (spdk_bdev_is_md_separate(bdev)) { 671 aligned_buf = (char *)aligned_buf + len; 672 md_len = 
bdev_io->u.bdev.num_blocks * bdev->md_len; 673 674 assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0); 675 676 if (bdev_io->u.bdev.md_buf != NULL) { 677 _bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len); 678 } else { 679 spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len); 680 } 681 } 682 bdev_io_get_buf_complete(bdev_io, buf, true); 683 } 684 685 static void 686 _bdev_io_put_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t buf_len) 687 { 688 struct spdk_bdev *bdev = bdev_io->bdev; 689 struct spdk_mempool *pool; 690 struct spdk_bdev_io *tmp; 691 bdev_io_stailq_t *stailq; 692 struct spdk_bdev_mgmt_channel *ch; 693 uint64_t md_len, alignment; 694 695 md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0; 696 alignment = spdk_bdev_get_buf_align(bdev); 697 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 698 699 if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 700 SPDK_BDEV_POOL_ALIGNMENT) { 701 pool = g_bdev_mgr.buf_small_pool; 702 stailq = &ch->need_buf_small; 703 } else { 704 pool = g_bdev_mgr.buf_large_pool; 705 stailq = &ch->need_buf_large; 706 } 707 708 if (STAILQ_EMPTY(stailq)) { 709 spdk_mempool_put(pool, buf); 710 } else { 711 tmp = STAILQ_FIRST(stailq); 712 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 713 _bdev_io_set_buf(tmp, buf, tmp->internal.buf_len); 714 } 715 } 716 717 static void 718 bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 719 { 720 assert(bdev_io->internal.buf != NULL); 721 _bdev_io_put_buf(bdev_io, bdev_io->internal.buf, bdev_io->internal.buf_len); 722 bdev_io->internal.buf = NULL; 723 } 724 725 void 726 spdk_bdev_io_put_aux_buf(struct spdk_bdev_io *bdev_io, void *buf) 727 { 728 uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 729 730 assert(buf != NULL); 731 _bdev_io_put_buf(bdev_io, buf, len); 732 } 733 734 static void 735 _bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io) 736 { 737 if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) { 738 assert(bdev_io->internal.orig_md_buf == NULL); 739 return; 740 } 741 742 /* if this is read path, copy data from bounce buffer to original buffer */ 743 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 744 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 745 _copy_buf_to_iovs(bdev_io->internal.orig_iovs, 746 bdev_io->internal.orig_iovcnt, 747 bdev_io->internal.bounce_iov.iov_base, 748 bdev_io->internal.bounce_iov.iov_len); 749 } 750 /* set original buffer for this io */ 751 bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt; 752 bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs; 753 /* disable bouncing buffer for this io */ 754 bdev_io->internal.orig_iovcnt = 0; 755 bdev_io->internal.orig_iovs = NULL; 756 757 /* do the same for metadata buffer */ 758 if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) { 759 assert(spdk_bdev_is_md_separate(bdev_io->bdev)); 760 761 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 762 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 763 memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf, 764 bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev)); 765 } 766 767 bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf; 768 bdev_io->internal.orig_md_buf = NULL; 769 } 770 771 /* We want to free the bounce buffer here since we know we're done with it (as opposed 772 * to waiting for the conditional free of internal.buf in spdk_bdev_free_io()). 
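	 * Returning it now also lets any bdev_io parked on need_buf_small/need_buf_large
	 * pick the buffer up immediately (see _bdev_io_put_buf()).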
773 */ 774 bdev_io_put_buf(bdev_io); 775 } 776 777 static void 778 bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len) 779 { 780 struct spdk_bdev *bdev = bdev_io->bdev; 781 struct spdk_mempool *pool; 782 bdev_io_stailq_t *stailq; 783 struct spdk_bdev_mgmt_channel *mgmt_ch; 784 uint64_t alignment, md_len; 785 void *buf; 786 787 alignment = spdk_bdev_get_buf_align(bdev); 788 md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0; 789 790 if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 791 SPDK_BDEV_POOL_ALIGNMENT) { 792 SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n", 793 len + alignment); 794 bdev_io_get_buf_complete(bdev_io, NULL, false); 795 return; 796 } 797 798 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 799 800 bdev_io->internal.buf_len = len; 801 802 if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 803 SPDK_BDEV_POOL_ALIGNMENT) { 804 pool = g_bdev_mgr.buf_small_pool; 805 stailq = &mgmt_ch->need_buf_small; 806 } else { 807 pool = g_bdev_mgr.buf_large_pool; 808 stailq = &mgmt_ch->need_buf_large; 809 } 810 811 buf = spdk_mempool_get(pool); 812 if (!buf) { 813 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 814 } else { 815 _bdev_io_set_buf(bdev_io, buf, len); 816 } 817 } 818 819 void 820 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 821 { 822 struct spdk_bdev *bdev = bdev_io->bdev; 823 uint64_t alignment; 824 825 assert(cb != NULL); 826 bdev_io->internal.get_buf_cb = cb; 827 828 alignment = spdk_bdev_get_buf_align(bdev); 829 830 if (_is_buf_allocated(bdev_io->u.bdev.iovs) && 831 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 832 /* Buffer already present and aligned */ 833 cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true); 834 return; 835 } 836 837 bdev_io_get_buf(bdev_io, len); 838 } 839 840 void 841 spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb) 842 { 843 uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 844 845 assert(cb != NULL); 846 assert(bdev_io->internal.get_aux_buf_cb == NULL); 847 bdev_io->internal.get_aux_buf_cb = cb; 848 bdev_io_get_buf(bdev_io, len); 849 } 850 851 static int 852 bdev_module_get_max_ctx_size(void) 853 { 854 struct spdk_bdev_module *bdev_module; 855 int max_bdev_module_size = 0; 856 857 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 858 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 859 max_bdev_module_size = bdev_module->get_ctx_size(); 860 } 861 } 862 863 return max_bdev_module_size; 864 } 865 866 void 867 spdk_bdev_config_text(FILE *fp) 868 { 869 struct spdk_bdev_module *bdev_module; 870 871 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 872 if (bdev_module->config_text) { 873 bdev_module->config_text(fp); 874 } 875 } 876 } 877 878 static void 879 bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 880 { 881 int i; 882 struct spdk_bdev_qos *qos = bdev->internal.qos; 883 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 884 885 if (!qos) { 886 return; 887 } 888 889 spdk_bdev_get_qos_rate_limits(bdev, limits); 890 891 spdk_json_write_object_begin(w); 892 spdk_json_write_named_string(w, "method", "bdev_set_qos_limit"); 893 894 spdk_json_write_named_object_begin(w, "params"); 895 spdk_json_write_named_string(w, "name", bdev->name); 896 for (i 
= 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 897 if (limits[i] > 0) { 898 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 899 } 900 } 901 spdk_json_write_object_end(w); 902 903 spdk_json_write_object_end(w); 904 } 905 906 void 907 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 908 { 909 struct spdk_bdev_module *bdev_module; 910 struct spdk_bdev *bdev; 911 912 assert(w != NULL); 913 914 spdk_json_write_array_begin(w); 915 916 spdk_json_write_object_begin(w); 917 spdk_json_write_named_string(w, "method", "bdev_set_options"); 918 spdk_json_write_named_object_begin(w, "params"); 919 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 920 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 921 spdk_json_write_named_bool(w, "bdev_auto_examine", g_bdev_opts.bdev_auto_examine); 922 spdk_json_write_object_end(w); 923 spdk_json_write_object_end(w); 924 925 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 926 if (bdev_module->config_json) { 927 bdev_module->config_json(w); 928 } 929 } 930 931 pthread_mutex_lock(&g_bdev_mgr.mutex); 932 933 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 934 if (bdev->fn_table->write_config_json) { 935 bdev->fn_table->write_config_json(bdev, w); 936 } 937 938 bdev_qos_config_json(bdev, w); 939 } 940 941 pthread_mutex_unlock(&g_bdev_mgr.mutex); 942 943 spdk_json_write_array_end(w); 944 } 945 946 static int 947 bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 948 { 949 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 950 struct spdk_bdev_io *bdev_io; 951 uint32_t i; 952 953 STAILQ_INIT(&ch->need_buf_small); 954 STAILQ_INIT(&ch->need_buf_large); 955 956 STAILQ_INIT(&ch->per_thread_cache); 957 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 958 959 /* Pre-populate bdev_io cache to ensure this thread cannot be starved. */ 960 ch->per_thread_cache_count = 0; 961 for (i = 0; i < ch->bdev_io_cache_size; i++) { 962 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 963 assert(bdev_io != NULL); 964 ch->per_thread_cache_count++; 965 STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link); 966 } 967 968 TAILQ_INIT(&ch->shared_resources); 969 TAILQ_INIT(&ch->io_wait_queue); 970 971 return 0; 972 } 973 974 static void 975 bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf) 976 { 977 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 978 struct spdk_bdev_io *bdev_io; 979 980 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) { 981 SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n"); 982 } 983 984 if (!TAILQ_EMPTY(&ch->shared_resources)) { 985 SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n"); 986 } 987 988 while (!STAILQ_EMPTY(&ch->per_thread_cache)) { 989 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 990 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 991 ch->per_thread_cache_count--; 992 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 993 } 994 995 assert(ch->per_thread_cache_count == 0); 996 } 997 998 static void 999 bdev_init_complete(int rc) 1000 { 1001 spdk_bdev_init_cb cb_fn = g_init_cb_fn; 1002 void *cb_arg = g_init_cb_arg; 1003 struct spdk_bdev_module *m; 1004 1005 g_bdev_mgr.init_complete = true; 1006 g_init_cb_fn = NULL; 1007 g_init_cb_arg = NULL; 1008 1009 /* 1010 * For modules that need to know when subsystem init is complete, 1011 * inform them now. 
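	 * The init_complete callback is optional and is only invoked when the
	 * subsystem initialized successfully (rc == 0).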
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	bdev_init_complete(0);
}

static void
bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	bdev_init_complete(-1);
}

static int
bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to prevent other modules from completing modules_init.
			 * Send a message to defer application shutdown until resources are cleaned up. */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

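		/* Apply the values read from the legacy [Bdev] config section, if any. */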
		if (spdk_bdev_set_opts(&bdev_opts)) {
			bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_env_get_core_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create,
				bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		return;
	}

	bdev_module_action_complete();
}

static void
bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (g_bdev_mgr.bdev_io_pool) {
		if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
			SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
				    g_bdev_opts.bdev_io_pool_size);
		}

		spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	}

	if (g_bdev_mgr.buf_small_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
			SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
				    BUF_SMALL_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	}

	if (g_bdev_mgr.buf_large_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
			SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
				    BUF_LARGE_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	}

	spdk_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
	pthread_mutex_destroy(&g_bdev_mgr.mutex);
}

static void
bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* FIXME: Handling initialization failures is broken now,
	 * so we won't even try cleaning up after successfully
	 * initialized modules. If module_init_complete is false,
	 * just call bdev_mgr_unregister_cb().
	 */
	if (!g_bdev_mgr.module_init_complete) {
		bdev_mgr_unregister_cb(NULL);
		return;
	}

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL);
	} else {
		bdev_module_finish_iter(NULL);
	}
}

static void
bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and moving
		 * on to the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
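		 * Sending a message to the current thread lets the call stack unwind
		 * before bdev_module_finish_iter() runs.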
1360 */ 1361 spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL); 1362 return; 1363 } 1364 1365 /* 1366 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem 1367 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity 1368 * to detect clean shutdown as opposed to run-time hot removal of the underlying 1369 * base bdevs. 1370 * 1371 * Also, walk the list in the reverse order. 1372 */ 1373 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1374 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1375 if (bdev->internal.claim_module != NULL) { 1376 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n", 1377 bdev->name, bdev->internal.claim_module->name); 1378 continue; 1379 } 1380 1381 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name); 1382 spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev); 1383 return; 1384 } 1385 1386 /* 1387 * If any bdev fails to unclaim underlying bdev properly, we may face the 1388 * case of bdev list consisting of claimed bdevs only (if claims are managed 1389 * correctly, this would mean there's a loop in the claims graph which is 1390 * clearly impossible). Warn and unregister last bdev on the list then. 1391 */ 1392 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1393 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1394 SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1395 spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev); 1396 return; 1397 } 1398 } 1399 1400 void 1401 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1402 { 1403 struct spdk_bdev_module *m; 1404 1405 assert(cb_fn != NULL); 1406 1407 g_fini_thread = spdk_get_thread(); 1408 1409 g_fini_cb_fn = cb_fn; 1410 g_fini_cb_arg = cb_arg; 1411 1412 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1413 if (m->fini_start) { 1414 m->fini_start(); 1415 } 1416 } 1417 1418 bdev_finish_unregister_bdevs_iter(NULL, 0); 1419 } 1420 1421 struct spdk_bdev_io * 1422 bdev_channel_get_io(struct spdk_bdev_channel *channel) 1423 { 1424 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1425 struct spdk_bdev_io *bdev_io; 1426 1427 if (ch->per_thread_cache_count > 0) { 1428 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1429 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1430 ch->per_thread_cache_count--; 1431 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1432 /* 1433 * Don't try to look for bdev_ios in the global pool if there are 1434 * waiters on bdev_ios - we don't want this caller to jump the line. 
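		 * A NULL return here typically surfaces to the caller as -ENOMEM, which is
		 * expected to be handled via spdk_bdev_queue_io_wait(), so the new request
		 * takes its turn behind the existing waiters.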
1435 */ 1436 bdev_io = NULL; 1437 } else { 1438 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1439 } 1440 1441 return bdev_io; 1442 } 1443 1444 void 1445 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1446 { 1447 struct spdk_bdev_mgmt_channel *ch; 1448 1449 assert(bdev_io != NULL); 1450 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1451 1452 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1453 1454 if (bdev_io->internal.buf != NULL) { 1455 bdev_io_put_buf(bdev_io); 1456 } 1457 1458 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1459 ch->per_thread_cache_count++; 1460 STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link); 1461 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1462 struct spdk_bdev_io_wait_entry *entry; 1463 1464 entry = TAILQ_FIRST(&ch->io_wait_queue); 1465 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1466 entry->cb_fn(entry->cb_arg); 1467 } 1468 } else { 1469 /* We should never have a full cache with entries on the io wait queue. */ 1470 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1471 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1472 } 1473 } 1474 1475 static bool 1476 bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1477 { 1478 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1479 1480 switch (limit) { 1481 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1482 return true; 1483 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1484 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1485 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1486 return false; 1487 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1488 default: 1489 return false; 1490 } 1491 } 1492 1493 static bool 1494 bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1495 { 1496 switch (bdev_io->type) { 1497 case SPDK_BDEV_IO_TYPE_NVME_IO: 1498 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1499 case SPDK_BDEV_IO_TYPE_READ: 1500 case SPDK_BDEV_IO_TYPE_WRITE: 1501 return true; 1502 case SPDK_BDEV_IO_TYPE_ZCOPY: 1503 if (bdev_io->u.bdev.zcopy.start) { 1504 return true; 1505 } else { 1506 return false; 1507 } 1508 default: 1509 return false; 1510 } 1511 } 1512 1513 static bool 1514 bdev_is_read_io(struct spdk_bdev_io *bdev_io) 1515 { 1516 switch (bdev_io->type) { 1517 case SPDK_BDEV_IO_TYPE_NVME_IO: 1518 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1519 /* Bit 1 (0x2) set for read operation */ 1520 if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) { 1521 return true; 1522 } else { 1523 return false; 1524 } 1525 case SPDK_BDEV_IO_TYPE_READ: 1526 return true; 1527 case SPDK_BDEV_IO_TYPE_ZCOPY: 1528 /* Populate to read from disk */ 1529 if (bdev_io->u.bdev.zcopy.populate) { 1530 return true; 1531 } else { 1532 return false; 1533 } 1534 default: 1535 return false; 1536 } 1537 } 1538 1539 static uint64_t 1540 bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1541 { 1542 struct spdk_bdev *bdev = bdev_io->bdev; 1543 1544 switch (bdev_io->type) { 1545 case SPDK_BDEV_IO_TYPE_NVME_IO: 1546 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1547 return bdev_io->u.nvme_passthru.nbytes; 1548 case SPDK_BDEV_IO_TYPE_READ: 1549 case SPDK_BDEV_IO_TYPE_WRITE: 1550 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1551 case SPDK_BDEV_IO_TYPE_ZCOPY: 1552 /* Track the data in the start phase only */ 1553 if (bdev_io->u.bdev.zcopy.start) { 1554 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1555 } else { 1556 return 0; 1557 } 1558 default: 1559 return 0; 1560 } 1561 } 1562 1563 static bool 1564 bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 
1565 { 1566 if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) { 1567 return true; 1568 } else { 1569 return false; 1570 } 1571 } 1572 1573 static bool 1574 bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1575 { 1576 if (bdev_is_read_io(io) == false) { 1577 return false; 1578 } 1579 1580 return bdev_qos_rw_queue_io(limit, io); 1581 } 1582 1583 static bool 1584 bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1585 { 1586 if (bdev_is_read_io(io) == true) { 1587 return false; 1588 } 1589 1590 return bdev_qos_rw_queue_io(limit, io); 1591 } 1592 1593 static void 1594 bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1595 { 1596 limit->remaining_this_timeslice--; 1597 } 1598 1599 static void 1600 bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1601 { 1602 limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io); 1603 } 1604 1605 static void 1606 bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1607 { 1608 if (bdev_is_read_io(io) == false) { 1609 return; 1610 } 1611 1612 return bdev_qos_rw_bps_update_quota(limit, io); 1613 } 1614 1615 static void 1616 bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1617 { 1618 if (bdev_is_read_io(io) == true) { 1619 return; 1620 } 1621 1622 return bdev_qos_rw_bps_update_quota(limit, io); 1623 } 1624 1625 static void 1626 bdev_qos_set_ops(struct spdk_bdev_qos *qos) 1627 { 1628 int i; 1629 1630 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1631 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1632 qos->rate_limits[i].queue_io = NULL; 1633 qos->rate_limits[i].update_quota = NULL; 1634 continue; 1635 } 1636 1637 switch (i) { 1638 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1639 qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io; 1640 qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota; 1641 break; 1642 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1643 qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io; 1644 qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota; 1645 break; 1646 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1647 qos->rate_limits[i].queue_io = bdev_qos_r_queue_io; 1648 qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota; 1649 break; 1650 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1651 qos->rate_limits[i].queue_io = bdev_qos_w_queue_io; 1652 qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota; 1653 break; 1654 default: 1655 break; 1656 } 1657 } 1658 } 1659 1660 static void 1661 _bdev_io_complete_in_submit(struct spdk_bdev_channel *bdev_ch, 1662 struct spdk_bdev_io *bdev_io, 1663 enum spdk_bdev_io_status status) 1664 { 1665 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1666 1667 bdev_io->internal.in_submit_request = true; 1668 bdev_ch->io_outstanding++; 1669 shared_resource->io_outstanding++; 1670 spdk_bdev_io_complete(bdev_io, status); 1671 bdev_io->internal.in_submit_request = false; 1672 } 1673 1674 static inline void 1675 bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io) 1676 { 1677 struct spdk_bdev *bdev = bdev_io->bdev; 1678 struct spdk_io_channel *ch = bdev_ch->channel; 1679 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1680 1681 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT)) { 1682 struct spdk_bdev_mgmt_channel *mgmt_channel = 
shared_resource->mgmt_ch; 1683 struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort; 1684 1685 if (bdev_abort_queued_io(&shared_resource->nomem_io, bio_to_abort) || 1686 bdev_abort_buf_io(&mgmt_channel->need_buf_small, bio_to_abort) || 1687 bdev_abort_buf_io(&mgmt_channel->need_buf_large, bio_to_abort)) { 1688 _bdev_io_complete_in_submit(bdev_ch, bdev_io, 1689 SPDK_BDEV_IO_STATUS_SUCCESS); 1690 return; 1691 } 1692 } 1693 1694 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1695 bdev_ch->io_outstanding++; 1696 shared_resource->io_outstanding++; 1697 bdev_io->internal.in_submit_request = true; 1698 bdev->fn_table->submit_request(ch, bdev_io); 1699 bdev_io->internal.in_submit_request = false; 1700 } else { 1701 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1702 } 1703 } 1704 1705 static int 1706 bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1707 { 1708 struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL; 1709 int i, submitted_ios = 0; 1710 1711 TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) { 1712 if (bdev_qos_io_to_limit(bdev_io) == true) { 1713 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1714 if (!qos->rate_limits[i].queue_io) { 1715 continue; 1716 } 1717 1718 if (qos->rate_limits[i].queue_io(&qos->rate_limits[i], 1719 bdev_io) == true) { 1720 return submitted_ios; 1721 } 1722 } 1723 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1724 if (!qos->rate_limits[i].update_quota) { 1725 continue; 1726 } 1727 1728 qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io); 1729 } 1730 } 1731 1732 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1733 bdev_io_do_submit(ch, bdev_io); 1734 submitted_ios++; 1735 } 1736 1737 return submitted_ios; 1738 } 1739 1740 static void 1741 bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1742 { 1743 int rc; 1744 1745 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1746 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1747 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1748 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1749 &bdev_io->internal.waitq_entry); 1750 if (rc != 0) { 1751 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1752 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1753 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1754 } 1755 } 1756 1757 static bool 1758 bdev_io_type_can_split(uint8_t type) 1759 { 1760 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1761 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1762 1763 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1764 * UNMAP could be split, but these types of I/O are typically much larger 1765 * in size (sometimes the size of the entire block device), and the bdev 1766 * module can more efficiently split these types of I/O. Plus those types 1767 * of I/O do not have a payload, which makes the splitting process simpler. 
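	 *
	 * For example, with optimal_io_boundary = 8, a 6-block write starting at
	 * block 5 crosses the boundary at block 8 and is split into two child I/O
	 * covering blocks 5-7 and 8-10 (see bdev_io_should_split() and _bdev_io_split()).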
1768 */ 1769 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1770 return true; 1771 } else { 1772 return false; 1773 } 1774 } 1775 1776 static bool 1777 bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1778 { 1779 uint64_t start_stripe, end_stripe; 1780 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1781 1782 if (io_boundary == 0) { 1783 return false; 1784 } 1785 1786 if (!bdev_io_type_can_split(bdev_io->type)) { 1787 return false; 1788 } 1789 1790 start_stripe = bdev_io->u.bdev.offset_blocks; 1791 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1792 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1793 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1794 start_stripe >>= spdk_u32log2(io_boundary); 1795 end_stripe >>= spdk_u32log2(io_boundary); 1796 } else { 1797 start_stripe /= io_boundary; 1798 end_stripe /= io_boundary; 1799 } 1800 return (start_stripe != end_stripe); 1801 } 1802 1803 static uint32_t 1804 _to_next_boundary(uint64_t offset, uint32_t boundary) 1805 { 1806 return (boundary - (offset % boundary)); 1807 } 1808 1809 static void 1810 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1811 1812 static void 1813 _bdev_io_split(void *_bdev_io) 1814 { 1815 struct spdk_bdev_io *bdev_io = _bdev_io; 1816 uint64_t current_offset, remaining; 1817 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes; 1818 struct iovec *parent_iov, *iov; 1819 uint64_t parent_iov_offset, iov_len; 1820 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1821 void *md_buf = NULL; 1822 int rc; 1823 1824 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1825 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1826 blocklen = bdev_io->bdev->blocklen; 1827 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1828 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1829 1830 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1831 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1832 if (parent_iov_offset < parent_iov->iov_len) { 1833 break; 1834 } 1835 parent_iov_offset -= parent_iov->iov_len; 1836 } 1837 1838 child_iovcnt = 0; 1839 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1840 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1841 to_next_boundary = spdk_min(remaining, to_next_boundary); 1842 to_next_boundary_bytes = to_next_boundary * blocklen; 1843 iov = &bdev_io->child_iov[child_iovcnt]; 1844 iovcnt = 0; 1845 1846 if (bdev_io->u.bdev.md_buf) { 1847 assert((parent_iov_offset % blocklen) > 0); 1848 md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) * 1849 spdk_bdev_get_md_size(bdev_io->bdev); 1850 } 1851 1852 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1853 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1854 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1855 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1856 to_next_boundary_bytes -= iov_len; 1857 1858 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1859 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1860 1861 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1862 parent_iov_offset += iov_len; 1863 } else { 1864 parent_iovpos++; 1865 parent_iov_offset = 0; 1866 } 1867 child_iovcnt++; 1868 iovcnt++; 1869 } 1870 1871 if 
(to_next_boundary_bytes > 0) { 1872 /* We had to stop this child I/O early because we ran out of 1873 * child_iov space. Ensure the iovs to be aligned with block 1874 * size and then adjust to_next_boundary before starting the 1875 * child I/O. 1876 */ 1877 assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV); 1878 to_last_block_bytes = to_next_boundary_bytes % blocklen; 1879 if (to_last_block_bytes != 0) { 1880 uint32_t child_iovpos = child_iovcnt - 1; 1881 /* don't decrease child_iovcnt so the loop will naturally end */ 1882 1883 to_last_block_bytes = blocklen - to_last_block_bytes; 1884 to_next_boundary_bytes += to_last_block_bytes; 1885 while (to_last_block_bytes > 0 && iovcnt > 0) { 1886 iov_len = spdk_min(to_last_block_bytes, 1887 bdev_io->child_iov[child_iovpos].iov_len); 1888 bdev_io->child_iov[child_iovpos].iov_len -= iov_len; 1889 if (bdev_io->child_iov[child_iovpos].iov_len == 0) { 1890 child_iovpos--; 1891 if (--iovcnt == 0) { 1892 return; 1893 } 1894 } 1895 to_last_block_bytes -= iov_len; 1896 } 1897 1898 assert(to_last_block_bytes == 0); 1899 } 1900 to_next_boundary -= to_next_boundary_bytes / blocklen; 1901 } 1902 1903 bdev_io->u.bdev.split_outstanding++; 1904 1905 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1906 rc = bdev_readv_blocks_with_md(bdev_io->internal.desc, 1907 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1908 iov, iovcnt, md_buf, current_offset, 1909 to_next_boundary, 1910 bdev_io_split_done, bdev_io); 1911 } else { 1912 rc = bdev_writev_blocks_with_md(bdev_io->internal.desc, 1913 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1914 iov, iovcnt, md_buf, current_offset, 1915 to_next_boundary, 1916 bdev_io_split_done, bdev_io); 1917 } 1918 1919 if (rc == 0) { 1920 current_offset += to_next_boundary; 1921 remaining -= to_next_boundary; 1922 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1923 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1924 } else { 1925 bdev_io->u.bdev.split_outstanding--; 1926 if (rc == -ENOMEM) { 1927 if (bdev_io->u.bdev.split_outstanding == 0) { 1928 /* No I/O is outstanding. Hence we should wait here. */ 1929 bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split); 1930 } 1931 } else { 1932 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1933 if (bdev_io->u.bdev.split_outstanding == 0) { 1934 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 1935 (uintptr_t)bdev_io, 0); 1936 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 1937 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1938 } 1939 } 1940 1941 return; 1942 } 1943 } 1944 } 1945 1946 static void 1947 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1948 { 1949 struct spdk_bdev_io *parent_io = cb_arg; 1950 1951 spdk_bdev_free_io(bdev_io); 1952 1953 if (!success) { 1954 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1955 /* If any child I/O failed, stop further splitting process. */ 1956 parent_io->u.bdev.split_current_offset_blocks += parent_io->u.bdev.split_remaining_num_blocks; 1957 parent_io->u.bdev.split_remaining_num_blocks = 0; 1958 } 1959 parent_io->u.bdev.split_outstanding--; 1960 if (parent_io->u.bdev.split_outstanding != 0) { 1961 return; 1962 } 1963 1964 /* 1965 * Parent I/O finishes when all blocks are consumed. 
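* That is, split_remaining_num_blocks has reached zero and no child I/O remain outstanding (split_outstanding == 0).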
1966 */ 1967 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 1968 assert(parent_io->internal.cb != bdev_io_split_done); 1969 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 1970 (uintptr_t)parent_io, 0); 1971 TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link); 1972 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1973 parent_io->internal.caller_ctx); 1974 return; 1975 } 1976 1977 /* 1978 * Continue with the splitting process. This function will complete the parent I/O if the 1979 * splitting is done. 1980 */ 1981 _bdev_io_split(parent_io); 1982 } 1983 1984 static void 1985 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success); 1986 1987 static void 1988 bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1989 { 1990 assert(bdev_io_type_can_split(bdev_io->type)); 1991 1992 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1993 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1994 bdev_io->u.bdev.split_outstanding = 0; 1995 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1996 1997 if (_is_buf_allocated(bdev_io->u.bdev.iovs)) { 1998 _bdev_io_split(bdev_io); 1999 } else { 2000 assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ); 2001 spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb, 2002 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 2003 } 2004 } 2005 2006 static void 2007 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 2008 { 2009 if (!success) { 2010 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2011 return; 2012 } 2013 2014 bdev_io_split(ch, bdev_io); 2015 } 2016 2017 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't 2018 * be inlined, at least on some compilers. 
2019 */ 2020 static inline void 2021 _bdev_io_submit(void *ctx) 2022 { 2023 struct spdk_bdev_io *bdev_io = ctx; 2024 struct spdk_bdev *bdev = bdev_io->bdev; 2025 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2026 uint64_t tsc; 2027 2028 tsc = spdk_get_ticks(); 2029 bdev_io->internal.submit_tsc = tsc; 2030 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 2031 2032 if (spdk_likely(bdev_ch->flags == 0)) { 2033 bdev_io_do_submit(bdev_ch, bdev_io); 2034 return; 2035 } 2036 2037 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 2038 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2039 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 2040 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) && 2041 bdev_abort_queued_io(&bdev->internal.qos->queued, bdev_io->u.abort.bio_to_abort)) { 2042 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 2043 } else { 2044 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 2045 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 2046 } 2047 } else { 2048 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 2049 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2050 } 2051 } 2052 2053 bool 2054 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2); 2055 2056 bool 2057 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2) 2058 { 2059 if (range1->length == 0 || range2->length == 0) { 2060 return false; 2061 } 2062 2063 if (range1->offset + range1->length <= range2->offset) { 2064 return false; 2065 } 2066 2067 if (range2->offset + range2->length <= range1->offset) { 2068 return false; 2069 } 2070 2071 return true; 2072 } 2073 2074 static bool 2075 bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range) 2076 { 2077 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2078 struct lba_range r; 2079 2080 switch (bdev_io->type) { 2081 case SPDK_BDEV_IO_TYPE_NVME_IO: 2082 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 2083 /* Don't try to decode the NVMe command - just assume worst-case and that 2084 * it overlaps a locked range. 2085 */ 2086 return true; 2087 case SPDK_BDEV_IO_TYPE_WRITE: 2088 case SPDK_BDEV_IO_TYPE_UNMAP: 2089 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2090 case SPDK_BDEV_IO_TYPE_ZCOPY: 2091 r.offset = bdev_io->u.bdev.offset_blocks; 2092 r.length = bdev_io->u.bdev.num_blocks; 2093 if (!bdev_lba_range_overlapped(range, &r)) { 2094 /* This I/O doesn't overlap the specified LBA range. */ 2095 return false; 2096 } else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) { 2097 /* This I/O overlaps, but the I/O is on the same channel that locked this 2098 * range, and the caller_ctx is the same as the locked_ctx. This means 2099 * that this I/O is associated with the lock, and is allowed to execute. 
2100 */ 2101 return false; 2102 } else { 2103 return true; 2104 } 2105 default: 2106 return false; 2107 } 2108 } 2109 2110 void 2111 bdev_io_submit(struct spdk_bdev_io *bdev_io) 2112 { 2113 struct spdk_bdev *bdev = bdev_io->bdev; 2114 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 2115 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2116 2117 assert(thread != NULL); 2118 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2119 2120 if (!TAILQ_EMPTY(&ch->locked_ranges)) { 2121 struct lba_range *range; 2122 2123 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 2124 if (bdev_io_range_is_locked(bdev_io, range)) { 2125 TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link); 2126 return; 2127 } 2128 } 2129 } 2130 2131 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 2132 2133 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bdev_io)) { 2134 bdev_io->internal.submit_tsc = spdk_get_ticks(); 2135 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 2136 (uintptr_t)bdev_io, bdev_io->type); 2137 bdev_io_split(NULL, bdev_io); 2138 return; 2139 } 2140 2141 if (ch->flags & BDEV_CH_QOS_ENABLED) { 2142 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 2143 _bdev_io_submit(bdev_io); 2144 } else { 2145 bdev_io->internal.io_submit_ch = ch; 2146 bdev_io->internal.ch = bdev->internal.qos->ch; 2147 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 2148 } 2149 } else { 2150 _bdev_io_submit(bdev_io); 2151 } 2152 } 2153 2154 static void 2155 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 2156 { 2157 struct spdk_bdev *bdev = bdev_io->bdev; 2158 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2159 struct spdk_io_channel *ch = bdev_ch->channel; 2160 2161 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2162 2163 bdev_io->internal.in_submit_request = true; 2164 bdev->fn_table->submit_request(ch, bdev_io); 2165 bdev_io->internal.in_submit_request = false; 2166 } 2167 2168 void 2169 bdev_io_init(struct spdk_bdev_io *bdev_io, 2170 struct spdk_bdev *bdev, void *cb_arg, 2171 spdk_bdev_io_completion_cb cb) 2172 { 2173 bdev_io->bdev = bdev; 2174 bdev_io->internal.caller_ctx = cb_arg; 2175 bdev_io->internal.cb = cb; 2176 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2177 bdev_io->internal.in_submit_request = false; 2178 bdev_io->internal.buf = NULL; 2179 bdev_io->internal.io_submit_ch = NULL; 2180 bdev_io->internal.orig_iovs = NULL; 2181 bdev_io->internal.orig_iovcnt = 0; 2182 bdev_io->internal.orig_md_buf = NULL; 2183 bdev_io->internal.error.nvme.cdw0 = 0; 2184 bdev_io->num_retries = 0; 2185 bdev_io->internal.get_buf_cb = NULL; 2186 bdev_io->internal.get_aux_buf_cb = NULL; 2187 } 2188 2189 static bool 2190 bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2191 { 2192 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 2193 } 2194 2195 bool 2196 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2197 { 2198 bool supported; 2199 2200 supported = bdev_io_type_supported(bdev, io_type); 2201 2202 if (!supported) { 2203 switch (io_type) { 2204 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2205 /* The bdev layer will emulate write zeroes as long as write is supported. 
*/ 2206 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2207 break; 2208 case SPDK_BDEV_IO_TYPE_ZCOPY: 2209 /* Zero copy can be emulated with regular read and write */ 2210 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 2211 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2212 break; 2213 default: 2214 break; 2215 } 2216 } 2217 2218 return supported; 2219 } 2220 2221 int 2222 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 2223 { 2224 if (bdev->fn_table->dump_info_json) { 2225 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 2226 } 2227 2228 return 0; 2229 } 2230 2231 static void 2232 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2233 { 2234 uint32_t max_per_timeslice = 0; 2235 int i; 2236 2237 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2238 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2239 qos->rate_limits[i].max_per_timeslice = 0; 2240 continue; 2241 } 2242 2243 max_per_timeslice = qos->rate_limits[i].limit * 2244 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2245 2246 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2247 qos->rate_limits[i].min_per_timeslice); 2248 2249 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2250 } 2251 2252 bdev_qos_set_ops(qos); 2253 } 2254 2255 static int 2256 bdev_channel_poll_qos(void *arg) 2257 { 2258 struct spdk_bdev_qos *qos = arg; 2259 uint64_t now = spdk_get_ticks(); 2260 int i; 2261 2262 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2263 /* We received our callback earlier than expected - return 2264 * immediately and wait to do accounting until at least one 2265 * timeslice has actually expired. This should never happen 2266 * with a well-behaved timer implementation. 2267 */ 2268 return 0; 2269 } 2270 2271 /* Reset for next round of rate limiting */ 2272 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2273 /* We may have allowed the IOs or bytes to slightly overrun in the last 2274 * timeslice. remaining_this_timeslice is signed, so if it's negative 2275 * here, we'll account for the overrun so that the next timeslice will 2276 * be appropriately reduced. 
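* Note that any unused (positive) quota left over from the previous timeslice is simply discarded below; only overruns carry forward, so an idle period cannot be used to burst past the configured rate.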
2277 */ 2278 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2279 qos->rate_limits[i].remaining_this_timeslice = 0; 2280 } 2281 } 2282 2283 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2284 qos->last_timeslice += qos->timeslice_size; 2285 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2286 qos->rate_limits[i].remaining_this_timeslice += 2287 qos->rate_limits[i].max_per_timeslice; 2288 } 2289 } 2290 2291 return bdev_qos_io_submit(qos->ch, qos); 2292 } 2293 2294 static void 2295 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2296 { 2297 struct spdk_bdev_shared_resource *shared_resource; 2298 struct lba_range *range; 2299 2300 while (!TAILQ_EMPTY(&ch->locked_ranges)) { 2301 range = TAILQ_FIRST(&ch->locked_ranges); 2302 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 2303 free(range); 2304 } 2305 2306 spdk_put_io_channel(ch->channel); 2307 2308 shared_resource = ch->shared_resource; 2309 2310 assert(TAILQ_EMPTY(&ch->io_locked)); 2311 assert(TAILQ_EMPTY(&ch->io_submitted)); 2312 assert(ch->io_outstanding == 0); 2313 assert(shared_resource->ref > 0); 2314 shared_resource->ref--; 2315 if (shared_resource->ref == 0) { 2316 assert(shared_resource->io_outstanding == 0); 2317 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2318 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2319 free(shared_resource); 2320 } 2321 } 2322 2323 /* Caller must hold bdev->internal.mutex. */ 2324 static void 2325 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2326 { 2327 struct spdk_bdev_qos *qos = bdev->internal.qos; 2328 int i; 2329 2330 /* Rate limiting on this bdev enabled */ 2331 if (qos) { 2332 if (qos->ch == NULL) { 2333 struct spdk_io_channel *io_ch; 2334 2335 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2336 bdev->name, spdk_get_thread()); 2337 2338 /* No qos channel has been selected, so set one up */ 2339 2340 /* Take another reference to ch */ 2341 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2342 assert(io_ch != NULL); 2343 qos->ch = ch; 2344 2345 qos->thread = spdk_io_channel_get_thread(io_ch); 2346 2347 TAILQ_INIT(&qos->queued); 2348 2349 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2350 if (bdev_qos_is_iops_rate_limit(i) == true) { 2351 qos->rate_limits[i].min_per_timeslice = 2352 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2353 } else { 2354 qos->rate_limits[i].min_per_timeslice = 2355 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2356 } 2357 2358 if (qos->rate_limits[i].limit == 0) { 2359 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2360 } 2361 } 2362 bdev_qos_update_max_quota_per_timeslice(qos); 2363 qos->timeslice_size = 2364 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2365 qos->last_timeslice = spdk_get_ticks(); 2366 qos->poller = SPDK_POLLER_REGISTER(bdev_channel_poll_qos, 2367 qos, 2368 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2369 } 2370 2371 ch->flags |= BDEV_CH_QOS_ENABLED; 2372 } 2373 } 2374 2375 struct poll_timeout_ctx { 2376 struct spdk_bdev_desc *desc; 2377 uint64_t timeout_in_sec; 2378 spdk_bdev_io_timeout_cb cb_fn; 2379 void *cb_arg; 2380 }; 2381 2382 static void 2383 bdev_desc_free(struct spdk_bdev_desc *desc) 2384 { 2385 pthread_mutex_destroy(&desc->mutex); 2386 free(desc->media_events_buffer); 2387 free(desc); 2388 } 2389 2390 static void 2391 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2392 { 2393 struct poll_timeout_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 2394 struct spdk_bdev_desc *desc = ctx->desc; 2395 2396 free(ctx); 2397 2398 pthread_mutex_lock(&desc->mutex); 2399 desc->refs--; 2400 if (desc->closed == true && desc->refs == 0) { 2401 pthread_mutex_unlock(&desc->mutex); 2402 bdev_desc_free(desc); 2403 return; 2404 } 2405 pthread_mutex_unlock(&desc->mutex); 2406 } 2407 2408 static void 2409 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2410 { 2411 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2412 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2413 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2414 struct spdk_bdev_desc *desc = ctx->desc; 2415 struct spdk_bdev_io *bdev_io; 2416 uint64_t now; 2417 2418 pthread_mutex_lock(&desc->mutex); 2419 if (desc->closed == true) { 2420 pthread_mutex_unlock(&desc->mutex); 2421 spdk_for_each_channel_continue(i, -1); 2422 return; 2423 } 2424 pthread_mutex_unlock(&desc->mutex); 2425 2426 now = spdk_get_ticks(); 2427 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2428 /* Exclude any I/O that are generated via splitting. */ 2429 if (bdev_io->internal.cb == bdev_io_split_done) { 2430 continue; 2431 } 2432 2433 /* Once we find an I/O that has not timed out, we can immediately 2434 * exit the loop. 2435 */ 2436 if (now < (bdev_io->internal.submit_tsc + 2437 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2438 goto end; 2439 } 2440 2441 if (bdev_io->internal.desc == desc) { 2442 ctx->cb_fn(ctx->cb_arg, bdev_io); 2443 } 2444 } 2445 2446 end: 2447 spdk_for_each_channel_continue(i, 0); 2448 } 2449 2450 static int 2451 bdev_poll_timeout_io(void *arg) 2452 { 2453 struct spdk_bdev_desc *desc = arg; 2454 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2455 struct poll_timeout_ctx *ctx; 2456 2457 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2458 if (!ctx) { 2459 SPDK_ERRLOG("failed to allocate memory\n"); 2460 return 1; 2461 } 2462 ctx->desc = desc; 2463 ctx->cb_arg = desc->cb_arg; 2464 ctx->cb_fn = desc->cb_fn; 2465 ctx->timeout_in_sec = desc->timeout_in_sec; 2466 2467 /* Take a ref on the descriptor in case it gets closed while we are checking 2468 * all of the channels. 
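* The reference is dropped in bdev_channel_poll_timeout_io_done() once the spdk_for_each_channel() iteration completes.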
2469 */ 2470 pthread_mutex_lock(&desc->mutex); 2471 desc->refs++; 2472 pthread_mutex_unlock(&desc->mutex); 2473 2474 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2475 bdev_channel_poll_timeout_io, 2476 ctx, 2477 bdev_channel_poll_timeout_io_done); 2478 2479 return 1; 2480 } 2481 2482 int 2483 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2484 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2485 { 2486 assert(desc->thread == spdk_get_thread()); 2487 2488 spdk_poller_unregister(&desc->io_timeout_poller); 2489 2490 if (timeout_in_sec) { 2491 assert(cb_fn != NULL); 2492 desc->io_timeout_poller = SPDK_POLLER_REGISTER(bdev_poll_timeout_io, 2493 desc, 2494 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2495 1000); 2496 if (desc->io_timeout_poller == NULL) { 2497 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2498 return -1; 2499 } 2500 } 2501 2502 desc->cb_fn = cb_fn; 2503 desc->cb_arg = cb_arg; 2504 desc->timeout_in_sec = timeout_in_sec; 2505 2506 return 0; 2507 } 2508 2509 static int 2510 bdev_channel_create(void *io_device, void *ctx_buf) 2511 { 2512 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2513 struct spdk_bdev_channel *ch = ctx_buf; 2514 struct spdk_io_channel *mgmt_io_ch; 2515 struct spdk_bdev_mgmt_channel *mgmt_ch; 2516 struct spdk_bdev_shared_resource *shared_resource; 2517 struct lba_range *range; 2518 2519 ch->bdev = bdev; 2520 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2521 if (!ch->channel) { 2522 return -1; 2523 } 2524 2525 assert(ch->histogram == NULL); 2526 if (bdev->internal.histogram_enabled) { 2527 ch->histogram = spdk_histogram_data_alloc(); 2528 if (ch->histogram == NULL) { 2529 SPDK_ERRLOG("Could not allocate histogram\n"); 2530 } 2531 } 2532 2533 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2534 if (!mgmt_io_ch) { 2535 spdk_put_io_channel(ch->channel); 2536 return -1; 2537 } 2538 2539 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2540 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2541 if (shared_resource->shared_ch == ch->channel) { 2542 spdk_put_io_channel(mgmt_io_ch); 2543 shared_resource->ref++; 2544 break; 2545 } 2546 } 2547 2548 if (shared_resource == NULL) { 2549 shared_resource = calloc(1, sizeof(*shared_resource)); 2550 if (shared_resource == NULL) { 2551 spdk_put_io_channel(ch->channel); 2552 spdk_put_io_channel(mgmt_io_ch); 2553 return -1; 2554 } 2555 2556 shared_resource->mgmt_ch = mgmt_ch; 2557 shared_resource->io_outstanding = 0; 2558 TAILQ_INIT(&shared_resource->nomem_io); 2559 shared_resource->nomem_threshold = 0; 2560 shared_resource->shared_ch = ch->channel; 2561 shared_resource->ref = 1; 2562 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2563 } 2564 2565 memset(&ch->stat, 0, sizeof(ch->stat)); 2566 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2567 ch->io_outstanding = 0; 2568 TAILQ_INIT(&ch->queued_resets); 2569 TAILQ_INIT(&ch->locked_ranges); 2570 ch->flags = 0; 2571 ch->shared_resource = shared_resource; 2572 2573 TAILQ_INIT(&ch->io_submitted); 2574 TAILQ_INIT(&ch->io_locked); 2575 2576 #ifdef SPDK_CONFIG_VTUNE 2577 { 2578 char *name; 2579 __itt_init_ittlib(NULL, 0); 2580 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2581 if (!name) { 2582 bdev_channel_destroy_resource(ch); 2583 return -1; 2584 } 2585 ch->handle = __itt_string_handle_create(name); 2586 free(name); 2587 ch->start_tsc = spdk_get_ticks(); 2588 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2589 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 
2590 } 2591 #endif 2592 2593 pthread_mutex_lock(&bdev->internal.mutex); 2594 bdev_enable_qos(bdev, ch); 2595 2596 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 2597 struct lba_range *new_range; 2598 2599 new_range = calloc(1, sizeof(*new_range)); 2600 if (new_range == NULL) { 2601 pthread_mutex_unlock(&bdev->internal.mutex); 2602 bdev_channel_destroy_resource(ch); 2603 return -1; 2604 } 2605 new_range->length = range->length; 2606 new_range->offset = range->offset; 2607 new_range->locked_ctx = range->locked_ctx; 2608 TAILQ_INSERT_TAIL(&ch->locked_ranges, new_range, tailq); 2609 } 2610 2611 pthread_mutex_unlock(&bdev->internal.mutex); 2612 2613 return 0; 2614 } 2615 2616 /* 2617 * Abort I/O that are waiting on a data buffer. These types of I/O are 2618 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2619 */ 2620 static void 2621 bdev_abort_all_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2622 { 2623 bdev_io_stailq_t tmp; 2624 struct spdk_bdev_io *bdev_io; 2625 2626 STAILQ_INIT(&tmp); 2627 2628 while (!STAILQ_EMPTY(queue)) { 2629 bdev_io = STAILQ_FIRST(queue); 2630 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2631 if (bdev_io->internal.ch == ch) { 2632 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2633 } else { 2634 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2635 } 2636 } 2637 2638 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2639 } 2640 2641 /* 2642 * Abort I/O that are queued waiting for submission. These types of I/O are 2643 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2644 */ 2645 static void 2646 bdev_abort_all_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2647 { 2648 struct spdk_bdev_io *bdev_io, *tmp; 2649 2650 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2651 if (bdev_io->internal.ch == ch) { 2652 TAILQ_REMOVE(queue, bdev_io, internal.link); 2653 /* 2654 * spdk_bdev_io_complete() assumes that the completed I/O had 2655 * been submitted to the bdev module. Since in this case it 2656 * hadn't, bump io_outstanding to account for the decrement 2657 * that spdk_bdev_io_complete() will do. 
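* Reset I/O are submitted via bdev_io_submit_reset() without bumping io_outstanding in the first place, which is why they are excluded from this adjustment.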
2658 */ 2659 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2660 ch->io_outstanding++; 2661 ch->shared_resource->io_outstanding++; 2662 } 2663 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2664 } 2665 } 2666 } 2667 2668 static bool 2669 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2670 { 2671 struct spdk_bdev_io *bdev_io; 2672 2673 TAILQ_FOREACH(bdev_io, queue, internal.link) { 2674 if (bdev_io == bio_to_abort) { 2675 TAILQ_REMOVE(queue, bio_to_abort, internal.link); 2676 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2677 return true; 2678 } 2679 } 2680 2681 return false; 2682 } 2683 2684 static bool 2685 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2686 { 2687 struct spdk_bdev_io *bdev_io; 2688 2689 STAILQ_FOREACH(bdev_io, queue, internal.buf_link) { 2690 if (bdev_io == bio_to_abort) { 2691 STAILQ_REMOVE(queue, bio_to_abort, spdk_bdev_io, internal.buf_link); 2692 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2693 return true; 2694 } 2695 } 2696 2697 return false; 2698 } 2699 2700 static void 2701 bdev_qos_channel_destroy(void *cb_arg) 2702 { 2703 struct spdk_bdev_qos *qos = cb_arg; 2704 2705 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2706 spdk_poller_unregister(&qos->poller); 2707 2708 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2709 2710 free(qos); 2711 } 2712 2713 static int 2714 bdev_qos_destroy(struct spdk_bdev *bdev) 2715 { 2716 int i; 2717 2718 /* 2719 * Cleanly shutting down the QoS poller is tricky, because 2720 * during the asynchronous operation the user could open 2721 * a new descriptor and create a new channel, spawning 2722 * a new QoS poller. 2723 * 2724 * The strategy is to create a new QoS structure here and swap it 2725 * in. The shutdown path then continues to refer to the old one 2726 * until it completes and then releases it. 2727 */ 2728 struct spdk_bdev_qos *new_qos, *old_qos; 2729 2730 old_qos = bdev->internal.qos; 2731 2732 new_qos = calloc(1, sizeof(*new_qos)); 2733 if (!new_qos) { 2734 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2735 return -ENOMEM; 2736 } 2737 2738 /* Copy the old QoS data into the newly allocated structure */ 2739 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2740 2741 /* Zero out the key parts of the QoS structure */ 2742 new_qos->ch = NULL; 2743 new_qos->thread = NULL; 2744 new_qos->poller = NULL; 2745 TAILQ_INIT(&new_qos->queued); 2746 /* 2747 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2748 * It will be used later for the new QoS structure. 2749 */ 2750 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2751 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2752 new_qos->rate_limits[i].min_per_timeslice = 0; 2753 new_qos->rate_limits[i].max_per_timeslice = 0; 2754 } 2755 2756 bdev->internal.qos = new_qos; 2757 2758 if (old_qos->thread == NULL) { 2759 free(old_qos); 2760 } else { 2761 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 2762 } 2763 2764 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2765 * been destroyed yet. The destruction path will end up waiting for the final 2766 * channel to be put before it releases resources. 
*/ 2767 2768 return 0; 2769 } 2770 2771 static void 2772 bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2773 { 2774 total->bytes_read += add->bytes_read; 2775 total->num_read_ops += add->num_read_ops; 2776 total->bytes_written += add->bytes_written; 2777 total->num_write_ops += add->num_write_ops; 2778 total->bytes_unmapped += add->bytes_unmapped; 2779 total->num_unmap_ops += add->num_unmap_ops; 2780 total->read_latency_ticks += add->read_latency_ticks; 2781 total->write_latency_ticks += add->write_latency_ticks; 2782 total->unmap_latency_ticks += add->unmap_latency_ticks; 2783 } 2784 2785 static void 2786 bdev_channel_destroy(void *io_device, void *ctx_buf) 2787 { 2788 struct spdk_bdev_channel *ch = ctx_buf; 2789 struct spdk_bdev_mgmt_channel *mgmt_ch; 2790 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2791 2792 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2793 spdk_get_thread()); 2794 2795 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */ 2796 pthread_mutex_lock(&ch->bdev->internal.mutex); 2797 bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2798 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2799 2800 mgmt_ch = shared_resource->mgmt_ch; 2801 2802 bdev_abort_all_queued_io(&ch->queued_resets, ch); 2803 bdev_abort_all_queued_io(&shared_resource->nomem_io, ch); 2804 bdev_abort_all_buf_io(&mgmt_ch->need_buf_small, ch); 2805 bdev_abort_all_buf_io(&mgmt_ch->need_buf_large, ch); 2806 2807 if (ch->histogram) { 2808 spdk_histogram_data_free(ch->histogram); 2809 } 2810 2811 bdev_channel_destroy_resource(ch); 2812 } 2813 2814 int 2815 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2816 { 2817 struct spdk_bdev_alias *tmp; 2818 2819 if (alias == NULL) { 2820 SPDK_ERRLOG("Empty alias passed\n"); 2821 return -EINVAL; 2822 } 2823 2824 if (spdk_bdev_get_by_name(alias)) { 2825 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2826 return -EEXIST; 2827 } 2828 2829 tmp = calloc(1, sizeof(*tmp)); 2830 if (tmp == NULL) { 2831 SPDK_ERRLOG("Unable to allocate alias\n"); 2832 return -ENOMEM; 2833 } 2834 2835 tmp->alias = strdup(alias); 2836 if (tmp->alias == NULL) { 2837 free(tmp); 2838 SPDK_ERRLOG("Unable to allocate alias\n"); 2839 return -ENOMEM; 2840 } 2841 2842 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2843 2844 return 0; 2845 } 2846 2847 int 2848 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2849 { 2850 struct spdk_bdev_alias *tmp; 2851 2852 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2853 if (strcmp(alias, tmp->alias) == 0) { 2854 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2855 free(tmp->alias); 2856 free(tmp); 2857 return 0; 2858 } 2859 } 2860 2861 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias); 2862 2863 return -ENOENT; 2864 } 2865 2866 void 2867 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2868 { 2869 struct spdk_bdev_alias *p, *tmp; 2870 2871 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2872 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2873 free(p->alias); 2874 free(p); 2875 } 2876 } 2877 2878 struct spdk_io_channel * 2879 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2880 { 2881 return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc))); 2882 } 2883 2884 const char * 2885 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2886 { 2887 return bdev->name; 2888 } 2889 2890 const char * 2891 spdk_bdev_get_product_name(const struct spdk_bdev
*bdev) 2892 { 2893 return bdev->product_name; 2894 } 2895 2896 const struct spdk_bdev_aliases_list * 2897 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2898 { 2899 return &bdev->aliases; 2900 } 2901 2902 uint32_t 2903 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2904 { 2905 return bdev->blocklen; 2906 } 2907 2908 uint32_t 2909 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 2910 { 2911 return bdev->write_unit_size; 2912 } 2913 2914 uint64_t 2915 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2916 { 2917 return bdev->blockcnt; 2918 } 2919 2920 const char * 2921 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2922 { 2923 return qos_rpc_type[type]; 2924 } 2925 2926 void 2927 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2928 { 2929 int i; 2930 2931 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2932 2933 pthread_mutex_lock(&bdev->internal.mutex); 2934 if (bdev->internal.qos) { 2935 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2936 if (bdev->internal.qos->rate_limits[i].limit != 2937 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2938 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2939 if (bdev_qos_is_iops_rate_limit(i) == false) { 2940 /* Change from Byte to Megabyte which is user visible. */ 2941 limits[i] = limits[i] / 1024 / 1024; 2942 } 2943 } 2944 } 2945 } 2946 pthread_mutex_unlock(&bdev->internal.mutex); 2947 } 2948 2949 size_t 2950 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2951 { 2952 return 1 << bdev->required_alignment; 2953 } 2954 2955 uint32_t 2956 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2957 { 2958 return bdev->optimal_io_boundary; 2959 } 2960 2961 bool 2962 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2963 { 2964 return bdev->write_cache; 2965 } 2966 2967 const struct spdk_uuid * 2968 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2969 { 2970 return &bdev->uuid; 2971 } 2972 2973 uint16_t 2974 spdk_bdev_get_acwu(const struct spdk_bdev *bdev) 2975 { 2976 return bdev->acwu; 2977 } 2978 2979 uint32_t 2980 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 2981 { 2982 return bdev->md_len; 2983 } 2984 2985 bool 2986 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 2987 { 2988 return (bdev->md_len != 0) && bdev->md_interleave; 2989 } 2990 2991 bool 2992 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 2993 { 2994 return (bdev->md_len != 0) && !bdev->md_interleave; 2995 } 2996 2997 bool 2998 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 2999 { 3000 return bdev->zoned; 3001 } 3002 3003 uint32_t 3004 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 3005 { 3006 if (spdk_bdev_is_md_interleaved(bdev)) { 3007 return bdev->blocklen - bdev->md_len; 3008 } else { 3009 return bdev->blocklen; 3010 } 3011 } 3012 3013 static uint32_t 3014 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 3015 { 3016 if (!spdk_bdev_is_md_interleaved(bdev)) { 3017 return bdev->blocklen + bdev->md_len; 3018 } else { 3019 return bdev->blocklen; 3020 } 3021 } 3022 3023 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 3024 { 3025 if (bdev->md_len != 0) { 3026 return bdev->dif_type; 3027 } else { 3028 return SPDK_DIF_DISABLE; 3029 } 3030 } 3031 3032 bool 3033 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 3034 { 3035 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 3036 return bdev->dif_is_head_of_md; 3037 } else { 3038 return false; 3039 } 3040 } 3041 3042 bool 3043 
spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 3044 enum spdk_dif_check_type check_type) 3045 { 3046 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 3047 return false; 3048 } 3049 3050 switch (check_type) { 3051 case SPDK_DIF_CHECK_TYPE_REFTAG: 3052 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 3053 case SPDK_DIF_CHECK_TYPE_APPTAG: 3054 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 3055 case SPDK_DIF_CHECK_TYPE_GUARD: 3056 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 3057 default: 3058 return false; 3059 } 3060 } 3061 3062 uint64_t 3063 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 3064 { 3065 return bdev->internal.measured_queue_depth; 3066 } 3067 3068 uint64_t 3069 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 3070 { 3071 return bdev->internal.period; 3072 } 3073 3074 uint64_t 3075 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 3076 { 3077 return bdev->internal.weighted_io_time; 3078 } 3079 3080 uint64_t 3081 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 3082 { 3083 return bdev->internal.io_time; 3084 } 3085 3086 static void 3087 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 3088 { 3089 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3090 3091 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 3092 3093 if (bdev->internal.measured_queue_depth) { 3094 bdev->internal.io_time += bdev->internal.period; 3095 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 3096 } 3097 } 3098 3099 static void 3100 _calculate_measured_qd(struct spdk_io_channel_iter *i) 3101 { 3102 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3103 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 3104 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 3105 3106 bdev->internal.temporary_queue_depth += ch->io_outstanding; 3107 spdk_for_each_channel_continue(i, 0); 3108 } 3109 3110 static int 3111 bdev_calculate_measured_queue_depth(void *ctx) 3112 { 3113 struct spdk_bdev *bdev = ctx; 3114 bdev->internal.temporary_queue_depth = 0; 3115 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 3116 _calculate_measured_qd_cpl); 3117 return 0; 3118 } 3119 3120 void 3121 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 3122 { 3123 bdev->internal.period = period; 3124 3125 if (bdev->internal.qd_poller != NULL) { 3126 spdk_poller_unregister(&bdev->internal.qd_poller); 3127 bdev->internal.measured_queue_depth = UINT64_MAX; 3128 } 3129 3130 if (period != 0) { 3131 bdev->internal.qd_poller = SPDK_POLLER_REGISTER(bdev_calculate_measured_queue_depth, bdev, 3132 period); 3133 } 3134 } 3135 3136 static void 3137 _resize_notify(void *arg) 3138 { 3139 struct spdk_bdev_desc *desc = arg; 3140 3141 pthread_mutex_lock(&desc->mutex); 3142 desc->refs--; 3143 if (!desc->closed) { 3144 pthread_mutex_unlock(&desc->mutex); 3145 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 3146 desc->bdev, 3147 desc->callback.ctx); 3148 return; 3149 } else if (0 == desc->refs) { 3150 /* This descriptor was closed after this resize_notify message was sent. 3151 * spdk_bdev_close() could not free the descriptor since this message was 3152 * in flight, so we free it now using bdev_desc_free(). 
3153 */ 3154 pthread_mutex_unlock(&desc->mutex); 3155 bdev_desc_free(desc); 3156 return; 3157 } 3158 pthread_mutex_unlock(&desc->mutex); 3159 } 3160 3161 int 3162 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 3163 { 3164 struct spdk_bdev_desc *desc; 3165 int ret; 3166 3167 pthread_mutex_lock(&bdev->internal.mutex); 3168 3169 /* bdev has open descriptors */ 3170 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 3171 bdev->blockcnt > size) { 3172 ret = -EBUSY; 3173 } else { 3174 bdev->blockcnt = size; 3175 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 3176 pthread_mutex_lock(&desc->mutex); 3177 if (desc->callback.open_with_ext && !desc->closed) { 3178 desc->refs++; 3179 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 3180 } 3181 pthread_mutex_unlock(&desc->mutex); 3182 } 3183 ret = 0; 3184 } 3185 3186 pthread_mutex_unlock(&bdev->internal.mutex); 3187 3188 return ret; 3189 } 3190 3191 /* 3192 * Convert I/O offset and length from bytes to blocks. 3193 * 3194 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 3195 */ 3196 static uint64_t 3197 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 3198 uint64_t num_bytes, uint64_t *num_blocks) 3199 { 3200 uint32_t block_size = bdev->blocklen; 3201 uint8_t shift_cnt; 3202 3203 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 3204 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 3205 shift_cnt = spdk_u32log2(block_size); 3206 *offset_blocks = offset_bytes >> shift_cnt; 3207 *num_blocks = num_bytes >> shift_cnt; 3208 return (offset_bytes - (*offset_blocks << shift_cnt)) | 3209 (num_bytes - (*num_blocks << shift_cnt)); 3210 } else { 3211 *offset_blocks = offset_bytes / block_size; 3212 *num_blocks = num_bytes / block_size; 3213 return (offset_bytes % block_size) | (num_bytes % block_size); 3214 } 3215 } 3216 3217 static bool 3218 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 3219 { 3220 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 3221 * has been an overflow and hence the offset has been wrapped around */ 3222 if (offset_blocks + num_blocks < offset_blocks) { 3223 return false; 3224 } 3225 3226 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 3227 if (offset_blocks + num_blocks > bdev->blockcnt) { 3228 return false; 3229 } 3230 3231 return true; 3232 } 3233 3234 static bool 3235 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 3236 { 3237 return _is_buf_allocated(iovs) == (md_buf != NULL); 3238 } 3239 3240 static int 3241 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 3242 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3243 spdk_bdev_io_completion_cb cb, void *cb_arg) 3244 { 3245 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3246 struct spdk_bdev_io *bdev_io; 3247 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3248 3249 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3250 return -EINVAL; 3251 } 3252 3253 bdev_io = bdev_channel_get_io(channel); 3254 if (!bdev_io) { 3255 return -ENOMEM; 3256 } 3257 3258 bdev_io->internal.ch = channel; 3259 bdev_io->internal.desc = desc; 3260 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3261 bdev_io->u.bdev.iovs = &bdev_io->iov; 3262 bdev_io->u.bdev.iovs[0].iov_base = buf; 3263 bdev_io->u.bdev.iovs[0].iov_len = num_blocks 
* bdev->blocklen; 3264 bdev_io->u.bdev.iovcnt = 1; 3265 bdev_io->u.bdev.md_buf = md_buf; 3266 bdev_io->u.bdev.num_blocks = num_blocks; 3267 bdev_io->u.bdev.offset_blocks = offset_blocks; 3268 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3269 3270 bdev_io_submit(bdev_io); 3271 return 0; 3272 } 3273 3274 int 3275 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3276 void *buf, uint64_t offset, uint64_t nbytes, 3277 spdk_bdev_io_completion_cb cb, void *cb_arg) 3278 { 3279 uint64_t offset_blocks, num_blocks; 3280 3281 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3282 nbytes, &num_blocks) != 0) { 3283 return -EINVAL; 3284 } 3285 3286 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3287 } 3288 3289 int 3290 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3291 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3292 spdk_bdev_io_completion_cb cb, void *cb_arg) 3293 { 3294 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 3295 } 3296 3297 int 3298 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3299 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3300 spdk_bdev_io_completion_cb cb, void *cb_arg) 3301 { 3302 struct iovec iov = { 3303 .iov_base = buf, 3304 }; 3305 3306 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3307 return -EINVAL; 3308 } 3309 3310 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3311 return -EINVAL; 3312 } 3313 3314 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3315 cb, cb_arg); 3316 } 3317 3318 int 3319 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3320 struct iovec *iov, int iovcnt, 3321 uint64_t offset, uint64_t nbytes, 3322 spdk_bdev_io_completion_cb cb, void *cb_arg) 3323 { 3324 uint64_t offset_blocks, num_blocks; 3325 3326 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3327 nbytes, &num_blocks) != 0) { 3328 return -EINVAL; 3329 } 3330 3331 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3332 } 3333 3334 static int 3335 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3336 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3337 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3338 { 3339 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3340 struct spdk_bdev_io *bdev_io; 3341 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3342 3343 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3344 return -EINVAL; 3345 } 3346 3347 bdev_io = bdev_channel_get_io(channel); 3348 if (!bdev_io) { 3349 return -ENOMEM; 3350 } 3351 3352 bdev_io->internal.ch = channel; 3353 bdev_io->internal.desc = desc; 3354 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3355 bdev_io->u.bdev.iovs = iov; 3356 bdev_io->u.bdev.iovcnt = iovcnt; 3357 bdev_io->u.bdev.md_buf = md_buf; 3358 bdev_io->u.bdev.num_blocks = num_blocks; 3359 bdev_io->u.bdev.offset_blocks = offset_blocks; 3360 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3361 3362 bdev_io_submit(bdev_io); 3363 return 0; 3364 } 3365 3366 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3367 struct iovec *iov, int iovcnt, 3368 uint64_t offset_blocks, uint64_t num_blocks, 3369 spdk_bdev_io_completion_cb cb, void *cb_arg) 3370 { 3371 return 
bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3372 num_blocks, cb, cb_arg); 3373 } 3374 3375 int 3376 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3377 struct iovec *iov, int iovcnt, void *md_buf, 3378 uint64_t offset_blocks, uint64_t num_blocks, 3379 spdk_bdev_io_completion_cb cb, void *cb_arg) 3380 { 3381 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3382 return -EINVAL; 3383 } 3384 3385 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3386 return -EINVAL; 3387 } 3388 3389 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3390 num_blocks, cb, cb_arg); 3391 } 3392 3393 static int 3394 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3395 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3396 spdk_bdev_io_completion_cb cb, void *cb_arg) 3397 { 3398 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3399 struct spdk_bdev_io *bdev_io; 3400 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3401 3402 if (!desc->write) { 3403 return -EBADF; 3404 } 3405 3406 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3407 return -EINVAL; 3408 } 3409 3410 bdev_io = bdev_channel_get_io(channel); 3411 if (!bdev_io) { 3412 return -ENOMEM; 3413 } 3414 3415 bdev_io->internal.ch = channel; 3416 bdev_io->internal.desc = desc; 3417 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3418 bdev_io->u.bdev.iovs = &bdev_io->iov; 3419 bdev_io->u.bdev.iovs[0].iov_base = buf; 3420 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3421 bdev_io->u.bdev.iovcnt = 1; 3422 bdev_io->u.bdev.md_buf = md_buf; 3423 bdev_io->u.bdev.num_blocks = num_blocks; 3424 bdev_io->u.bdev.offset_blocks = offset_blocks; 3425 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3426 3427 bdev_io_submit(bdev_io); 3428 return 0; 3429 } 3430 3431 int 3432 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3433 void *buf, uint64_t offset, uint64_t nbytes, 3434 spdk_bdev_io_completion_cb cb, void *cb_arg) 3435 { 3436 uint64_t offset_blocks, num_blocks; 3437 3438 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3439 nbytes, &num_blocks) != 0) { 3440 return -EINVAL; 3441 } 3442 3443 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3444 } 3445 3446 int 3447 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3448 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3449 spdk_bdev_io_completion_cb cb, void *cb_arg) 3450 { 3451 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3452 cb, cb_arg); 3453 } 3454 3455 int 3456 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3457 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3458 spdk_bdev_io_completion_cb cb, void *cb_arg) 3459 { 3460 struct iovec iov = { 3461 .iov_base = buf, 3462 }; 3463 3464 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3465 return -EINVAL; 3466 } 3467 3468 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3469 return -EINVAL; 3470 } 3471 3472 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3473 cb, cb_arg); 3474 } 3475 3476 static int 3477 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3478 struct iovec *iov, int iovcnt, void *md_buf, 3479 uint64_t offset_blocks, uint64_t num_blocks, 3480 spdk_bdev_io_completion_cb cb, 
void *cb_arg) 3481 { 3482 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3483 struct spdk_bdev_io *bdev_io; 3484 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3485 3486 if (!desc->write) { 3487 return -EBADF; 3488 } 3489 3490 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3491 return -EINVAL; 3492 } 3493 3494 bdev_io = bdev_channel_get_io(channel); 3495 if (!bdev_io) { 3496 return -ENOMEM; 3497 } 3498 3499 bdev_io->internal.ch = channel; 3500 bdev_io->internal.desc = desc; 3501 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3502 bdev_io->u.bdev.iovs = iov; 3503 bdev_io->u.bdev.iovcnt = iovcnt; 3504 bdev_io->u.bdev.md_buf = md_buf; 3505 bdev_io->u.bdev.num_blocks = num_blocks; 3506 bdev_io->u.bdev.offset_blocks = offset_blocks; 3507 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3508 3509 bdev_io_submit(bdev_io); 3510 return 0; 3511 } 3512 3513 int 3514 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3515 struct iovec *iov, int iovcnt, 3516 uint64_t offset, uint64_t len, 3517 spdk_bdev_io_completion_cb cb, void *cb_arg) 3518 { 3519 uint64_t offset_blocks, num_blocks; 3520 3521 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3522 len, &num_blocks) != 0) { 3523 return -EINVAL; 3524 } 3525 3526 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3527 } 3528 3529 int 3530 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3531 struct iovec *iov, int iovcnt, 3532 uint64_t offset_blocks, uint64_t num_blocks, 3533 spdk_bdev_io_completion_cb cb, void *cb_arg) 3534 { 3535 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3536 num_blocks, cb, cb_arg); 3537 } 3538 3539 int 3540 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3541 struct iovec *iov, int iovcnt, void *md_buf, 3542 uint64_t offset_blocks, uint64_t num_blocks, 3543 spdk_bdev_io_completion_cb cb, void *cb_arg) 3544 { 3545 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3546 return -EINVAL; 3547 } 3548 3549 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3550 return -EINVAL; 3551 } 3552 3553 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3554 num_blocks, cb, cb_arg); 3555 } 3556 3557 static void 3558 bdev_compare_do_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3559 { 3560 struct spdk_bdev_io *parent_io = cb_arg; 3561 uint8_t *read_buf = bdev_io->u.bdev.iovs[0].iov_base; 3562 int i, rc = 0; 3563 3564 if (!success) { 3565 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3566 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3567 spdk_bdev_free_io(bdev_io); 3568 return; 3569 } 3570 3571 for (i = 0; i < parent_io->u.bdev.iovcnt; i++) { 3572 rc = memcmp(read_buf, 3573 parent_io->u.bdev.iovs[i].iov_base, 3574 parent_io->u.bdev.iovs[i].iov_len); 3575 if (rc) { 3576 break; 3577 } 3578 read_buf += parent_io->u.bdev.iovs[i].iov_len; 3579 } 3580 3581 spdk_bdev_free_io(bdev_io); 3582 3583 if (rc == 0) { 3584 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3585 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3586 } else { 3587 parent_io->internal.status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 3588 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3589 } 3590 } 3591 3592 static void 3593 bdev_compare_do_read(void *_bdev_io) 3594 { 3595 struct spdk_bdev_io *bdev_io = _bdev_io; 
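/* COMPARE emulation path: read the target LBA range into a buffer allocated by the bdev layer, then memcmp() it against the caller's compare iovecs in bdev_compare_do_read_done(). */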
3596 int rc; 3597 3598 rc = spdk_bdev_read_blocks(bdev_io->internal.desc, 3599 spdk_io_channel_from_ctx(bdev_io->internal.ch), NULL, 3600 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3601 bdev_compare_do_read_done, bdev_io); 3602 3603 if (rc == -ENOMEM) { 3604 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_do_read); 3605 } else if (rc != 0) { 3606 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3607 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3608 } 3609 } 3610 3611 static int 3612 bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3613 struct iovec *iov, int iovcnt, void *md_buf, 3614 uint64_t offset_blocks, uint64_t num_blocks, 3615 spdk_bdev_io_completion_cb cb, void *cb_arg) 3616 { 3617 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3618 struct spdk_bdev_io *bdev_io; 3619 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3620 3621 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3622 return -EINVAL; 3623 } 3624 3625 bdev_io = bdev_channel_get_io(channel); 3626 if (!bdev_io) { 3627 return -ENOMEM; 3628 } 3629 3630 bdev_io->internal.ch = channel; 3631 bdev_io->internal.desc = desc; 3632 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3633 bdev_io->u.bdev.iovs = iov; 3634 bdev_io->u.bdev.iovcnt = iovcnt; 3635 bdev_io->u.bdev.md_buf = md_buf; 3636 bdev_io->u.bdev.num_blocks = num_blocks; 3637 bdev_io->u.bdev.offset_blocks = offset_blocks; 3638 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3639 3640 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3641 bdev_io_submit(bdev_io); 3642 return 0; 3643 } 3644 3645 bdev_compare_do_read(bdev_io); 3646 3647 return 0; 3648 } 3649 3650 int 3651 spdk_bdev_comparev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3652 struct iovec *iov, int iovcnt, 3653 uint64_t offset_blocks, uint64_t num_blocks, 3654 spdk_bdev_io_completion_cb cb, void *cb_arg) 3655 { 3656 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3657 num_blocks, cb, cb_arg); 3658 } 3659 3660 int 3661 spdk_bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3662 struct iovec *iov, int iovcnt, void *md_buf, 3663 uint64_t offset_blocks, uint64_t num_blocks, 3664 spdk_bdev_io_completion_cb cb, void *cb_arg) 3665 { 3666 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3667 return -EINVAL; 3668 } 3669 3670 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3671 return -EINVAL; 3672 } 3673 3674 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3675 num_blocks, cb, cb_arg); 3676 } 3677 3678 static int 3679 bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3680 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3681 spdk_bdev_io_completion_cb cb, void *cb_arg) 3682 { 3683 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3684 struct spdk_bdev_io *bdev_io; 3685 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3686 3687 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3688 return -EINVAL; 3689 } 3690 3691 bdev_io = bdev_channel_get_io(channel); 3692 if (!bdev_io) { 3693 return -ENOMEM; 3694 } 3695 3696 bdev_io->internal.ch = channel; 3697 bdev_io->internal.desc = desc; 3698 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3699 bdev_io->u.bdev.iovs = &bdev_io->iov; 3700 bdev_io->u.bdev.iovs[0].iov_base = buf; 3701 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3702 
bdev_io->u.bdev.iovcnt = 1; 3703 bdev_io->u.bdev.md_buf = md_buf; 3704 bdev_io->u.bdev.num_blocks = num_blocks; 3705 bdev_io->u.bdev.offset_blocks = offset_blocks; 3706 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3707 3708 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3709 bdev_io_submit(bdev_io); 3710 return 0; 3711 } 3712 3713 bdev_compare_do_read(bdev_io); 3714 3715 return 0; 3716 } 3717 3718 int 3719 spdk_bdev_compare_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3720 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3721 spdk_bdev_io_completion_cb cb, void *cb_arg) 3722 { 3723 return bdev_compare_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3724 cb, cb_arg); 3725 } 3726 3727 int 3728 spdk_bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3729 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3730 spdk_bdev_io_completion_cb cb, void *cb_arg) 3731 { 3732 struct iovec iov = { 3733 .iov_base = buf, 3734 }; 3735 3736 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3737 return -EINVAL; 3738 } 3739 3740 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3741 return -EINVAL; 3742 } 3743 3744 return bdev_compare_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3745 cb, cb_arg); 3746 } 3747 3748 static void 3749 bdev_comparev_and_writev_blocks_unlocked(void *ctx, int unlock_status) 3750 { 3751 struct spdk_bdev_io *bdev_io = ctx; 3752 3753 if (unlock_status) { 3754 SPDK_ERRLOG("LBA range unlock failed\n"); 3755 } 3756 3757 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS ? true : 3758 false, bdev_io->internal.caller_ctx); 3759 } 3760 3761 static void 3762 bdev_comparev_and_writev_blocks_unlock(struct spdk_bdev_io *bdev_io, int status) 3763 { 3764 bdev_io->internal.status = status; 3765 3766 bdev_unlock_lba_range(bdev_io->internal.desc, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3767 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3768 bdev_comparev_and_writev_blocks_unlocked, bdev_io); 3769 } 3770 3771 static void 3772 bdev_compare_and_write_do_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3773 { 3774 struct spdk_bdev_io *parent_io = cb_arg; 3775 3776 if (!success) { 3777 SPDK_ERRLOG("Compare and write operation failed\n"); 3778 } 3779 3780 spdk_bdev_free_io(bdev_io); 3781 3782 bdev_comparev_and_writev_blocks_unlock(parent_io, 3783 success ? 
SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED); 3784 } 3785 3786 static void 3787 bdev_compare_and_write_do_write(void *_bdev_io) 3788 { 3789 struct spdk_bdev_io *bdev_io = _bdev_io; 3790 int rc; 3791 3792 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 3793 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3794 bdev_io->u.bdev.fused_iovs, bdev_io->u.bdev.fused_iovcnt, 3795 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3796 bdev_compare_and_write_do_write_done, bdev_io); 3797 3798 3799 if (rc == -ENOMEM) { 3800 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_write); 3801 } else if (rc != 0) { 3802 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 3803 } 3804 } 3805 3806 static void 3807 bdev_compare_and_write_do_compare_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3808 { 3809 struct spdk_bdev_io *parent_io = cb_arg; 3810 3811 spdk_bdev_free_io(bdev_io); 3812 3813 if (!success) { 3814 bdev_comparev_and_writev_blocks_unlock(parent_io, SPDK_BDEV_IO_STATUS_MISCOMPARE); 3815 return; 3816 } 3817 3818 bdev_compare_and_write_do_write(parent_io); 3819 } 3820 3821 static void 3822 bdev_compare_and_write_do_compare(void *_bdev_io) 3823 { 3824 struct spdk_bdev_io *bdev_io = _bdev_io; 3825 int rc; 3826 3827 rc = spdk_bdev_comparev_blocks(bdev_io->internal.desc, 3828 spdk_io_channel_from_ctx(bdev_io->internal.ch), bdev_io->u.bdev.iovs, 3829 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3830 bdev_compare_and_write_do_compare_done, bdev_io); 3831 3832 if (rc == -ENOMEM) { 3833 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_compare); 3834 } else if (rc != 0) { 3835 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED); 3836 } 3837 } 3838 3839 static void 3840 bdev_comparev_and_writev_blocks_locked(void *ctx, int status) 3841 { 3842 struct spdk_bdev_io *bdev_io = ctx; 3843 3844 if (status) { 3845 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED; 3846 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3847 } 3848 3849 bdev_compare_and_write_do_compare(bdev_io); 3850 } 3851 3852 int 3853 spdk_bdev_comparev_and_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3854 struct iovec *compare_iov, int compare_iovcnt, 3855 struct iovec *write_iov, int write_iovcnt, 3856 uint64_t offset_blocks, uint64_t num_blocks, 3857 spdk_bdev_io_completion_cb cb, void *cb_arg) 3858 { 3859 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3860 struct spdk_bdev_io *bdev_io; 3861 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3862 3863 if (!desc->write) { 3864 return -EBADF; 3865 } 3866 3867 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3868 return -EINVAL; 3869 } 3870 3871 if (num_blocks > bdev->acwu) { 3872 return -EINVAL; 3873 } 3874 3875 bdev_io = bdev_channel_get_io(channel); 3876 if (!bdev_io) { 3877 return -ENOMEM; 3878 } 3879 3880 bdev_io->internal.ch = channel; 3881 bdev_io->internal.desc = desc; 3882 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE; 3883 bdev_io->u.bdev.iovs = compare_iov; 3884 bdev_io->u.bdev.iovcnt = compare_iovcnt; 3885 bdev_io->u.bdev.fused_iovs = write_iov; 3886 bdev_io->u.bdev.fused_iovcnt = write_iovcnt; 3887 bdev_io->u.bdev.md_buf = NULL; 3888 bdev_io->u.bdev.num_blocks = num_blocks; 3889 bdev_io->u.bdev.offset_blocks = offset_blocks; 3890 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3891 3892 if (bdev_io_type_supported(bdev, 
SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE)) { 3893 bdev_io_submit(bdev_io); 3894 return 0; 3895 } 3896 3897 return bdev_lock_lba_range(desc, ch, offset_blocks, num_blocks, 3898 bdev_comparev_and_writev_blocks_locked, bdev_io); 3899 } 3900 3901 static void 3902 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 3903 { 3904 if (!success) { 3905 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3906 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 3907 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3908 return; 3909 } 3910 3911 if (bdev_io->u.bdev.zcopy.populate) { 3912 /* Read the real data into the buffer */ 3913 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3914 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3915 bdev_io_submit(bdev_io); 3916 return; 3917 } 3918 3919 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3920 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3921 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3922 } 3923 3924 int 3925 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3926 uint64_t offset_blocks, uint64_t num_blocks, 3927 bool populate, 3928 spdk_bdev_io_completion_cb cb, void *cb_arg) 3929 { 3930 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3931 struct spdk_bdev_io *bdev_io; 3932 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3933 3934 if (!desc->write) { 3935 return -EBADF; 3936 } 3937 3938 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3939 return -EINVAL; 3940 } 3941 3942 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3943 return -ENOTSUP; 3944 } 3945 3946 bdev_io = bdev_channel_get_io(channel); 3947 if (!bdev_io) { 3948 return -ENOMEM; 3949 } 3950 3951 bdev_io->internal.ch = channel; 3952 bdev_io->internal.desc = desc; 3953 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3954 bdev_io->u.bdev.num_blocks = num_blocks; 3955 bdev_io->u.bdev.offset_blocks = offset_blocks; 3956 bdev_io->u.bdev.iovs = NULL; 3957 bdev_io->u.bdev.iovcnt = 0; 3958 bdev_io->u.bdev.md_buf = NULL; 3959 bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0; 3960 bdev_io->u.bdev.zcopy.commit = 0; 3961 bdev_io->u.bdev.zcopy.start = 1; 3962 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3963 3964 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3965 bdev_io_submit(bdev_io); 3966 } else { 3967 /* Emulate zcopy by allocating a buffer */ 3968 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 3969 bdev_io->u.bdev.num_blocks * bdev->blocklen); 3970 } 3971 3972 return 0; 3973 } 3974 3975 int 3976 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 3977 spdk_bdev_io_completion_cb cb, void *cb_arg) 3978 { 3979 struct spdk_bdev *bdev = bdev_io->bdev; 3980 3981 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 3982 /* This can happen if the zcopy was emulated in start */ 3983 if (bdev_io->u.bdev.zcopy.start != 1) { 3984 return -EINVAL; 3985 } 3986 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3987 } 3988 3989 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 3990 return -EINVAL; 3991 } 3992 3993 bdev_io->u.bdev.zcopy.commit = commit ? 
1 : 0; 3994 bdev_io->u.bdev.zcopy.start = 0; 3995 bdev_io->internal.caller_ctx = cb_arg; 3996 bdev_io->internal.cb = cb; 3997 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3998 3999 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4000 bdev_io_submit(bdev_io); 4001 return 0; 4002 } 4003 4004 if (!bdev_io->u.bdev.zcopy.commit) { 4005 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4006 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4007 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 4008 return 0; 4009 } 4010 4011 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 4012 bdev_io_submit(bdev_io); 4013 4014 return 0; 4015 } 4016 4017 int 4018 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4019 uint64_t offset, uint64_t len, 4020 spdk_bdev_io_completion_cb cb, void *cb_arg) 4021 { 4022 uint64_t offset_blocks, num_blocks; 4023 4024 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4025 len, &num_blocks) != 0) { 4026 return -EINVAL; 4027 } 4028 4029 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4030 } 4031 4032 int 4033 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4034 uint64_t offset_blocks, uint64_t num_blocks, 4035 spdk_bdev_io_completion_cb cb, void *cb_arg) 4036 { 4037 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4038 struct spdk_bdev_io *bdev_io; 4039 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4040 4041 if (!desc->write) { 4042 return -EBADF; 4043 } 4044 4045 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4046 return -EINVAL; 4047 } 4048 4049 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 4050 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 4051 return -ENOTSUP; 4052 } 4053 4054 bdev_io = bdev_channel_get_io(channel); 4055 4056 if (!bdev_io) { 4057 return -ENOMEM; 4058 } 4059 4060 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 4061 bdev_io->internal.ch = channel; 4062 bdev_io->internal.desc = desc; 4063 bdev_io->u.bdev.offset_blocks = offset_blocks; 4064 bdev_io->u.bdev.num_blocks = num_blocks; 4065 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4066 4067 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 4068 bdev_io_submit(bdev_io); 4069 return 0; 4070 } 4071 4072 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 4073 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 4074 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 4075 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 4076 bdev_write_zero_buffer_next(bdev_io); 4077 4078 return 0; 4079 } 4080 4081 int 4082 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4083 uint64_t offset, uint64_t nbytes, 4084 spdk_bdev_io_completion_cb cb, void *cb_arg) 4085 { 4086 uint64_t offset_blocks, num_blocks; 4087 4088 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4089 nbytes, &num_blocks) != 0) { 4090 return -EINVAL; 4091 } 4092 4093 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4094 } 4095 4096 int 4097 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4098 uint64_t offset_blocks, uint64_t num_blocks, 4099 spdk_bdev_io_completion_cb cb, void *cb_arg) 4100 { 4101 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4102 struct spdk_bdev_io *bdev_io; 4103 struct 
spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4104 4105 if (!desc->write) { 4106 return -EBADF; 4107 } 4108 4109 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4110 return -EINVAL; 4111 } 4112 4113 if (num_blocks == 0) { 4114 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 4115 return -EINVAL; 4116 } 4117 4118 bdev_io = bdev_channel_get_io(channel); 4119 if (!bdev_io) { 4120 return -ENOMEM; 4121 } 4122 4123 bdev_io->internal.ch = channel; 4124 bdev_io->internal.desc = desc; 4125 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 4126 4127 bdev_io->u.bdev.iovs = &bdev_io->iov; 4128 bdev_io->u.bdev.iovs[0].iov_base = NULL; 4129 bdev_io->u.bdev.iovs[0].iov_len = 0; 4130 bdev_io->u.bdev.iovcnt = 1; 4131 4132 bdev_io->u.bdev.offset_blocks = offset_blocks; 4133 bdev_io->u.bdev.num_blocks = num_blocks; 4134 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4135 4136 bdev_io_submit(bdev_io); 4137 return 0; 4138 } 4139 4140 int 4141 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4142 uint64_t offset, uint64_t length, 4143 spdk_bdev_io_completion_cb cb, void *cb_arg) 4144 { 4145 uint64_t offset_blocks, num_blocks; 4146 4147 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4148 length, &num_blocks) != 0) { 4149 return -EINVAL; 4150 } 4151 4152 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4153 } 4154 4155 int 4156 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4157 uint64_t offset_blocks, uint64_t num_blocks, 4158 spdk_bdev_io_completion_cb cb, void *cb_arg) 4159 { 4160 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4161 struct spdk_bdev_io *bdev_io; 4162 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4163 4164 if (!desc->write) { 4165 return -EBADF; 4166 } 4167 4168 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4169 return -EINVAL; 4170 } 4171 4172 bdev_io = bdev_channel_get_io(channel); 4173 if (!bdev_io) { 4174 return -ENOMEM; 4175 } 4176 4177 bdev_io->internal.ch = channel; 4178 bdev_io->internal.desc = desc; 4179 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 4180 bdev_io->u.bdev.iovs = NULL; 4181 bdev_io->u.bdev.iovcnt = 0; 4182 bdev_io->u.bdev.offset_blocks = offset_blocks; 4183 bdev_io->u.bdev.num_blocks = num_blocks; 4184 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4185 4186 bdev_io_submit(bdev_io); 4187 return 0; 4188 } 4189 4190 static void 4191 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 4192 { 4193 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 4194 struct spdk_bdev_io *bdev_io; 4195 4196 bdev_io = TAILQ_FIRST(&ch->queued_resets); 4197 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 4198 bdev_io_submit_reset(bdev_io); 4199 } 4200 4201 static void 4202 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 4203 { 4204 struct spdk_io_channel *ch; 4205 struct spdk_bdev_channel *channel; 4206 struct spdk_bdev_mgmt_channel *mgmt_channel; 4207 struct spdk_bdev_shared_resource *shared_resource; 4208 bdev_io_tailq_t tmp_queued; 4209 4210 TAILQ_INIT(&tmp_queued); 4211 4212 ch = spdk_io_channel_iter_get_channel(i); 4213 channel = spdk_io_channel_get_ctx(ch); 4214 shared_resource = channel->shared_resource; 4215 mgmt_channel = shared_resource->mgmt_ch; 4216 4217 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 4218 4219 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 4220 /* The QoS object is always valid and readable while 4221 * the channel flag is set, so the lock here should not 4222 * be 
necessary. We're not in the fast path though, so 4223 * just take it anyway. */ 4224 pthread_mutex_lock(&channel->bdev->internal.mutex); 4225 if (channel->bdev->internal.qos->ch == channel) { 4226 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 4227 } 4228 pthread_mutex_unlock(&channel->bdev->internal.mutex); 4229 } 4230 4231 bdev_abort_all_queued_io(&shared_resource->nomem_io, channel); 4232 bdev_abort_all_buf_io(&mgmt_channel->need_buf_small, channel); 4233 bdev_abort_all_buf_io(&mgmt_channel->need_buf_large, channel); 4234 bdev_abort_all_queued_io(&tmp_queued, channel); 4235 4236 spdk_for_each_channel_continue(i, 0); 4237 } 4238 4239 static void 4240 bdev_start_reset(void *ctx) 4241 { 4242 struct spdk_bdev_channel *ch = ctx; 4243 4244 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 4245 ch, bdev_reset_dev); 4246 } 4247 4248 static void 4249 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 4250 { 4251 struct spdk_bdev *bdev = ch->bdev; 4252 4253 assert(!TAILQ_EMPTY(&ch->queued_resets)); 4254 4255 pthread_mutex_lock(&bdev->internal.mutex); 4256 if (bdev->internal.reset_in_progress == NULL) { 4257 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 4258 /* 4259 * Take a channel reference for the target bdev for the life of this 4260 * reset. This guards against the channel getting destroyed while 4261 * spdk_for_each_channel() calls related to this reset IO are in 4262 * progress. We will release the reference when this reset is 4263 * completed. 4264 */ 4265 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 4266 bdev_start_reset(ch); 4267 } 4268 pthread_mutex_unlock(&bdev->internal.mutex); 4269 } 4270 4271 int 4272 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4273 spdk_bdev_io_completion_cb cb, void *cb_arg) 4274 { 4275 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4276 struct spdk_bdev_io *bdev_io; 4277 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4278 4279 bdev_io = bdev_channel_get_io(channel); 4280 if (!bdev_io) { 4281 return -ENOMEM; 4282 } 4283 4284 bdev_io->internal.ch = channel; 4285 bdev_io->internal.desc = desc; 4286 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4287 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 4288 bdev_io->u.reset.ch_ref = NULL; 4289 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4290 4291 pthread_mutex_lock(&bdev->internal.mutex); 4292 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 4293 pthread_mutex_unlock(&bdev->internal.mutex); 4294 4295 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 4296 internal.ch_link); 4297 4298 bdev_channel_start_reset(channel); 4299 4300 return 0; 4301 } 4302 4303 void 4304 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4305 struct spdk_bdev_io_stat *stat) 4306 { 4307 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4308 4309 *stat = channel->stat; 4310 } 4311 4312 static void 4313 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 4314 { 4315 void *io_device = spdk_io_channel_iter_get_io_device(i); 4316 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4317 4318 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 4319 bdev_iostat_ctx->cb_arg, 0); 4320 free(bdev_iostat_ctx); 4321 } 4322 4323 static void 4324 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 4325 { 4326 struct 
spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4327 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4328 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4329 4330 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 4331 spdk_for_each_channel_continue(i, 0); 4332 } 4333 4334 void 4335 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 4336 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 4337 { 4338 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 4339 4340 assert(bdev != NULL); 4341 assert(stat != NULL); 4342 assert(cb != NULL); 4343 4344 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 4345 if (bdev_iostat_ctx == NULL) { 4346 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 4347 cb(bdev, stat, cb_arg, -ENOMEM); 4348 return; 4349 } 4350 4351 bdev_iostat_ctx->stat = stat; 4352 bdev_iostat_ctx->cb = cb; 4353 bdev_iostat_ctx->cb_arg = cb_arg; 4354 4355 /* Start with the statistics from previously deleted channels. */ 4356 pthread_mutex_lock(&bdev->internal.mutex); 4357 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 4358 pthread_mutex_unlock(&bdev->internal.mutex); 4359 4360 /* Then iterate and add the statistics from each existing channel. */ 4361 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4362 bdev_get_each_channel_stat, 4363 bdev_iostat_ctx, 4364 bdev_get_device_stat_done); 4365 } 4366 4367 int 4368 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4369 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4370 spdk_bdev_io_completion_cb cb, void *cb_arg) 4371 { 4372 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4373 struct spdk_bdev_io *bdev_io; 4374 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4375 4376 if (!desc->write) { 4377 return -EBADF; 4378 } 4379 4380 bdev_io = bdev_channel_get_io(channel); 4381 if (!bdev_io) { 4382 return -ENOMEM; 4383 } 4384 4385 bdev_io->internal.ch = channel; 4386 bdev_io->internal.desc = desc; 4387 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 4388 bdev_io->u.nvme_passthru.cmd = *cmd; 4389 bdev_io->u.nvme_passthru.buf = buf; 4390 bdev_io->u.nvme_passthru.nbytes = nbytes; 4391 bdev_io->u.nvme_passthru.md_buf = NULL; 4392 bdev_io->u.nvme_passthru.md_len = 0; 4393 4394 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4395 4396 bdev_io_submit(bdev_io); 4397 return 0; 4398 } 4399 4400 int 4401 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4402 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4403 spdk_bdev_io_completion_cb cb, void *cb_arg) 4404 { 4405 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4406 struct spdk_bdev_io *bdev_io; 4407 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4408 4409 if (!desc->write) { 4410 /* 4411 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4412 * to easily determine if the command is a read or write, but for now just 4413 * do not allow io_passthru with a read-only descriptor. 
4414 */ 4415 return -EBADF; 4416 } 4417 4418 bdev_io = bdev_channel_get_io(channel); 4419 if (!bdev_io) { 4420 return -ENOMEM; 4421 } 4422 4423 bdev_io->internal.ch = channel; 4424 bdev_io->internal.desc = desc; 4425 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 4426 bdev_io->u.nvme_passthru.cmd = *cmd; 4427 bdev_io->u.nvme_passthru.buf = buf; 4428 bdev_io->u.nvme_passthru.nbytes = nbytes; 4429 bdev_io->u.nvme_passthru.md_buf = NULL; 4430 bdev_io->u.nvme_passthru.md_len = 0; 4431 4432 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4433 4434 bdev_io_submit(bdev_io); 4435 return 0; 4436 } 4437 4438 int 4439 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4440 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 4441 spdk_bdev_io_completion_cb cb, void *cb_arg) 4442 { 4443 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4444 struct spdk_bdev_io *bdev_io; 4445 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4446 4447 if (!desc->write) { 4448 /* 4449 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4450 * to easily determine if the command is a read or write, but for now just 4451 * do not allow io_passthru with a read-only descriptor. 4452 */ 4453 return -EBADF; 4454 } 4455 4456 bdev_io = bdev_channel_get_io(channel); 4457 if (!bdev_io) { 4458 return -ENOMEM; 4459 } 4460 4461 bdev_io->internal.ch = channel; 4462 bdev_io->internal.desc = desc; 4463 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 4464 bdev_io->u.nvme_passthru.cmd = *cmd; 4465 bdev_io->u.nvme_passthru.buf = buf; 4466 bdev_io->u.nvme_passthru.nbytes = nbytes; 4467 bdev_io->u.nvme_passthru.md_buf = md_buf; 4468 bdev_io->u.nvme_passthru.md_len = md_len; 4469 4470 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4471 4472 bdev_io_submit(bdev_io); 4473 return 0; 4474 } 4475 4476 static void bdev_abort_retry(void *ctx); 4477 static void bdev_abort(struct spdk_bdev_io *parent_io); 4478 4479 static void 4480 bdev_abort_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4481 { 4482 struct spdk_bdev_channel *channel = bdev_io->internal.ch; 4483 struct spdk_bdev_io *parent_io = cb_arg; 4484 struct spdk_bdev_io *bio_to_abort, *tmp_io; 4485 4486 bio_to_abort = bdev_io->u.abort.bio_to_abort; 4487 4488 spdk_bdev_free_io(bdev_io); 4489 4490 if (!success) { 4491 /* Check if the target I/O completed in the meantime. */ 4492 TAILQ_FOREACH(tmp_io, &channel->io_submitted, internal.ch_link) { 4493 if (tmp_io == bio_to_abort) { 4494 break; 4495 } 4496 } 4497 4498 /* If the target I/O still exists, set the parent to failed. */ 4499 if (tmp_io != NULL) { 4500 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4501 } 4502 } 4503 4504 parent_io->u.bdev.split_outstanding--; 4505 if (parent_io->u.bdev.split_outstanding == 0) { 4506 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4507 bdev_abort_retry(parent_io); 4508 } else { 4509 bdev_io_complete(parent_io); 4510 } 4511 } 4512 } 4513 4514 static int 4515 bdev_abort_io(struct spdk_bdev_desc *desc, struct spdk_bdev_channel *channel, 4516 struct spdk_bdev_io *bio_to_abort, 4517 spdk_bdev_io_completion_cb cb, void *cb_arg) 4518 { 4519 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4520 struct spdk_bdev_io *bdev_io; 4521 4522 if (bio_to_abort->type == SPDK_BDEV_IO_TYPE_ABORT || 4523 bio_to_abort->type == SPDK_BDEV_IO_TYPE_RESET) { 4524 /* TODO: Abort reset or abort request. 
*/ 4525 return -ENOTSUP; 4526 } 4527 4528 bdev_io = bdev_channel_get_io(channel); 4529 if (bdev_io == NULL) { 4530 return -ENOMEM; 4531 } 4532 4533 bdev_io->internal.ch = channel; 4534 bdev_io->internal.desc = desc; 4535 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4536 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4537 4538 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bio_to_abort)) { 4539 bdev_io->u.bdev.abort.bio_cb_arg = bio_to_abort; 4540 4541 /* Parent abort request is not submitted directly, but to manage its 4542 * execution, add it to the submitted list here. 4543 */ 4544 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4545 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4546 4547 bdev_abort(bdev_io); 4548 4549 return 0; 4550 } 4551 4552 bdev_io->u.abort.bio_to_abort = bio_to_abort; 4553 4554 /* Submit the abort request to the underlying bdev module. */ 4555 bdev_io_submit(bdev_io); 4556 4557 return 0; 4558 } 4559 4560 static uint32_t 4561 _bdev_abort(struct spdk_bdev_io *parent_io) 4562 { 4563 struct spdk_bdev_desc *desc = parent_io->internal.desc; 4564 struct spdk_bdev_channel *channel = parent_io->internal.ch; 4565 void *bio_cb_arg; 4566 struct spdk_bdev_io *bio_to_abort; 4567 uint32_t matched_ios; 4568 int rc; 4569 4570 bio_cb_arg = parent_io->u.bdev.abort.bio_cb_arg; 4571 4572 /* matched_ios is returned and will be kept by the caller. 4573 * 4574 * This function is used for two cases: 1) the same cb_arg is used for 4575 * multiple I/Os, 2) a single large I/O is split into smaller ones. 4576 * Incrementing split_outstanding directly here may confuse readers, especially 4577 * for the 1st case. 4578 * 4579 * Completion of I/O abort is processed after stack unwinding. Hence this trick 4580 * works as expected. 4581 */ 4582 matched_ios = 0; 4583 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4584 4585 TAILQ_FOREACH(bio_to_abort, &channel->io_submitted, internal.ch_link) { 4586 if (bio_to_abort->internal.caller_ctx != bio_cb_arg) { 4587 continue; 4588 } 4589 4590 if (bio_to_abort->internal.submit_tsc > parent_io->internal.submit_tsc) { 4591 /* Any I/O which was submitted after this abort command should be excluded. */ 4592 continue; 4593 } 4594 4595 rc = bdev_abort_io(desc, channel, bio_to_abort, bdev_abort_io_done, parent_io); 4596 if (rc != 0) { 4597 if (rc == -ENOMEM) { 4598 parent_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4599 } else { 4600 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4601 } 4602 break; 4603 } 4604 matched_ios++; 4605 } 4606 4607 return matched_ios; 4608 } 4609 4610 static void 4611 bdev_abort_retry(void *ctx) 4612 { 4613 struct spdk_bdev_io *parent_io = ctx; 4614 uint32_t matched_ios; 4615 4616 matched_ios = _bdev_abort(parent_io); 4617 4618 if (matched_ios == 0) { 4619 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4620 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4621 } else { 4622 /* For retry, the case that no target I/O was found is a success 4623 * because it means the target I/Os completed in the meantime. 4624 */ 4625 bdev_io_complete(parent_io); 4626 } 4627 return; 4628 } 4629 4630 /* Use split_outstanding to manage the progress of aborting I/Os.
*/ 4631 parent_io->u.bdev.split_outstanding = matched_ios; 4632 } 4633 4634 static void 4635 bdev_abort(struct spdk_bdev_io *parent_io) 4636 { 4637 uint32_t matched_ios; 4638 4639 matched_ios = _bdev_abort(parent_io); 4640 4641 if (matched_ios == 0) { 4642 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4643 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4644 } else { 4645 /* The case where no target I/O was found is a failure. */ 4646 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4647 bdev_io_complete(parent_io); 4648 } 4649 return; 4650 } 4651 4652 /* Use split_outstanding to manage the progress of aborting I/Os. */ 4653 parent_io->u.bdev.split_outstanding = matched_ios; 4654 } 4655 4656 int 4657 spdk_bdev_abort(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4658 void *bio_cb_arg, 4659 spdk_bdev_io_completion_cb cb, void *cb_arg) 4660 { 4661 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4662 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4663 struct spdk_bdev_io *bdev_io; 4664 4665 if (bio_cb_arg == NULL) { 4666 return -EINVAL; 4667 } 4668 4669 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ABORT)) { 4670 return -ENOTSUP; 4671 } 4672 4673 bdev_io = bdev_channel_get_io(channel); 4674 if (bdev_io == NULL) { 4675 return -ENOMEM; 4676 } 4677 4678 bdev_io->internal.ch = channel; 4679 bdev_io->internal.desc = desc; 4680 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4681 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4682 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4683 4684 bdev_io->u.bdev.abort.bio_cb_arg = bio_cb_arg; 4685 4686 /* Parent abort request is not submitted directly, but to manage its execution, 4687 * add it to the submitted list here. 4688 */ 4689 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4690 4691 bdev_abort(bdev_io); 4692 4693 return 0; 4694 } 4695 4696 int 4697 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4698 struct spdk_bdev_io_wait_entry *entry) 4699 { 4700 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4701 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 4702 4703 if (bdev != entry->bdev) { 4704 SPDK_ERRLOG("bdevs do not match\n"); 4705 return -EINVAL; 4706 } 4707 4708 if (mgmt_ch->per_thread_cache_count > 0) { 4709 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 4710 return -EINVAL; 4711 } 4712 4713 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 4714 return 0; 4715 } 4716 4717 static void 4718 bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 4719 { 4720 struct spdk_bdev *bdev = bdev_ch->bdev; 4721 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4722 struct spdk_bdev_io *bdev_io; 4723 4724 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 4725 /* 4726 * Allow some more I/O to complete before retrying the nomem_io queue. 4727 * Some drivers (such as nvme) cannot immediately take a new I/O in 4728 * the context of a completion, because the resources for the I/O are 4729 * not released until control returns to the bdev poller. Also, we 4730 * may require several small I/O to complete before a larger I/O 4731 * (that requires splitting) can be submitted.
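 * The nomem_threshold tested above is not static: it is recalculated in
 * spdk_bdev_io_complete() each time an I/O completes with SPDK_BDEV_IO_STATUS_NOMEM
 * and is re-queued onto nomem_io.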
4732 */ 4733 return; 4734 } 4735 4736 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 4737 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 4738 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 4739 bdev_io->internal.ch->io_outstanding++; 4740 shared_resource->io_outstanding++; 4741 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4742 bdev_io->internal.error.nvme.cdw0 = 0; 4743 bdev_io->num_retries++; 4744 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 4745 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4746 break; 4747 } 4748 } 4749 } 4750 4751 static inline void 4752 bdev_io_complete(void *ctx) 4753 { 4754 struct spdk_bdev_io *bdev_io = ctx; 4755 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4756 uint64_t tsc, tsc_diff; 4757 4758 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 4759 /* 4760 * Send the completion to the thread that originally submitted the I/O, 4761 * which may not be the current thread in the case of QoS. 4762 */ 4763 if (bdev_io->internal.io_submit_ch) { 4764 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4765 bdev_io->internal.io_submit_ch = NULL; 4766 } 4767 4768 /* 4769 * Defer completion to avoid potential infinite recursion if the 4770 * user's completion callback issues a new I/O. 4771 */ 4772 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4773 bdev_io_complete, bdev_io); 4774 return; 4775 } 4776 4777 tsc = spdk_get_ticks(); 4778 tsc_diff = tsc - bdev_io->internal.submit_tsc; 4779 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 4780 4781 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link); 4782 4783 if (bdev_io->internal.ch->histogram) { 4784 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 4785 } 4786 4787 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4788 switch (bdev_io->type) { 4789 case SPDK_BDEV_IO_TYPE_READ: 4790 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4791 bdev_io->internal.ch->stat.num_read_ops++; 4792 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4793 break; 4794 case SPDK_BDEV_IO_TYPE_WRITE: 4795 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4796 bdev_io->internal.ch->stat.num_write_ops++; 4797 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4798 break; 4799 case SPDK_BDEV_IO_TYPE_UNMAP: 4800 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4801 bdev_io->internal.ch->stat.num_unmap_ops++; 4802 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 4803 break; 4804 case SPDK_BDEV_IO_TYPE_ZCOPY: 4805 /* Track the data in the start phase only */ 4806 if (bdev_io->u.bdev.zcopy.start) { 4807 if (bdev_io->u.bdev.zcopy.populate) { 4808 bdev_io->internal.ch->stat.bytes_read += 4809 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4810 bdev_io->internal.ch->stat.num_read_ops++; 4811 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4812 } else { 4813 bdev_io->internal.ch->stat.bytes_written += 4814 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4815 bdev_io->internal.ch->stat.num_write_ops++; 4816 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4817 } 4818 } 4819 break; 4820 default: 4821 break; 4822 } 4823 } 4824 4825 #ifdef SPDK_CONFIG_VTUNE 4826 uint64_t now_tsc = spdk_get_ticks(); 4827 if (now_tsc > 
(bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 4828 uint64_t data[5]; 4829 4830 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 4831 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 4832 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 4833 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 4834 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 4835 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 4836 4837 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 4838 __itt_metadata_u64, 5, data); 4839 4840 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 4841 bdev_io->internal.ch->start_tsc = now_tsc; 4842 } 4843 #endif 4844 4845 assert(bdev_io->internal.cb != NULL); 4846 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 4847 4848 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 4849 bdev_io->internal.caller_ctx); 4850 } 4851 4852 static void 4853 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 4854 { 4855 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4856 4857 if (bdev_io->u.reset.ch_ref != NULL) { 4858 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 4859 bdev_io->u.reset.ch_ref = NULL; 4860 } 4861 4862 bdev_io_complete(bdev_io); 4863 } 4864 4865 static void 4866 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 4867 { 4868 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4869 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4870 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4871 struct spdk_bdev_io *queued_reset; 4872 4873 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 4874 while (!TAILQ_EMPTY(&ch->queued_resets)) { 4875 queued_reset = TAILQ_FIRST(&ch->queued_resets); 4876 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 4877 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 4878 } 4879 4880 spdk_for_each_channel_continue(i, 0); 4881 } 4882 4883 void 4884 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 4885 { 4886 struct spdk_bdev *bdev = bdev_io->bdev; 4887 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4888 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4889 4890 bdev_io->internal.status = status; 4891 4892 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 4893 bool unlock_channels = false; 4894 4895 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 4896 SPDK_ERRLOG("NOMEM returned for reset\n"); 4897 } 4898 pthread_mutex_lock(&bdev->internal.mutex); 4899 if (bdev_io == bdev->internal.reset_in_progress) { 4900 bdev->internal.reset_in_progress = NULL; 4901 unlock_channels = true; 4902 } 4903 pthread_mutex_unlock(&bdev->internal.mutex); 4904 4905 if (unlock_channels) { 4906 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 4907 bdev_io, bdev_reset_complete); 4908 return; 4909 } 4910 } else { 4911 _bdev_io_unset_bounce_buf(bdev_io); 4912 4913 assert(bdev_ch->io_outstanding > 0); 4914 assert(shared_resource->io_outstanding > 0); 4915 bdev_ch->io_outstanding--; 4916 shared_resource->io_outstanding--; 4917 4918 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 4919 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 4920 
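/* Worked example for the threshold assigned just below, assuming
 * NOMEM_THRESHOLD_COUNT == 8 and the two example queue depths used here: with
 * 64 I/O still outstanding the threshold becomes spdk_max(32, 56) == 56, so
 * roughly 8 completions must occur before bdev_ch_retry_io() starts
 * resubmitting from nomem_io; with only 10 outstanding it becomes
 * spdk_max(5, 2) == 5, i.e. half of the queue must drain first.
 */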
/* 4921 * Wait for some of the outstanding I/O to complete before we 4922 * retry any of the nomem_io. Normally we will wait for 4923 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 4924 * depth channels we will instead wait for half to complete. 4925 */ 4926 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 4927 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 4928 return; 4929 } 4930 4931 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 4932 bdev_ch_retry_io(bdev_ch); 4933 } 4934 } 4935 4936 bdev_io_complete(bdev_io); 4937 } 4938 4939 void 4940 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 4941 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 4942 { 4943 if (sc == SPDK_SCSI_STATUS_GOOD) { 4944 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4945 } else { 4946 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 4947 bdev_io->internal.error.scsi.sc = sc; 4948 bdev_io->internal.error.scsi.sk = sk; 4949 bdev_io->internal.error.scsi.asc = asc; 4950 bdev_io->internal.error.scsi.ascq = ascq; 4951 } 4952 4953 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 4954 } 4955 4956 void 4957 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 4958 int *sc, int *sk, int *asc, int *ascq) 4959 { 4960 assert(sc != NULL); 4961 assert(sk != NULL); 4962 assert(asc != NULL); 4963 assert(ascq != NULL); 4964 4965 switch (bdev_io->internal.status) { 4966 case SPDK_BDEV_IO_STATUS_SUCCESS: 4967 *sc = SPDK_SCSI_STATUS_GOOD; 4968 *sk = SPDK_SCSI_SENSE_NO_SENSE; 4969 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 4970 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 4971 break; 4972 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 4973 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 4974 break; 4975 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 4976 *sc = bdev_io->internal.error.scsi.sc; 4977 *sk = bdev_io->internal.error.scsi.sk; 4978 *asc = bdev_io->internal.error.scsi.asc; 4979 *ascq = bdev_io->internal.error.scsi.ascq; 4980 break; 4981 default: 4982 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 4983 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 4984 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 4985 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 4986 break; 4987 } 4988 } 4989 4990 void 4991 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 4992 { 4993 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 4994 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4995 } else { 4996 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 4997 } 4998 4999 bdev_io->internal.error.nvme.cdw0 = cdw0; 5000 bdev_io->internal.error.nvme.sct = sct; 5001 bdev_io->internal.error.nvme.sc = sc; 5002 5003 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5004 } 5005 5006 void 5007 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 5008 { 5009 assert(sct != NULL); 5010 assert(sc != NULL); 5011 assert(cdw0 != NULL); 5012 5013 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5014 *sct = bdev_io->internal.error.nvme.sct; 5015 *sc = bdev_io->internal.error.nvme.sc; 5016 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5017 *sct = SPDK_NVME_SCT_GENERIC; 5018 *sc = SPDK_NVME_SC_SUCCESS; 5019 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_ABORTED) { 5020 *sct = SPDK_NVME_SCT_GENERIC; 5021 *sc = SPDK_NVME_SC_ABORTED_BY_REQUEST; 5022 } else { 5023 *sct 
= SPDK_NVME_SCT_GENERIC; 5024 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5025 } 5026 5027 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5028 } 5029 5030 void 5031 spdk_bdev_io_get_nvme_fused_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, 5032 int *first_sct, int *first_sc, int *second_sct, int *second_sc) 5033 { 5034 assert(first_sct != NULL); 5035 assert(first_sc != NULL); 5036 assert(second_sct != NULL); 5037 assert(second_sc != NULL); 5038 assert(cdw0 != NULL); 5039 5040 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5041 if (bdev_io->internal.error.nvme.sct == SPDK_NVME_SCT_MEDIA_ERROR && 5042 bdev_io->internal.error.nvme.sc == SPDK_NVME_SC_COMPARE_FAILURE) { 5043 *first_sct = bdev_io->internal.error.nvme.sct; 5044 *first_sc = bdev_io->internal.error.nvme.sc; 5045 *second_sct = SPDK_NVME_SCT_GENERIC; 5046 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5047 } else { 5048 *first_sct = SPDK_NVME_SCT_GENERIC; 5049 *first_sc = SPDK_NVME_SC_SUCCESS; 5050 *second_sct = bdev_io->internal.error.nvme.sct; 5051 *second_sc = bdev_io->internal.error.nvme.sc; 5052 } 5053 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5054 *first_sct = SPDK_NVME_SCT_GENERIC; 5055 *first_sc = SPDK_NVME_SC_SUCCESS; 5056 *second_sct = SPDK_NVME_SCT_GENERIC; 5057 *second_sc = SPDK_NVME_SC_SUCCESS; 5058 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED) { 5059 *first_sct = SPDK_NVME_SCT_GENERIC; 5060 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5061 *second_sct = SPDK_NVME_SCT_GENERIC; 5062 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5063 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_MISCOMPARE) { 5064 *first_sct = SPDK_NVME_SCT_MEDIA_ERROR; 5065 *first_sc = SPDK_NVME_SC_COMPARE_FAILURE; 5066 *second_sct = SPDK_NVME_SCT_GENERIC; 5067 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5068 } else { 5069 *first_sct = SPDK_NVME_SCT_GENERIC; 5070 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5071 *second_sct = SPDK_NVME_SCT_GENERIC; 5072 *second_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5073 } 5074 5075 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5076 } 5077 5078 struct spdk_thread * 5079 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 5080 { 5081 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 5082 } 5083 5084 struct spdk_io_channel * 5085 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 5086 { 5087 return bdev_io->internal.ch->channel; 5088 } 5089 5090 static void 5091 bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 5092 { 5093 uint64_t min_qos_set; 5094 int i; 5095 5096 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5097 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5098 break; 5099 } 5100 } 5101 5102 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 5103 SPDK_ERRLOG("Invalid rate limits set.\n"); 5104 return; 5105 } 5106 5107 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5108 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5109 continue; 5110 } 5111 5112 if (bdev_qos_is_iops_rate_limit(i) == true) { 5113 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 5114 } else { 5115 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 5116 } 5117 5118 if (limits[i] == 0 || limits[i] % min_qos_set) { 5119 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 5120 limits[i], bdev->name, min_qos_set); 5121 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 5122 return; 5123 } 5124 } 5125 5126 if (!bdev->internal.qos) { 5127 
bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 5128 if (!bdev->internal.qos) { 5129 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 5130 return; 5131 } 5132 } 5133 5134 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5135 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5136 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 5137 bdev->name, i, limits[i]); 5138 } 5139 5140 return; 5141 } 5142 5143 static void 5144 bdev_qos_config(struct spdk_bdev *bdev) 5145 { 5146 struct spdk_conf_section *sp = NULL; 5147 const char *val = NULL; 5148 int i = 0, j = 0; 5149 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 5150 bool config_qos = false; 5151 5152 sp = spdk_conf_find_section(NULL, "QoS"); 5153 if (!sp) { 5154 return; 5155 } 5156 5157 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 5158 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 5159 5160 i = 0; 5161 while (true) { 5162 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 5163 if (!val) { 5164 break; 5165 } 5166 5167 if (strcmp(bdev->name, val) != 0) { 5168 i++; 5169 continue; 5170 } 5171 5172 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 5173 if (val) { 5174 if (bdev_qos_is_iops_rate_limit(j) == true) { 5175 limits[j] = strtoull(val, NULL, 10); 5176 } else { 5177 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 5178 } 5179 config_qos = true; 5180 } 5181 5182 break; 5183 } 5184 5185 j++; 5186 } 5187 5188 if (config_qos == true) { 5189 bdev_qos_config_limit(bdev, limits); 5190 } 5191 5192 return; 5193 } 5194 5195 static int 5196 bdev_init(struct spdk_bdev *bdev) 5197 { 5198 char *bdev_name; 5199 5200 assert(bdev->module != NULL); 5201 5202 if (!bdev->name) { 5203 SPDK_ERRLOG("Bdev name is NULL\n"); 5204 return -EINVAL; 5205 } 5206 5207 if (!strlen(bdev->name)) { 5208 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 5209 return -EINVAL; 5210 } 5211 5212 if (spdk_bdev_get_by_name(bdev->name)) { 5213 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 5214 return -EEXIST; 5215 } 5216 5217 /* Users often register their own I/O devices using the bdev name. In 5218 * order to avoid conflicts, prepend bdev_. */ 5219 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 5220 if (!bdev_name) { 5221 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 5222 return -ENOMEM; 5223 } 5224 5225 bdev->internal.status = SPDK_BDEV_STATUS_READY; 5226 bdev->internal.measured_queue_depth = UINT64_MAX; 5227 bdev->internal.claim_module = NULL; 5228 bdev->internal.qd_poller = NULL; 5229 bdev->internal.qos = NULL; 5230 5231 /* If the user didn't specify a uuid, generate one. */ 5232 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 5233 spdk_uuid_generate(&bdev->uuid); 5234 } 5235 5236 if (spdk_bdev_get_buf_align(bdev) > 1) { 5237 if (bdev->split_on_optimal_io_boundary) { 5238 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 5239 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 5240 } else { 5241 bdev->split_on_optimal_io_boundary = true; 5242 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 5243 } 5244 } 5245 5246 /* If the user didn't specify a write unit size, set it to one. 
*/ 5247 if (bdev->write_unit_size == 0) { 5248 bdev->write_unit_size = 1; 5249 } 5250 5251 /* Set ACWU value to 1 if bdev module did not set it (does not support it natively) */ 5252 if (bdev->acwu == 0) { 5253 bdev->acwu = 1; 5254 } 5255 5256 TAILQ_INIT(&bdev->internal.open_descs); 5257 TAILQ_INIT(&bdev->internal.locked_ranges); 5258 TAILQ_INIT(&bdev->internal.pending_locked_ranges); 5259 5260 TAILQ_INIT(&bdev->aliases); 5261 5262 bdev->internal.reset_in_progress = NULL; 5263 5264 bdev_qos_config(bdev); 5265 5266 spdk_io_device_register(__bdev_to_io_dev(bdev), 5267 bdev_channel_create, bdev_channel_destroy, 5268 sizeof(struct spdk_bdev_channel), 5269 bdev_name); 5270 5271 free(bdev_name); 5272 5273 pthread_mutex_init(&bdev->internal.mutex, NULL); 5274 return 0; 5275 } 5276 5277 static void 5278 bdev_destroy_cb(void *io_device) 5279 { 5280 int rc; 5281 struct spdk_bdev *bdev; 5282 spdk_bdev_unregister_cb cb_fn; 5283 void *cb_arg; 5284 5285 bdev = __bdev_from_io_dev(io_device); 5286 cb_fn = bdev->internal.unregister_cb; 5287 cb_arg = bdev->internal.unregister_ctx; 5288 5289 rc = bdev->fn_table->destruct(bdev->ctxt); 5290 if (rc < 0) { 5291 SPDK_ERRLOG("destruct failed\n"); 5292 } 5293 if (rc <= 0 && cb_fn != NULL) { 5294 cb_fn(cb_arg, rc); 5295 } 5296 } 5297 5298 5299 static void 5300 bdev_fini(struct spdk_bdev *bdev) 5301 { 5302 pthread_mutex_destroy(&bdev->internal.mutex); 5303 5304 free(bdev->internal.qos); 5305 5306 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 5307 } 5308 5309 static void 5310 bdev_start(struct spdk_bdev *bdev) 5311 { 5312 struct spdk_bdev_module *module; 5313 uint32_t action; 5314 5315 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 5316 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 5317 5318 /* Examine configuration before initializing I/O */ 5319 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5320 if (module->examine_config && bdev_ok_to_examine(bdev)) { 5321 action = module->internal.action_in_progress; 5322 module->internal.action_in_progress++; 5323 module->examine_config(bdev); 5324 if (action != module->internal.action_in_progress) { 5325 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 5326 module->name); 5327 } 5328 } 5329 } 5330 5331 if (bdev->internal.claim_module && bdev_ok_to_examine(bdev)) { 5332 if (bdev->internal.claim_module->examine_disk) { 5333 bdev->internal.claim_module->internal.action_in_progress++; 5334 bdev->internal.claim_module->examine_disk(bdev); 5335 } 5336 return; 5337 } 5338 5339 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5340 if (module->examine_disk && bdev_ok_to_examine(bdev)) { 5341 module->internal.action_in_progress++; 5342 module->examine_disk(bdev); 5343 } 5344 } 5345 } 5346 5347 int 5348 spdk_bdev_register(struct spdk_bdev *bdev) 5349 { 5350 int rc = bdev_init(bdev); 5351 5352 if (rc == 0) { 5353 bdev_start(bdev); 5354 } 5355 5356 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 5357 return rc; 5358 } 5359 5360 int 5361 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 5362 { 5363 SPDK_ERRLOG("This function is deprecated. 
Use spdk_bdev_register() instead.\n"); 5364 return spdk_bdev_register(vbdev); 5365 } 5366 5367 void 5368 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 5369 { 5370 if (bdev->internal.unregister_cb != NULL) { 5371 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 5372 } 5373 } 5374 5375 static void 5376 _remove_notify(void *arg) 5377 { 5378 struct spdk_bdev_desc *desc = arg; 5379 5380 pthread_mutex_lock(&desc->mutex); 5381 desc->refs--; 5382 5383 if (!desc->closed) { 5384 pthread_mutex_unlock(&desc->mutex); 5385 if (desc->callback.open_with_ext) { 5386 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 5387 } else { 5388 desc->callback.remove_fn(desc->callback.ctx); 5389 } 5390 return; 5391 } else if (0 == desc->refs) { 5392 /* This descriptor was closed after this remove_notify message was sent. 5393 * spdk_bdev_close() could not free the descriptor since this message was 5394 * in flight, so we free it now using bdev_desc_free(). 5395 */ 5396 pthread_mutex_unlock(&desc->mutex); 5397 bdev_desc_free(desc); 5398 return; 5399 } 5400 pthread_mutex_unlock(&desc->mutex); 5401 } 5402 5403 /* Must be called while holding bdev->internal.mutex. 5404 * returns: 0 - bdev removed and ready to be destructed. 5405 * -EBUSY - bdev can't be destructed yet. */ 5406 static int 5407 bdev_unregister_unsafe(struct spdk_bdev *bdev) 5408 { 5409 struct spdk_bdev_desc *desc, *tmp; 5410 int rc = 0; 5411 5412 /* Notify each descriptor about hotremoval */ 5413 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 5414 rc = -EBUSY; 5415 pthread_mutex_lock(&desc->mutex); 5416 /* 5417 * Defer invocation of the event_cb to a separate message that will 5418 * run later on its thread. This ensures this context unwinds and 5419 * we don't recursively unregister this bdev again if the event_cb 5420 * immediately closes its descriptor. 5421 */ 5422 desc->refs++; 5423 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 5424 pthread_mutex_unlock(&desc->mutex); 5425 } 5426 5427 /* If there are no descriptors, proceed removing the bdev */ 5428 if (rc == 0) { 5429 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 5430 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 5431 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 5432 } 5433 5434 return rc; 5435 } 5436 5437 void 5438 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 5439 { 5440 struct spdk_thread *thread; 5441 int rc; 5442 5443 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 5444 5445 thread = spdk_get_thread(); 5446 if (!thread) { 5447 /* The user called this from a non-SPDK thread. */ 5448 if (cb_fn != NULL) { 5449 cb_fn(cb_arg, -ENOTSUP); 5450 } 5451 return; 5452 } 5453 5454 pthread_mutex_lock(&g_bdev_mgr.mutex); 5455 pthread_mutex_lock(&bdev->internal.mutex); 5456 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5457 pthread_mutex_unlock(&bdev->internal.mutex); 5458 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5459 if (cb_fn) { 5460 cb_fn(cb_arg, -EBUSY); 5461 } 5462 return; 5463 } 5464 5465 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 5466 bdev->internal.unregister_cb = cb_fn; 5467 bdev->internal.unregister_ctx = cb_arg; 5468 5469 /* Call under lock. 
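Both g_bdev_mgr.mutex and bdev->internal.mutex are held here and are released immediately after bdev_unregister_unsafe() returns.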
*/ 5470 rc = bdev_unregister_unsafe(bdev); 5471 pthread_mutex_unlock(&bdev->internal.mutex); 5472 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5473 5474 if (rc == 0) { 5475 bdev_fini(bdev); 5476 } 5477 } 5478 5479 static void 5480 bdev_dummy_event_cb(void *remove_ctx) 5481 { 5482 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev remove event received with no remove callback specified"); 5483 } 5484 5485 static int 5486 bdev_start_qos(struct spdk_bdev *bdev) 5487 { 5488 struct set_qos_limit_ctx *ctx; 5489 5490 /* Enable QoS */ 5491 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 5492 ctx = calloc(1, sizeof(*ctx)); 5493 if (ctx == NULL) { 5494 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 5495 return -ENOMEM; 5496 } 5497 ctx->bdev = bdev; 5498 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5499 bdev_enable_qos_msg, ctx, 5500 bdev_enable_qos_done); 5501 } 5502 5503 return 0; 5504 } 5505 5506 static int 5507 bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 5508 { 5509 struct spdk_thread *thread; 5510 int rc = 0; 5511 5512 thread = spdk_get_thread(); 5513 if (!thread) { 5514 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 5515 return -ENOTSUP; 5516 } 5517 5518 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5519 spdk_get_thread()); 5520 5521 desc->bdev = bdev; 5522 desc->thread = thread; 5523 desc->write = write; 5524 5525 pthread_mutex_lock(&bdev->internal.mutex); 5526 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5527 pthread_mutex_unlock(&bdev->internal.mutex); 5528 return -ENODEV; 5529 } 5530 5531 if (write && bdev->internal.claim_module) { 5532 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 5533 bdev->name, bdev->internal.claim_module->name); 5534 pthread_mutex_unlock(&bdev->internal.mutex); 5535 return -EPERM; 5536 } 5537 5538 rc = bdev_start_qos(bdev); 5539 if (rc != 0) { 5540 SPDK_ERRLOG("Failed to start QoS on bdev %s\n", bdev->name); 5541 pthread_mutex_unlock(&bdev->internal.mutex); 5542 return rc; 5543 } 5544 5545 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 5546 5547 pthread_mutex_unlock(&bdev->internal.mutex); 5548 5549 return 0; 5550 } 5551 5552 int 5553 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 5554 void *remove_ctx, struct spdk_bdev_desc **_desc) 5555 { 5556 struct spdk_bdev_desc *desc; 5557 int rc; 5558 5559 desc = calloc(1, sizeof(*desc)); 5560 if (desc == NULL) { 5561 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5562 return -ENOMEM; 5563 } 5564 5565 if (remove_cb == NULL) { 5566 remove_cb = bdev_dummy_event_cb; 5567 } 5568 5569 TAILQ_INIT(&desc->pending_media_events); 5570 TAILQ_INIT(&desc->free_media_events); 5571 5572 desc->callback.open_with_ext = false; 5573 desc->callback.remove_fn = remove_cb; 5574 desc->callback.ctx = remove_ctx; 5575 pthread_mutex_init(&desc->mutex, NULL); 5576 5577 pthread_mutex_lock(&g_bdev_mgr.mutex); 5578 5579 rc = bdev_open(bdev, write, desc); 5580 if (rc != 0) { 5581 bdev_desc_free(desc); 5582 desc = NULL; 5583 } 5584 5585 *_desc = desc; 5586 5587 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5588 5589 return rc; 5590 } 5591 5592 int 5593 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 5594 void *event_ctx, struct spdk_bdev_desc **_desc) 5595 { 5596 struct spdk_bdev_desc *desc; 5597 struct spdk_bdev *bdev; 5598 unsigned int event_id; 5599 int rc; 5600 5601 if (event_cb == NULL) { 5602 SPDK_ERRLOG("Missing event 
callback function\n"); 5603 return -EINVAL; 5604 } 5605 5606 pthread_mutex_lock(&g_bdev_mgr.mutex); 5607 5608 bdev = spdk_bdev_get_by_name(bdev_name); 5609 5610 if (bdev == NULL) { 5611 SPDK_ERRLOG("Failed to find bdev with name: %s\n", bdev_name); 5612 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5613 return -EINVAL; 5614 } 5615 5616 desc = calloc(1, sizeof(*desc)); 5617 if (desc == NULL) { 5618 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5619 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5620 return -ENOMEM; 5621 } 5622 5623 TAILQ_INIT(&desc->pending_media_events); 5624 TAILQ_INIT(&desc->free_media_events); 5625 5626 desc->callback.open_with_ext = true; 5627 desc->callback.event_fn = event_cb; 5628 desc->callback.ctx = event_ctx; 5629 pthread_mutex_init(&desc->mutex, NULL); 5630 5631 if (bdev->media_events) { 5632 desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE, 5633 sizeof(*desc->media_events_buffer)); 5634 if (desc->media_events_buffer == NULL) { 5635 SPDK_ERRLOG("Failed to initialize media event pool\n"); 5636 bdev_desc_free(desc); 5637 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5638 return -ENOMEM; 5639 } 5640 5641 for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) { 5642 TAILQ_INSERT_TAIL(&desc->free_media_events, 5643 &desc->media_events_buffer[event_id], tailq); 5644 } 5645 } 5646 5647 rc = bdev_open(bdev, write, desc); 5648 if (rc != 0) { 5649 bdev_desc_free(desc); 5650 desc = NULL; 5651 } 5652 5653 *_desc = desc; 5654 5655 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5656 5657 return rc; 5658 } 5659 5660 void 5661 spdk_bdev_close(struct spdk_bdev_desc *desc) 5662 { 5663 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 5664 int rc; 5665 5666 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5667 spdk_get_thread()); 5668 5669 assert(desc->thread == spdk_get_thread()); 5670 5671 spdk_poller_unregister(&desc->io_timeout_poller); 5672 5673 pthread_mutex_lock(&bdev->internal.mutex); 5674 pthread_mutex_lock(&desc->mutex); 5675 5676 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 5677 5678 desc->closed = true; 5679 5680 if (0 == desc->refs) { 5681 pthread_mutex_unlock(&desc->mutex); 5682 bdev_desc_free(desc); 5683 } else { 5684 pthread_mutex_unlock(&desc->mutex); 5685 } 5686 5687 /* If no more descriptors, kill QoS channel */ 5688 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5689 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 5690 bdev->name, spdk_get_thread()); 5691 5692 if (bdev_qos_destroy(bdev)) { 5693 /* There isn't anything we can do to recover here. Just let the 5694 * old QoS poller keep running. The QoS handling won't change 5695 * cores when the user allocates a new channel, but it won't break. */ 5696 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 5697 } 5698 } 5699 5700 spdk_bdev_set_qd_sampling_period(bdev, 0); 5701 5702 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5703 rc = bdev_unregister_unsafe(bdev); 5704 pthread_mutex_unlock(&bdev->internal.mutex); 5705 5706 if (rc == 0) { 5707 bdev_fini(bdev); 5708 } 5709 } else { 5710 pthread_mutex_unlock(&bdev->internal.mutex); 5711 } 5712 } 5713 5714 int 5715 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 5716 struct spdk_bdev_module *module) 5717 { 5718 if (bdev->internal.claim_module != NULL) { 5719 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 5720 bdev->internal.claim_module->name); 5721 return -EPERM; 5722 } 5723 5724 if (desc && !desc->write) { 5725 desc->write = true; 5726 } 5727 5728 bdev->internal.claim_module = module; 5729 return 0; 5730 } 5731 5732 void 5733 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 5734 { 5735 assert(bdev->internal.claim_module != NULL); 5736 bdev->internal.claim_module = NULL; 5737 } 5738 5739 struct spdk_bdev * 5740 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 5741 { 5742 assert(desc != NULL); 5743 return desc->bdev; 5744 } 5745 5746 void 5747 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 5748 { 5749 struct iovec *iovs; 5750 int iovcnt; 5751 5752 if (bdev_io == NULL) { 5753 return; 5754 } 5755 5756 switch (bdev_io->type) { 5757 case SPDK_BDEV_IO_TYPE_READ: 5758 case SPDK_BDEV_IO_TYPE_WRITE: 5759 case SPDK_BDEV_IO_TYPE_ZCOPY: 5760 iovs = bdev_io->u.bdev.iovs; 5761 iovcnt = bdev_io->u.bdev.iovcnt; 5762 break; 5763 default: 5764 iovs = NULL; 5765 iovcnt = 0; 5766 break; 5767 } 5768 5769 if (iovp) { 5770 *iovp = iovs; 5771 } 5772 if (iovcntp) { 5773 *iovcntp = iovcnt; 5774 } 5775 } 5776 5777 void * 5778 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 5779 { 5780 if (bdev_io == NULL) { 5781 return NULL; 5782 } 5783 5784 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 5785 return NULL; 5786 } 5787 5788 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 5789 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 5790 return bdev_io->u.bdev.md_buf; 5791 } 5792 5793 return NULL; 5794 } 5795 5796 void * 5797 spdk_bdev_io_get_cb_arg(struct spdk_bdev_io *bdev_io) 5798 { 5799 if (bdev_io == NULL) { 5800 assert(false); 5801 return NULL; 5802 } 5803 5804 return bdev_io->internal.caller_ctx; 5805 } 5806 5807 void 5808 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 5809 { 5810 5811 if (spdk_bdev_module_list_find(bdev_module->name)) { 5812 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 5813 assert(false); 5814 } 5815 5816 /* 5817 * Modules with examine callbacks must be initialized first, so they are 5818 * ready to handle examine callbacks from later modules that will 5819 * register physical bdevs. 
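 *
 * For illustration only (the my_vbdev names below are hypothetical and not
 * part of this file): a module that wants to examine and possibly claim newly
 * registered bdevs sets one of the examine callbacks in its module structure,
 * roughly as
 *
 *     static struct spdk_bdev_module my_vbdev_if = {
 *             .name = "my_vbdev",
 *             .module_init = my_vbdev_init,
 *             .examine_config = my_vbdev_examine,
 *     };
 *     SPDK_BDEV_MODULE_REGISTER(my_vbdev, &my_vbdev_if)
 *
 * and is therefore inserted at the head of g_bdev_mgr.bdev_modules, while a
 * module without examine callbacks is appended to the tail.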
5820 */ 5821 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 5822 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5823 } else { 5824 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5825 } 5826 } 5827 5828 struct spdk_bdev_module * 5829 spdk_bdev_module_list_find(const char *name) 5830 { 5831 struct spdk_bdev_module *bdev_module; 5832 5833 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5834 if (strcmp(name, bdev_module->name) == 0) { 5835 break; 5836 } 5837 } 5838 5839 return bdev_module; 5840 } 5841 5842 static void 5843 bdev_write_zero_buffer_next(void *_bdev_io) 5844 { 5845 struct spdk_bdev_io *bdev_io = _bdev_io; 5846 uint64_t num_bytes, num_blocks; 5847 void *md_buf = NULL; 5848 int rc; 5849 5850 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 5851 bdev_io->u.bdev.split_remaining_num_blocks, 5852 ZERO_BUFFER_SIZE); 5853 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 5854 5855 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 5856 md_buf = (char *)g_bdev_mgr.zero_buffer + 5857 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 5858 } 5859 5860 rc = bdev_write_blocks_with_md(bdev_io->internal.desc, 5861 spdk_io_channel_from_ctx(bdev_io->internal.ch), 5862 g_bdev_mgr.zero_buffer, md_buf, 5863 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 5864 bdev_write_zero_buffer_done, bdev_io); 5865 if (rc == 0) { 5866 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 5867 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 5868 } else if (rc == -ENOMEM) { 5869 bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next); 5870 } else { 5871 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5872 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 5873 } 5874 } 5875 5876 static void 5877 bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 5878 { 5879 struct spdk_bdev_io *parent_io = cb_arg; 5880 5881 spdk_bdev_free_io(bdev_io); 5882 5883 if (!success) { 5884 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5885 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 5886 return; 5887 } 5888 5889 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 5890 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5891 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 5892 return; 5893 } 5894 5895 bdev_write_zero_buffer_next(parent_io); 5896 } 5897 5898 static void 5899 bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 5900 { 5901 pthread_mutex_lock(&ctx->bdev->internal.mutex); 5902 ctx->bdev->internal.qos_mod_in_progress = false; 5903 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 5904 5905 if (ctx->cb_fn) { 5906 ctx->cb_fn(ctx->cb_arg, status); 5907 } 5908 free(ctx); 5909 } 5910 5911 static void 5912 bdev_disable_qos_done(void *cb_arg) 5913 { 5914 struct set_qos_limit_ctx *ctx = cb_arg; 5915 struct spdk_bdev *bdev = ctx->bdev; 5916 struct spdk_bdev_io *bdev_io; 5917 struct spdk_bdev_qos *qos; 5918 5919 pthread_mutex_lock(&bdev->internal.mutex); 5920 qos = bdev->internal.qos; 5921 bdev->internal.qos = NULL; 5922 pthread_mutex_unlock(&bdev->internal.mutex); 5923 5924 while (!TAILQ_EMPTY(&qos->queued)) { 5925 /* Send queued I/O back to their original thread for resubmission. 
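 * Each of these bdev_ios was redirected to the QoS channel at submit time and
 * its original channel was saved in internal.io_submit_ch, so the channel is
 * restored below and the I/O is re-dispatched on its submitting thread.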
*/ 5926 bdev_io = TAILQ_FIRST(&qos->queued); 5927 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 5928 5929 if (bdev_io->internal.io_submit_ch) { 5930 /* 5931 * Channel was changed when sending it to the QoS thread - change it back 5932 * before sending it back to the original thread. 5933 */ 5934 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 5935 bdev_io->internal.io_submit_ch = NULL; 5936 } 5937 5938 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 5939 _bdev_io_submit, bdev_io); 5940 } 5941 5942 if (qos->thread != NULL) { 5943 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 5944 spdk_poller_unregister(&qos->poller); 5945 } 5946 5947 free(qos); 5948 5949 bdev_set_qos_limit_done(ctx, 0); 5950 } 5951 5952 static void 5953 bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 5954 { 5955 void *io_device = spdk_io_channel_iter_get_io_device(i); 5956 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5957 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5958 struct spdk_thread *thread; 5959 5960 pthread_mutex_lock(&bdev->internal.mutex); 5961 thread = bdev->internal.qos->thread; 5962 pthread_mutex_unlock(&bdev->internal.mutex); 5963 5964 if (thread != NULL) { 5965 spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx); 5966 } else { 5967 bdev_disable_qos_done(ctx); 5968 } 5969 } 5970 5971 static void 5972 bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 5973 { 5974 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5975 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5976 5977 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 5978 5979 spdk_for_each_channel_continue(i, 0); 5980 } 5981 5982 static void 5983 bdev_update_qos_rate_limit_msg(void *cb_arg) 5984 { 5985 struct set_qos_limit_ctx *ctx = cb_arg; 5986 struct spdk_bdev *bdev = ctx->bdev; 5987 5988 pthread_mutex_lock(&bdev->internal.mutex); 5989 bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 5990 pthread_mutex_unlock(&bdev->internal.mutex); 5991 5992 bdev_set_qos_limit_done(ctx, 0); 5993 } 5994 5995 static void 5996 bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 5997 { 5998 void *io_device = spdk_io_channel_iter_get_io_device(i); 5999 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 6000 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 6001 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 6002 6003 pthread_mutex_lock(&bdev->internal.mutex); 6004 bdev_enable_qos(bdev, bdev_ch); 6005 pthread_mutex_unlock(&bdev->internal.mutex); 6006 spdk_for_each_channel_continue(i, 0); 6007 } 6008 6009 static void 6010 bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 6011 { 6012 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6013 6014 bdev_set_qos_limit_done(ctx, status); 6015 } 6016 6017 static void 6018 bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 6019 { 6020 int i; 6021 6022 assert(bdev->internal.qos != NULL); 6023 6024 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6025 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6026 bdev->internal.qos->rate_limits[i].limit = limits[i]; 6027 6028 if (limits[i] == 0) { 6029 bdev->internal.qos->rate_limits[i].limit = 6030 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 6031 } 6032 } 6033 } 6034 } 6035 6036 void 6037 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 6038 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 6039 { 6040 struct set_qos_limit_ctx *ctx; 6041 uint32_t 
limit_set_complement; 6042 uint64_t min_limit_per_sec; 6043 int i; 6044 bool disable_rate_limit = true; 6045 6046 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6047 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6048 continue; 6049 } 6050 6051 if (limits[i] > 0) { 6052 disable_rate_limit = false; 6053 } 6054 6055 if (bdev_qos_is_iops_rate_limit(i) == true) { 6056 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 6057 } else { 6058 /* Change from megabyte to byte rate limit */ 6059 limits[i] = limits[i] * 1024 * 1024; 6060 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 6061 } 6062 6063 limit_set_complement = limits[i] % min_limit_per_sec; 6064 if (limit_set_complement) { 6065 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 6066 limits[i], min_limit_per_sec); 6067 limits[i] += min_limit_per_sec - limit_set_complement; 6068 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 6069 } 6070 } 6071 6072 ctx = calloc(1, sizeof(*ctx)); 6073 if (ctx == NULL) { 6074 cb_fn(cb_arg, -ENOMEM); 6075 return; 6076 } 6077 6078 ctx->cb_fn = cb_fn; 6079 ctx->cb_arg = cb_arg; 6080 ctx->bdev = bdev; 6081 6082 pthread_mutex_lock(&bdev->internal.mutex); 6083 if (bdev->internal.qos_mod_in_progress) { 6084 pthread_mutex_unlock(&bdev->internal.mutex); 6085 free(ctx); 6086 cb_fn(cb_arg, -EAGAIN); 6087 return; 6088 } 6089 bdev->internal.qos_mod_in_progress = true; 6090 6091 if (disable_rate_limit == true && bdev->internal.qos) { 6092 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6093 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 6094 (bdev->internal.qos->rate_limits[i].limit > 0 && 6095 bdev->internal.qos->rate_limits[i].limit != 6096 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 6097 disable_rate_limit = false; 6098 break; 6099 } 6100 } 6101 } 6102 6103 if (disable_rate_limit == false) { 6104 if (bdev->internal.qos == NULL) { 6105 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 6106 if (!bdev->internal.qos) { 6107 pthread_mutex_unlock(&bdev->internal.mutex); 6108 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 6109 bdev_set_qos_limit_done(ctx, -ENOMEM); 6110 return; 6111 } 6112 } 6113 6114 if (bdev->internal.qos->thread == NULL) { 6115 /* Enabling */ 6116 bdev_set_qos_rate_limits(bdev, limits); 6117 6118 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6119 bdev_enable_qos_msg, ctx, 6120 bdev_enable_qos_done); 6121 } else { 6122 /* Updating */ 6123 bdev_set_qos_rate_limits(bdev, limits); 6124 6125 spdk_thread_send_msg(bdev->internal.qos->thread, 6126 bdev_update_qos_rate_limit_msg, ctx); 6127 } 6128 } else { 6129 if (bdev->internal.qos != NULL) { 6130 bdev_set_qos_rate_limits(bdev, limits); 6131 6132 /* Disabling */ 6133 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6134 bdev_disable_qos_msg, ctx, 6135 bdev_disable_qos_msg_done); 6136 } else { 6137 pthread_mutex_unlock(&bdev->internal.mutex); 6138 bdev_set_qos_limit_done(ctx, 0); 6139 return; 6140 } 6141 } 6142 6143 pthread_mutex_unlock(&bdev->internal.mutex); 6144 } 6145 6146 struct spdk_bdev_histogram_ctx { 6147 spdk_bdev_histogram_status_cb cb_fn; 6148 void *cb_arg; 6149 struct spdk_bdev *bdev; 6150 int status; 6151 }; 6152 6153 static void 6154 bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 6155 { 6156 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6157 6158 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6159 ctx->bdev->internal.histogram_in_progress = false; 6160 
pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6161 ctx->cb_fn(ctx->cb_arg, ctx->status); 6162 free(ctx); 6163 } 6164 6165 static void 6166 bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 6167 { 6168 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6169 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6170 6171 if (ch->histogram != NULL) { 6172 spdk_histogram_data_free(ch->histogram); 6173 ch->histogram = NULL; 6174 } 6175 spdk_for_each_channel_continue(i, 0); 6176 } 6177 6178 static void 6179 bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 6180 { 6181 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6182 6183 if (status != 0) { 6184 ctx->status = status; 6185 ctx->bdev->internal.histogram_enabled = false; 6186 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx, 6187 bdev_histogram_disable_channel_cb); 6188 } else { 6189 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6190 ctx->bdev->internal.histogram_in_progress = false; 6191 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6192 ctx->cb_fn(ctx->cb_arg, ctx->status); 6193 free(ctx); 6194 } 6195 } 6196 6197 static void 6198 bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 6199 { 6200 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6201 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6202 int status = 0; 6203 6204 if (ch->histogram == NULL) { 6205 ch->histogram = spdk_histogram_data_alloc(); 6206 if (ch->histogram == NULL) { 6207 status = -ENOMEM; 6208 } 6209 } 6210 6211 spdk_for_each_channel_continue(i, status); 6212 } 6213 6214 void 6215 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 6216 void *cb_arg, bool enable) 6217 { 6218 struct spdk_bdev_histogram_ctx *ctx; 6219 6220 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 6221 if (ctx == NULL) { 6222 cb_fn(cb_arg, -ENOMEM); 6223 return; 6224 } 6225 6226 ctx->bdev = bdev; 6227 ctx->status = 0; 6228 ctx->cb_fn = cb_fn; 6229 ctx->cb_arg = cb_arg; 6230 6231 pthread_mutex_lock(&bdev->internal.mutex); 6232 if (bdev->internal.histogram_in_progress) { 6233 pthread_mutex_unlock(&bdev->internal.mutex); 6234 free(ctx); 6235 cb_fn(cb_arg, -EAGAIN); 6236 return; 6237 } 6238 6239 bdev->internal.histogram_in_progress = true; 6240 pthread_mutex_unlock(&bdev->internal.mutex); 6241 6242 bdev->internal.histogram_enabled = enable; 6243 6244 if (enable) { 6245 /* Allocate histogram for each channel */ 6246 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx, 6247 bdev_histogram_enable_channel_cb); 6248 } else { 6249 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx, 6250 bdev_histogram_disable_channel_cb); 6251 } 6252 } 6253 6254 struct spdk_bdev_histogram_data_ctx { 6255 spdk_bdev_histogram_data_cb cb_fn; 6256 void *cb_arg; 6257 struct spdk_bdev *bdev; 6258 /** merged histogram data from all channels */ 6259 struct spdk_histogram_data *histogram; 6260 }; 6261 6262 static void 6263 bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 6264 { 6265 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6266 6267 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 6268 free(ctx); 6269 } 6270 6271 static void 6272 bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 6273 { 6274 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6275 struct spdk_bdev_channel *ch = 
spdk_io_channel_get_ctx(_ch); 6276 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6277 int status = 0; 6278 6279 if (ch->histogram == NULL) { 6280 status = -EFAULT; 6281 } else { 6282 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 6283 } 6284 6285 spdk_for_each_channel_continue(i, status); 6286 } 6287 6288 void 6289 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 6290 spdk_bdev_histogram_data_cb cb_fn, 6291 void *cb_arg) 6292 { 6293 struct spdk_bdev_histogram_data_ctx *ctx; 6294 6295 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 6296 if (ctx == NULL) { 6297 cb_fn(cb_arg, -ENOMEM, NULL); 6298 return; 6299 } 6300 6301 ctx->bdev = bdev; 6302 ctx->cb_fn = cb_fn; 6303 ctx->cb_arg = cb_arg; 6304 6305 ctx->histogram = histogram; 6306 6307 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx, 6308 bdev_histogram_get_channel_cb); 6309 } 6310 6311 size_t 6312 spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events, 6313 size_t max_events) 6314 { 6315 struct media_event_entry *entry; 6316 size_t num_events = 0; 6317 6318 for (; num_events < max_events; ++num_events) { 6319 entry = TAILQ_FIRST(&desc->pending_media_events); 6320 if (entry == NULL) { 6321 break; 6322 } 6323 6324 events[num_events] = entry->event; 6325 TAILQ_REMOVE(&desc->pending_media_events, entry, tailq); 6326 TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq); 6327 } 6328 6329 return num_events; 6330 } 6331 6332 int 6333 spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events, 6334 size_t num_events) 6335 { 6336 struct spdk_bdev_desc *desc; 6337 struct media_event_entry *entry; 6338 size_t event_id; 6339 int rc = 0; 6340 6341 assert(bdev->media_events); 6342 6343 pthread_mutex_lock(&bdev->internal.mutex); 6344 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6345 if (desc->write) { 6346 break; 6347 } 6348 } 6349 6350 if (desc == NULL || desc->media_events_buffer == NULL) { 6351 rc = -ENODEV; 6352 goto out; 6353 } 6354 6355 for (event_id = 0; event_id < num_events; ++event_id) { 6356 entry = TAILQ_FIRST(&desc->free_media_events); 6357 if (entry == NULL) { 6358 break; 6359 } 6360 6361 TAILQ_REMOVE(&desc->free_media_events, entry, tailq); 6362 TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq); 6363 entry->event = events[event_id]; 6364 } 6365 6366 rc = event_id; 6367 out: 6368 pthread_mutex_unlock(&bdev->internal.mutex); 6369 return rc; 6370 } 6371 6372 void 6373 spdk_bdev_notify_media_management(struct spdk_bdev *bdev) 6374 { 6375 struct spdk_bdev_desc *desc; 6376 6377 pthread_mutex_lock(&bdev->internal.mutex); 6378 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6379 if (!TAILQ_EMPTY(&desc->pending_media_events)) { 6380 desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev, 6381 desc->callback.ctx); 6382 } 6383 } 6384 pthread_mutex_unlock(&bdev->internal.mutex); 6385 } 6386 6387 struct locked_lba_range_ctx { 6388 struct lba_range range; 6389 struct spdk_bdev *bdev; 6390 struct lba_range *current_range; 6391 struct lba_range *owner_range; 6392 struct spdk_poller *poller; 6393 lock_range_cb cb_fn; 6394 void *cb_arg; 6395 }; 6396 6397 static void 6398 bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status) 6399 { 6400 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6401 6402 ctx->cb_fn(ctx->cb_arg, -ENOMEM); 6403 free(ctx); 6404 } 6405 6406 static void 6407 
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i); 6408 6409 static void 6410 bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6411 { 6412 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6413 struct spdk_bdev *bdev = ctx->bdev; 6414 6415 if (status == -ENOMEM) { 6416 /* One of the channels could not allocate a range object. 6417 * So we have to go back and clean up any ranges that were 6418 * allocated successfully before we return error status to 6419 * the caller. We can reuse the unlock function to do that 6420 * clean up. 6421 */ 6422 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6423 bdev_unlock_lba_range_get_channel, ctx, 6424 bdev_lock_error_cleanup_cb); 6425 return; 6426 } 6427 6428 /* All channels have locked this range and no I/O overlapping the range 6429 * are outstanding! Set the owner_ch for the range object for the 6430 * locking channel, so that this channel will know that it is allowed 6431 * to write to this range. 6432 */ 6433 ctx->owner_range->owner_ch = ctx->range.owner_ch; 6434 ctx->cb_fn(ctx->cb_arg, status); 6435 6436 /* Don't free the ctx here. Its range is in the bdev's global list of 6437 * locked ranges still, and will be removed and freed when this range 6438 * is later unlocked. 6439 */ 6440 } 6441 6442 static int 6443 bdev_lock_lba_range_check_io(void *_i) 6444 { 6445 struct spdk_io_channel_iter *i = _i; 6446 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6447 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6448 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6449 struct lba_range *range = ctx->current_range; 6450 struct spdk_bdev_io *bdev_io; 6451 6452 spdk_poller_unregister(&ctx->poller); 6453 6454 /* The range is now in the locked_ranges, so no new IO can be submitted to this 6455 * range. But we need to wait until any outstanding IO overlapping with this range 6456 * are completed. 6457 */ 6458 TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) { 6459 if (bdev_io_range_is_locked(bdev_io, range)) { 6460 ctx->poller = SPDK_POLLER_REGISTER(bdev_lock_lba_range_check_io, i, 100); 6461 return 1; 6462 } 6463 } 6464 6465 spdk_for_each_channel_continue(i, 0); 6466 return 1; 6467 } 6468 6469 static void 6470 bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6471 { 6472 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6473 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6474 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6475 struct lba_range *range; 6476 6477 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6478 if (range->length == ctx->range.length && 6479 range->offset == ctx->range.offset && 6480 range->locked_ctx == ctx->range.locked_ctx) { 6481 /* This range already exists on this channel, so don't add 6482 * it again. This can happen when a new channel is created 6483 * while the for_each_channel operation is in progress. 6484 * Do not check for outstanding I/O in that case, since the 6485 * range was locked before any I/O could be submitted to the 6486 * new channel. 
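 * (The freshly created channel already holds its own copy of this range,
 * because the range was inserted into bdev->internal.locked_ranges before the
 * for_each_channel walk started and channel creation copies that list.)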
6487 */ 6488 spdk_for_each_channel_continue(i, 0); 6489 return; 6490 } 6491 } 6492 6493 range = calloc(1, sizeof(*range)); 6494 if (range == NULL) { 6495 spdk_for_each_channel_continue(i, -ENOMEM); 6496 return; 6497 } 6498 6499 range->length = ctx->range.length; 6500 range->offset = ctx->range.offset; 6501 range->locked_ctx = ctx->range.locked_ctx; 6502 ctx->current_range = range; 6503 if (ctx->range.owner_ch == ch) { 6504 /* This is the range object for the channel that will hold 6505 * the lock. Store it in the ctx object so that we can easily 6506 * set its owner_ch after the lock is finally acquired. 6507 */ 6508 ctx->owner_range = range; 6509 } 6510 TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq); 6511 bdev_lock_lba_range_check_io(i); 6512 } 6513 6514 static void 6515 bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx) 6516 { 6517 assert(spdk_get_thread() == ctx->range.owner_ch->channel->thread); 6518 6519 /* We will add a copy of this range to each channel now. */ 6520 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx, 6521 bdev_lock_lba_range_cb); 6522 } 6523 6524 static bool 6525 bdev_lba_range_overlaps_tailq(struct lba_range *range, lba_range_tailq_t *tailq) 6526 { 6527 struct lba_range *r; 6528 6529 TAILQ_FOREACH(r, tailq, tailq) { 6530 if (bdev_lba_range_overlapped(range, r)) { 6531 return true; 6532 } 6533 } 6534 return false; 6535 } 6536 6537 static int 6538 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6539 uint64_t offset, uint64_t length, 6540 lock_range_cb cb_fn, void *cb_arg) 6541 { 6542 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6543 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6544 struct locked_lba_range_ctx *ctx; 6545 6546 if (cb_arg == NULL) { 6547 SPDK_ERRLOG("cb_arg must not be NULL\n"); 6548 return -EINVAL; 6549 } 6550 6551 ctx = calloc(1, sizeof(*ctx)); 6552 if (ctx == NULL) { 6553 return -ENOMEM; 6554 } 6555 6556 ctx->range.offset = offset; 6557 ctx->range.length = length; 6558 ctx->range.owner_ch = ch; 6559 ctx->range.locked_ctx = cb_arg; 6560 ctx->bdev = bdev; 6561 ctx->cb_fn = cb_fn; 6562 ctx->cb_arg = cb_arg; 6563 6564 pthread_mutex_lock(&bdev->internal.mutex); 6565 if (bdev_lba_range_overlaps_tailq(&ctx->range, &bdev->internal.locked_ranges)) { 6566 /* There is an active lock overlapping with this range. 6567 * Put it on the pending list until this range no 6568 * longer overlaps with another. 6569 */ 6570 TAILQ_INSERT_TAIL(&bdev->internal.pending_locked_ranges, &ctx->range, tailq); 6571 } else { 6572 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, &ctx->range, tailq); 6573 bdev_lock_lba_range_ctx(bdev, ctx); 6574 } 6575 pthread_mutex_unlock(&bdev->internal.mutex); 6576 return 0; 6577 } 6578 6579 static void 6580 bdev_lock_lba_range_ctx_msg(void *_ctx) 6581 { 6582 struct locked_lba_range_ctx *ctx = _ctx; 6583 6584 bdev_lock_lba_range_ctx(ctx->bdev, ctx); 6585 } 6586 6587 static void 6588 bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6589 { 6590 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6591 struct locked_lba_range_ctx *pending_ctx; 6592 struct spdk_bdev_channel *ch = ctx->range.owner_ch; 6593 struct spdk_bdev *bdev = ch->bdev; 6594 struct lba_range *range, *tmp; 6595 6596 pthread_mutex_lock(&bdev->internal.mutex); 6597 /* Check if there are any pending locked ranges that overlap with this range 6598 * that was just unlocked. 
If there are, check that it doesn't overlap with any 6599 * other locked ranges before calling bdev_lock_lba_range_ctx which will start 6600 * the lock process. 6601 */ 6602 TAILQ_FOREACH_SAFE(range, &bdev->internal.pending_locked_ranges, tailq, tmp) { 6603 if (bdev_lba_range_overlapped(range, &ctx->range) && 6604 !bdev_lba_range_overlaps_tailq(range, &bdev->internal.locked_ranges)) { 6605 TAILQ_REMOVE(&bdev->internal.pending_locked_ranges, range, tailq); 6606 pending_ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6607 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, range, tailq); 6608 spdk_thread_send_msg(pending_ctx->range.owner_ch->channel->thread, 6609 bdev_lock_lba_range_ctx_msg, pending_ctx); 6610 } 6611 } 6612 pthread_mutex_unlock(&bdev->internal.mutex); 6613 6614 ctx->cb_fn(ctx->cb_arg, status); 6615 free(ctx); 6616 } 6617 6618 static void 6619 bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6620 { 6621 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6622 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6623 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6624 TAILQ_HEAD(, spdk_bdev_io) io_locked; 6625 struct spdk_bdev_io *bdev_io; 6626 struct lba_range *range; 6627 6628 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6629 if (ctx->range.offset == range->offset && 6630 ctx->range.length == range->length && 6631 ctx->range.locked_ctx == range->locked_ctx) { 6632 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 6633 free(range); 6634 break; 6635 } 6636 } 6637 6638 /* Note: we should almost always be able to assert that the range specified 6639 * was found. But there are some very rare corner cases where a new channel 6640 * gets created simultaneously with a range unlock, where this function 6641 * would execute on that new channel and wouldn't have the range. 6642 * We also use this to clean up range allocations when a later allocation 6643 * fails in the locking path. 6644 * So we can't actually assert() here. 6645 */ 6646 6647 /* Swap the locked IO into a temporary list, and then try to submit them again. 6648 * We could hyper-optimize this to only resubmit locked I/O that overlap 6649 * with the range that was just unlocked, but this isn't a performance path so 6650 * we go for simplicity here. 6651 */ 6652 TAILQ_INIT(&io_locked); 6653 TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link); 6654 while (!TAILQ_EMPTY(&io_locked)) { 6655 bdev_io = TAILQ_FIRST(&io_locked); 6656 TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link); 6657 bdev_io_submit(bdev_io); 6658 } 6659 6660 spdk_for_each_channel_continue(i, 0); 6661 } 6662 6663 static int 6664 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6665 uint64_t offset, uint64_t length, 6666 lock_range_cb cb_fn, void *cb_arg) 6667 { 6668 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6669 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6670 struct locked_lba_range_ctx *ctx; 6671 struct lba_range *range; 6672 bool range_found = false; 6673 6674 /* Let's make sure the specified channel actually has a lock on 6675 * the specified range. Note that the range must match exactly. 
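 * A request that covers only part of a locked range, or extends beyond it,
 * does not match and is rejected with -EINVAL below.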
6676 */ 6677 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6678 if (range->offset == offset && range->length == length && 6679 range->owner_ch == ch && range->locked_ctx == cb_arg) { 6680 range_found = true; 6681 break; 6682 } 6683 } 6684 6685 if (!range_found) { 6686 return -EINVAL; 6687 } 6688 6689 pthread_mutex_lock(&bdev->internal.mutex); 6690 /* We confirmed that this channel has locked the specified range. To 6691 * start the unlock the process, we find the range in the bdev's locked_ranges 6692 * and remove it. This ensures new channels don't inherit the locked range. 6693 * Then we will send a message to each channel (including the one specified 6694 * here) to remove the range from its per-channel list. 6695 */ 6696 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 6697 if (range->offset == offset && range->length == length && 6698 range->locked_ctx == cb_arg) { 6699 break; 6700 } 6701 } 6702 if (range == NULL) { 6703 assert(false); 6704 pthread_mutex_unlock(&bdev->internal.mutex); 6705 return -EINVAL; 6706 } 6707 TAILQ_REMOVE(&bdev->internal.locked_ranges, range, tailq); 6708 ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6709 pthread_mutex_unlock(&bdev->internal.mutex); 6710 6711 ctx->cb_fn = cb_fn; 6712 ctx->cb_arg = cb_arg; 6713 6714 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx, 6715 bdev_unlock_lba_range_cb); 6716 return 0; 6717 } 6718 6719 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 6720 6721 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 6722 { 6723 spdk_trace_register_owner(OWNER_BDEV, 'b'); 6724 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 6725 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 6726 OBJECT_BDEV_IO, 1, 0, "type: "); 6727 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 6728 OBJECT_BDEV_IO, 0, 0, ""); 6729 } 6730
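
/*
 * Usage sketch (kept out of the build): how a consumer of this API might open
 * a bdev, apply QoS rate limits and close the descriptor again.  The bdev name
 * "Malloc0" and the example_* symbols are hypothetical and exist only for
 * illustration; all of the calls below must be made from an SPDK thread.
 */
#if 0
static void
example_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	if (type == SPDK_BDEV_EVENT_REMOVE) {
		/* The bdev is being hot-removed; close the descriptor stored in
		 * event_ctx on the thread that opened it. */
	}
}

static void
example_qos_done(void *cb_arg, int status)
{
	/* status is 0 on success, a negative errno otherwise. */
}

static int
example_open_and_limit(void)
{
	struct spdk_bdev_desc *desc;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {
		[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000,	/* IO/s; must be a multiple of 1000 or it is rounded up */
		[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100,	/* MB/s; converted to a byte limit internally */
		[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED,	/* leave unchanged */
		[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED,	/* leave unchanged */
	};
	int rc;

	rc = spdk_bdev_open_ext("Malloc0", true, example_event_cb, NULL, &desc);
	if (rc != 0) {
		return rc;
	}

	spdk_bdev_set_qos_rate_limits(spdk_bdev_desc_get_bdev(desc), limits,
				      example_qos_done, NULL);

	/* ... submit I/O on a channel obtained from spdk_bdev_get_io_channel(desc) ... */

	spdk_bdev_close(desc);	/* must run on the thread that opened desc */
	return 0;
}
#endif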