1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. All rights reserved. 5 * Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/thread.h" 42 #include "spdk/likely.h" 43 #include "spdk/queue.h" 44 #include "spdk/nvme_spec.h" 45 #include "spdk/scsi_spec.h" 46 #include "spdk/notify.h" 47 #include "spdk/util.h" 48 #include "spdk/trace.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 #include "spdk/string.h" 53 54 #include "bdev_internal.h" 55 56 #ifdef SPDK_CONFIG_VTUNE 57 #include "ittnotify.h" 58 #include "ittnotify_types.h" 59 int __itt_init_ittlib(const char *, __itt_group_id); 60 #endif 61 62 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024 - 1) 63 #define SPDK_BDEV_IO_CACHE_SIZE 256 64 #define SPDK_BDEV_AUTO_EXAMINE true 65 #define BUF_SMALL_POOL_SIZE 8191 66 #define BUF_LARGE_POOL_SIZE 1023 67 #define NOMEM_THRESHOLD_COUNT 8 68 #define ZERO_BUFFER_SIZE 0x100000 69 70 #define OWNER_BDEV 0x2 71 72 #define OBJECT_BDEV_IO 0x2 73 74 #define TRACE_GROUP_BDEV 0x3 75 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 76 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 77 78 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 79 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 80 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 81 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 1000 82 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (1024 * 1024) 83 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 84 #define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC 1000 85 86 #define SPDK_BDEV_POOL_ALIGNMENT 512 87 88 static const char *qos_conf_type[] = {"Limit_IOPS", 89 "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS" 90 }; 91 static const char *qos_rpc_type[] = {"rw_ios_per_sec", 92 "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec" 93 }; 94 95 TAILQ_HEAD(spdk_bdev_list, 
spdk_bdev); 96 97 struct spdk_bdev_mgr { 98 struct spdk_mempool *bdev_io_pool; 99 100 struct spdk_mempool *buf_small_pool; 101 struct spdk_mempool *buf_large_pool; 102 103 void *zero_buffer; 104 105 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 106 107 struct spdk_bdev_list bdevs; 108 109 bool init_complete; 110 bool module_init_complete; 111 112 pthread_mutex_t mutex; 113 114 #ifdef SPDK_CONFIG_VTUNE 115 __itt_domain *domain; 116 #endif 117 }; 118 119 static struct spdk_bdev_mgr g_bdev_mgr = { 120 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 121 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 122 .init_complete = false, 123 .module_init_complete = false, 124 .mutex = PTHREAD_MUTEX_INITIALIZER, 125 }; 126 127 typedef void (*lock_range_cb)(void *ctx, int status); 128 129 struct lba_range { 130 uint64_t offset; 131 uint64_t length; 132 void *locked_ctx; 133 struct spdk_bdev_channel *owner_ch; 134 TAILQ_ENTRY(lba_range) tailq; 135 }; 136 137 static struct spdk_bdev_opts g_bdev_opts = { 138 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 139 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 140 .bdev_auto_examine = SPDK_BDEV_AUTO_EXAMINE, 141 }; 142 143 static spdk_bdev_init_cb g_init_cb_fn = NULL; 144 static void *g_init_cb_arg = NULL; 145 146 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 147 static void *g_fini_cb_arg = NULL; 148 static struct spdk_thread *g_fini_thread = NULL; 149 150 struct spdk_bdev_qos_limit { 151 /** IOs or bytes allowed per second (i.e., 1s). */ 152 uint64_t limit; 153 154 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 155 * For remaining bytes, allowed to run negative if an I/O is submitted when 156 * some bytes are remaining, but the I/O is bigger than that amount. The 157 * excess will be deducted from the next timeslice. 158 */ 159 int64_t remaining_this_timeslice; 160 161 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 162 uint32_t min_per_timeslice; 163 164 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 165 uint32_t max_per_timeslice; 166 167 /** Function to check whether to queue the IO. */ 168 bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 169 170 /** Function to update for the submitted IO. */ 171 void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 172 }; 173 174 struct spdk_bdev_qos { 175 /** Types of structure of rate limits. */ 176 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 177 178 /** The channel that all I/O are funneled through. */ 179 struct spdk_bdev_channel *ch; 180 181 /** The thread on which the poller is running. */ 182 struct spdk_thread *thread; 183 184 /** Queue of I/O waiting to be issued. */ 185 bdev_io_tailq_t queued; 186 187 /** Size of a timeslice in tsc ticks. */ 188 uint64_t timeslice_size; 189 190 /** Timestamp of start of last timeslice. */ 191 uint64_t last_timeslice; 192 193 /** Poller that processes queued I/O commands each time slice. */ 194 struct spdk_poller *poller; 195 }; 196 197 struct spdk_bdev_mgmt_channel { 198 bdev_io_stailq_t need_buf_small; 199 bdev_io_stailq_t need_buf_large; 200 201 /* 202 * Each thread keeps a cache of bdev_io - this allows 203 * bdev threads which are *not* DPDK threads to still 204 * benefit from a per-thread bdev_io cache. Without 205 * this, non-DPDK threads fetching from the mempool 206 * incur a cmpxchg on get and put. 
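	 *
	 * Illustrative sketch only (a simplified view of bdev_channel_get_io()
	 * further down in this file; the real function also checks io_wait_queue
	 * before falling back to the shared pool):
	 *
	 *	if (ch->per_thread_cache_count > 0) {
	 *		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
	 *		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
	 *		ch->per_thread_cache_count--;
	 *	} else {
	 *		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	 *	}
	 *
	 * The first branch touches only thread-local state, so no atomic
	 * compare-and-swap is needed on the hot path.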
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * queue I/O awaiting retry here, which makes it possible to retry sending I/O to
 * one bdev after I/O from another bdev on the same io_device completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t		nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel	*shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t		ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to the underlying dev module through this channel
	 * and waiting for completion.
	 */
	uint64_t		io_outstanding;

	/*
	 * List of all submitted I/Os including I/O that are generated via splitting.
	 */
	bdev_io_tailq_t		io_submitted;

	/*
	 * List of spdk_bdev_io that are currently queued because they write to a locked
	 * LBA range.
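	 *
	 * (Illustrative note: such a write is held on this list while the overlapping
	 * range is held, and is resubmitted once the range is released; see the
	 * bdev_lock_lba_range()/bdev_unlock_lba_range() declarations further down and
	 * the locked_ranges list at the end of this structure.)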
279 */ 280 bdev_io_tailq_t io_locked; 281 282 uint32_t flags; 283 284 struct spdk_histogram_data *histogram; 285 286 #ifdef SPDK_CONFIG_VTUNE 287 uint64_t start_tsc; 288 uint64_t interval_tsc; 289 __itt_string_handle *handle; 290 struct spdk_bdev_io_stat prev_stat; 291 #endif 292 293 bdev_io_tailq_t queued_resets; 294 295 lba_range_tailq_t locked_ranges; 296 }; 297 298 struct media_event_entry { 299 struct spdk_bdev_media_event event; 300 TAILQ_ENTRY(media_event_entry) tailq; 301 }; 302 303 #define MEDIA_EVENT_POOL_SIZE 64 304 305 struct spdk_bdev_desc { 306 struct spdk_bdev *bdev; 307 struct spdk_thread *thread; 308 struct { 309 bool open_with_ext; 310 union { 311 spdk_bdev_remove_cb_t remove_fn; 312 spdk_bdev_event_cb_t event_fn; 313 }; 314 void *ctx; 315 } callback; 316 bool closed; 317 bool write; 318 pthread_mutex_t mutex; 319 uint32_t refs; 320 TAILQ_HEAD(, media_event_entry) pending_media_events; 321 TAILQ_HEAD(, media_event_entry) free_media_events; 322 struct media_event_entry *media_events_buffer; 323 TAILQ_ENTRY(spdk_bdev_desc) link; 324 325 uint64_t timeout_in_sec; 326 spdk_bdev_io_timeout_cb cb_fn; 327 void *cb_arg; 328 struct spdk_poller *io_timeout_poller; 329 }; 330 331 struct spdk_bdev_iostat_ctx { 332 struct spdk_bdev_io_stat *stat; 333 spdk_bdev_get_device_stat_cb cb; 334 void *cb_arg; 335 }; 336 337 struct set_qos_limit_ctx { 338 void (*cb_fn)(void *cb_arg, int status); 339 void *cb_arg; 340 struct spdk_bdev *bdev; 341 }; 342 343 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 344 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 345 346 static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 347 static void bdev_write_zero_buffer_next(void *_bdev_io); 348 349 static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i); 350 static void bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status); 351 352 static int 353 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 354 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 355 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg); 356 static int 357 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 358 struct iovec *iov, int iovcnt, void *md_buf, 359 uint64_t offset_blocks, uint64_t num_blocks, 360 spdk_bdev_io_completion_cb cb, void *cb_arg); 361 362 static int 363 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 364 uint64_t offset, uint64_t length, 365 lock_range_cb cb_fn, void *cb_arg); 366 367 static int 368 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 369 uint64_t offset, uint64_t length, 370 lock_range_cb cb_fn, void *cb_arg); 371 372 static inline void bdev_io_complete(void *ctx); 373 374 static bool bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort); 375 static bool bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort); 376 377 void 378 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 379 { 380 *opts = g_bdev_opts; 381 } 382 383 int 384 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 385 { 386 uint32_t min_pool_size; 387 388 /* 389 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 390 * initialization. A second mgmt_ch will be created on the same thread when the application starts 391 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
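	 *
	 * Worked example (numbers are illustrative only, not defaults): with 3 SPDK
	 * threads and bdev_io_cache_size = 256, min_pool_size below becomes
	 * 256 * (3 + 1) = 1024, so a bdev_io_pool_size of at least 1024 is required.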
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

/*
 * The examine whitelist will be implemented in the future.
 */
static inline bool
bdev_in_examine_whitelist(struct spdk_bdev *bdev)
{
	return false;
}

static inline bool
bdev_ok_to_examine(struct spdk_bdev *bdev)
{
	if (g_bdev_opts.bdev_auto_examine) {
		return true;
	} else {
		return bdev_in_examine_whitelist(bdev);
	}
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return
iovs[0].iov_base != NULL; 550 } 551 552 static bool 553 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 554 { 555 int i; 556 uintptr_t iov_base; 557 558 if (spdk_likely(alignment == 1)) { 559 return true; 560 } 561 562 for (i = 0; i < iovcnt; i++) { 563 iov_base = (uintptr_t)iovs[i].iov_base; 564 if ((iov_base & (alignment - 1)) != 0) { 565 return false; 566 } 567 } 568 569 return true; 570 } 571 572 static void 573 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 574 { 575 int i; 576 size_t len; 577 578 for (i = 0; i < iovcnt; i++) { 579 len = spdk_min(iovs[i].iov_len, buf_len); 580 memcpy(buf, iovs[i].iov_base, len); 581 buf += len; 582 buf_len -= len; 583 } 584 } 585 586 static void 587 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 588 { 589 int i; 590 size_t len; 591 592 for (i = 0; i < iovcnt; i++) { 593 len = spdk_min(iovs[i].iov_len, buf_len); 594 memcpy(iovs[i].iov_base, buf, len); 595 buf += len; 596 buf_len -= len; 597 } 598 } 599 600 static void 601 _bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 602 { 603 /* save original iovec */ 604 bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs; 605 bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt; 606 /* set bounce iov */ 607 bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov; 608 bdev_io->u.bdev.iovcnt = 1; 609 /* set bounce buffer for this operation */ 610 bdev_io->u.bdev.iovs[0].iov_base = buf; 611 bdev_io->u.bdev.iovs[0].iov_len = len; 612 /* if this is write path, copy data from original buffer to bounce buffer */ 613 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 614 _copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt); 615 } 616 } 617 618 static void 619 _bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len) 620 { 621 /* save original md_buf */ 622 bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf; 623 /* set bounce md_buf */ 624 bdev_io->u.bdev.md_buf = md_buf; 625 626 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 627 memcpy(md_buf, bdev_io->internal.orig_md_buf, len); 628 } 629 } 630 631 static void 632 bdev_io_get_buf_complete(struct spdk_bdev_io *bdev_io, void *buf, bool status) 633 { 634 struct spdk_io_channel *ch = spdk_bdev_io_get_io_channel(bdev_io); 635 636 if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) { 637 bdev_io->internal.get_aux_buf_cb(ch, bdev_io, buf); 638 bdev_io->internal.get_aux_buf_cb = NULL; 639 } else { 640 assert(bdev_io->internal.get_buf_cb != NULL); 641 bdev_io->internal.buf = buf; 642 bdev_io->internal.get_buf_cb(ch, bdev_io, status); 643 bdev_io->internal.get_buf_cb = NULL; 644 } 645 } 646 647 static void 648 _bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len) 649 { 650 struct spdk_bdev *bdev = bdev_io->bdev; 651 bool buf_allocated; 652 uint64_t md_len, alignment; 653 void *aligned_buf; 654 655 if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) { 656 bdev_io_get_buf_complete(bdev_io, buf, true); 657 return; 658 } 659 660 alignment = spdk_bdev_get_buf_align(bdev); 661 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 662 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 663 664 if (buf_allocated) { 665 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 666 } else { 667 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 668 } 669 670 if (spdk_bdev_is_md_separate(bdev)) { 671 aligned_buf = (char *)aligned_buf + len; 672 md_len = 
bdev_io->u.bdev.num_blocks * bdev->md_len; 673 674 assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0); 675 676 if (bdev_io->u.bdev.md_buf != NULL) { 677 _bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len); 678 } else { 679 spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len); 680 } 681 } 682 bdev_io_get_buf_complete(bdev_io, buf, true); 683 } 684 685 static void 686 _bdev_io_put_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t buf_len) 687 { 688 struct spdk_bdev *bdev = bdev_io->bdev; 689 struct spdk_mempool *pool; 690 struct spdk_bdev_io *tmp; 691 bdev_io_stailq_t *stailq; 692 struct spdk_bdev_mgmt_channel *ch; 693 uint64_t md_len, alignment; 694 695 md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0; 696 alignment = spdk_bdev_get_buf_align(bdev); 697 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 698 699 if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 700 SPDK_BDEV_POOL_ALIGNMENT) { 701 pool = g_bdev_mgr.buf_small_pool; 702 stailq = &ch->need_buf_small; 703 } else { 704 pool = g_bdev_mgr.buf_large_pool; 705 stailq = &ch->need_buf_large; 706 } 707 708 if (STAILQ_EMPTY(stailq)) { 709 spdk_mempool_put(pool, buf); 710 } else { 711 tmp = STAILQ_FIRST(stailq); 712 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 713 _bdev_io_set_buf(tmp, buf, tmp->internal.buf_len); 714 } 715 } 716 717 static void 718 bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 719 { 720 assert(bdev_io->internal.buf != NULL); 721 _bdev_io_put_buf(bdev_io, bdev_io->internal.buf, bdev_io->internal.buf_len); 722 bdev_io->internal.buf = NULL; 723 } 724 725 void 726 spdk_bdev_io_put_aux_buf(struct spdk_bdev_io *bdev_io, void *buf) 727 { 728 uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 729 730 assert(buf != NULL); 731 _bdev_io_put_buf(bdev_io, buf, len); 732 } 733 734 static void 735 _bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io) 736 { 737 if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) { 738 assert(bdev_io->internal.orig_md_buf == NULL); 739 return; 740 } 741 742 /* if this is read path, copy data from bounce buffer to original buffer */ 743 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 744 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 745 _copy_buf_to_iovs(bdev_io->internal.orig_iovs, 746 bdev_io->internal.orig_iovcnt, 747 bdev_io->internal.bounce_iov.iov_base, 748 bdev_io->internal.bounce_iov.iov_len); 749 } 750 /* set original buffer for this io */ 751 bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt; 752 bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs; 753 /* disable bouncing buffer for this io */ 754 bdev_io->internal.orig_iovcnt = 0; 755 bdev_io->internal.orig_iovs = NULL; 756 757 /* do the same for metadata buffer */ 758 if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) { 759 assert(spdk_bdev_is_md_separate(bdev_io->bdev)); 760 761 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 762 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 763 memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf, 764 bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev)); 765 } 766 767 bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf; 768 bdev_io->internal.orig_md_buf = NULL; 769 } 770 771 /* We want to free the bounce buffer here since we know we're done with it (as opposed 772 * to waiting for the conditional free of internal.buf in spdk_bdev_free_io()). 
773 */ 774 bdev_io_put_buf(bdev_io); 775 } 776 777 static void 778 bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len) 779 { 780 struct spdk_bdev *bdev = bdev_io->bdev; 781 struct spdk_mempool *pool; 782 bdev_io_stailq_t *stailq; 783 struct spdk_bdev_mgmt_channel *mgmt_ch; 784 uint64_t alignment, md_len; 785 void *buf; 786 787 alignment = spdk_bdev_get_buf_align(bdev); 788 md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0; 789 790 if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 791 SPDK_BDEV_POOL_ALIGNMENT) { 792 SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n", 793 len + alignment); 794 bdev_io_get_buf_complete(bdev_io, NULL, false); 795 return; 796 } 797 798 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 799 800 bdev_io->internal.buf_len = len; 801 802 if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 803 SPDK_BDEV_POOL_ALIGNMENT) { 804 pool = g_bdev_mgr.buf_small_pool; 805 stailq = &mgmt_ch->need_buf_small; 806 } else { 807 pool = g_bdev_mgr.buf_large_pool; 808 stailq = &mgmt_ch->need_buf_large; 809 } 810 811 buf = spdk_mempool_get(pool); 812 if (!buf) { 813 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 814 } else { 815 _bdev_io_set_buf(bdev_io, buf, len); 816 } 817 } 818 819 void 820 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 821 { 822 struct spdk_bdev *bdev = bdev_io->bdev; 823 uint64_t alignment; 824 825 assert(cb != NULL); 826 bdev_io->internal.get_buf_cb = cb; 827 828 alignment = spdk_bdev_get_buf_align(bdev); 829 830 if (_is_buf_allocated(bdev_io->u.bdev.iovs) && 831 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 832 /* Buffer already present and aligned */ 833 cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true); 834 return; 835 } 836 837 bdev_io_get_buf(bdev_io, len); 838 } 839 840 void 841 spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb) 842 { 843 uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 844 845 assert(cb != NULL); 846 assert(bdev_io->internal.get_aux_buf_cb == NULL); 847 bdev_io->internal.get_aux_buf_cb = cb; 848 bdev_io_get_buf(bdev_io, len); 849 } 850 851 static int 852 bdev_module_get_max_ctx_size(void) 853 { 854 struct spdk_bdev_module *bdev_module; 855 int max_bdev_module_size = 0; 856 857 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 858 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 859 max_bdev_module_size = bdev_module->get_ctx_size(); 860 } 861 } 862 863 return max_bdev_module_size; 864 } 865 866 void 867 spdk_bdev_config_text(FILE *fp) 868 { 869 struct spdk_bdev_module *bdev_module; 870 871 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 872 if (bdev_module->config_text) { 873 bdev_module->config_text(fp); 874 } 875 } 876 } 877 878 static void 879 bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 880 { 881 int i; 882 struct spdk_bdev_qos *qos = bdev->internal.qos; 883 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 884 885 if (!qos) { 886 return; 887 } 888 889 spdk_bdev_get_qos_rate_limits(bdev, limits); 890 891 spdk_json_write_object_begin(w); 892 spdk_json_write_named_string(w, "method", "bdev_set_qos_limit"); 893 894 spdk_json_write_named_object_begin(w, "params"); 895 spdk_json_write_named_string(w, "name", bdev->name); 896 for (i 
= 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 897 if (limits[i] > 0) { 898 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 899 } 900 } 901 spdk_json_write_object_end(w); 902 903 spdk_json_write_object_end(w); 904 } 905 906 void 907 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 908 { 909 struct spdk_bdev_module *bdev_module; 910 struct spdk_bdev *bdev; 911 912 assert(w != NULL); 913 914 spdk_json_write_array_begin(w); 915 916 spdk_json_write_object_begin(w); 917 spdk_json_write_named_string(w, "method", "bdev_set_options"); 918 spdk_json_write_named_object_begin(w, "params"); 919 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 920 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 921 spdk_json_write_named_bool(w, "bdev_auto_examine", g_bdev_opts.bdev_auto_examine); 922 spdk_json_write_object_end(w); 923 spdk_json_write_object_end(w); 924 925 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 926 if (bdev_module->config_json) { 927 bdev_module->config_json(w); 928 } 929 } 930 931 pthread_mutex_lock(&g_bdev_mgr.mutex); 932 933 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 934 if (bdev->fn_table->write_config_json) { 935 bdev->fn_table->write_config_json(bdev, w); 936 } 937 938 bdev_qos_config_json(bdev, w); 939 } 940 941 pthread_mutex_unlock(&g_bdev_mgr.mutex); 942 943 spdk_json_write_array_end(w); 944 } 945 946 static int 947 bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 948 { 949 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 950 struct spdk_bdev_io *bdev_io; 951 uint32_t i; 952 953 STAILQ_INIT(&ch->need_buf_small); 954 STAILQ_INIT(&ch->need_buf_large); 955 956 STAILQ_INIT(&ch->per_thread_cache); 957 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 958 959 /* Pre-populate bdev_io cache to ensure this thread cannot be starved. */ 960 ch->per_thread_cache_count = 0; 961 for (i = 0; i < ch->bdev_io_cache_size; i++) { 962 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 963 assert(bdev_io != NULL); 964 ch->per_thread_cache_count++; 965 STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link); 966 } 967 968 TAILQ_INIT(&ch->shared_resources); 969 TAILQ_INIT(&ch->io_wait_queue); 970 971 return 0; 972 } 973 974 static void 975 bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf) 976 { 977 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 978 struct spdk_bdev_io *bdev_io; 979 980 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) { 981 SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n"); 982 } 983 984 if (!TAILQ_EMPTY(&ch->shared_resources)) { 985 SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n"); 986 } 987 988 while (!STAILQ_EMPTY(&ch->per_thread_cache)) { 989 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 990 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 991 ch->per_thread_cache_count--; 992 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 993 } 994 995 assert(ch->per_thread_cache_count == 0); 996 } 997 998 static void 999 bdev_init_complete(int rc) 1000 { 1001 spdk_bdev_init_cb cb_fn = g_init_cb_fn; 1002 void *cb_arg = g_init_cb_arg; 1003 struct spdk_bdev_module *m; 1004 1005 g_bdev_mgr.init_complete = true; 1006 g_init_cb_fn = NULL; 1007 g_init_cb_arg = NULL; 1008 1009 /* 1010 * For modules that need to know when subsystem init is complete, 1011 * inform them now. 
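	 *
	 * A hypothetical module opts into this callback when it registers itself;
	 * sketch only, the "my_bdev" names are not part of this file:
	 *
	 *	static struct spdk_bdev_module my_bdev_if = {
	 *		.name = "my_bdev",
	 *		.module_init = my_bdev_init,
	 *		.init_complete = my_bdev_init_complete,
	 *	};
	 *	SPDK_BDEV_MODULE_REGISTER(my_bdev, &my_bdev_if)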
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * if the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	bdev_init_complete(0);
}

static void
bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	bdev_init_complete(-1);
}

static int
bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to prevent other modules from completing modules_init,
			 * and send a message to defer application shutdown until resources are cleaned up. */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if
(spdk_bdev_set_opts(&bdev_opts)) { 1141 bdev_init_complete(-1); 1142 return; 1143 } 1144 1145 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 1146 } 1147 1148 g_init_cb_fn = cb_fn; 1149 g_init_cb_arg = cb_arg; 1150 1151 spdk_notify_type_register("bdev_register"); 1152 spdk_notify_type_register("bdev_unregister"); 1153 1154 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 1155 1156 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 1157 g_bdev_opts.bdev_io_pool_size, 1158 sizeof(struct spdk_bdev_io) + 1159 bdev_module_get_max_ctx_size(), 1160 0, 1161 SPDK_ENV_SOCKET_ID_ANY); 1162 1163 if (g_bdev_mgr.bdev_io_pool == NULL) { 1164 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 1165 bdev_init_complete(-1); 1166 return; 1167 } 1168 1169 /** 1170 * Ensure no more than half of the total buffers end up local caches, by 1171 * using spdk_thread_get_count() to determine how many local caches we need 1172 * to account for. 1173 */ 1174 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 1175 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 1176 1177 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 1178 BUF_SMALL_POOL_SIZE, 1179 SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 1180 SPDK_BDEV_POOL_ALIGNMENT, 1181 cache_size, 1182 SPDK_ENV_SOCKET_ID_ANY); 1183 if (!g_bdev_mgr.buf_small_pool) { 1184 SPDK_ERRLOG("create rbuf small pool failed\n"); 1185 bdev_init_complete(-1); 1186 return; 1187 } 1188 1189 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 1190 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 1191 1192 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 1193 BUF_LARGE_POOL_SIZE, 1194 SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 1195 SPDK_BDEV_POOL_ALIGNMENT, 1196 cache_size, 1197 SPDK_ENV_SOCKET_ID_ANY); 1198 if (!g_bdev_mgr.buf_large_pool) { 1199 SPDK_ERRLOG("create rbuf large pool failed\n"); 1200 bdev_init_complete(-1); 1201 return; 1202 } 1203 1204 g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 1205 NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 1206 if (!g_bdev_mgr.zero_buffer) { 1207 SPDK_ERRLOG("create bdev zero buffer failed\n"); 1208 bdev_init_complete(-1); 1209 return; 1210 } 1211 1212 #ifdef SPDK_CONFIG_VTUNE 1213 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 1214 #endif 1215 1216 spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create, 1217 bdev_mgmt_channel_destroy, 1218 sizeof(struct spdk_bdev_mgmt_channel), 1219 "bdev_mgr"); 1220 1221 rc = bdev_modules_init(); 1222 g_bdev_mgr.module_init_complete = true; 1223 if (rc != 0) { 1224 SPDK_ERRLOG("bdev modules init failed\n"); 1225 return; 1226 } 1227 1228 bdev_module_action_complete(); 1229 } 1230 1231 static void 1232 bdev_mgr_unregister_cb(void *io_device) 1233 { 1234 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 1235 1236 if (g_bdev_mgr.bdev_io_pool) { 1237 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 1238 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 1239 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 1240 g_bdev_opts.bdev_io_pool_size); 1241 } 1242 1243 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1244 } 1245 1246 if (g_bdev_mgr.buf_small_pool) { 1247 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 1248 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 1249 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 1250 
BUF_SMALL_POOL_SIZE); 1251 assert(false); 1252 } 1253 1254 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1255 } 1256 1257 if (g_bdev_mgr.buf_large_pool) { 1258 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 1259 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 1260 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 1261 BUF_LARGE_POOL_SIZE); 1262 assert(false); 1263 } 1264 1265 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1266 } 1267 1268 spdk_free(g_bdev_mgr.zero_buffer); 1269 1270 cb_fn(g_fini_cb_arg); 1271 g_fini_cb_fn = NULL; 1272 g_fini_cb_arg = NULL; 1273 g_bdev_mgr.init_complete = false; 1274 g_bdev_mgr.module_init_complete = false; 1275 pthread_mutex_destroy(&g_bdev_mgr.mutex); 1276 } 1277 1278 static void 1279 bdev_module_finish_iter(void *arg) 1280 { 1281 struct spdk_bdev_module *bdev_module; 1282 1283 /* FIXME: Handling initialization failures is broken now, 1284 * so we won't even try cleaning up after successfully 1285 * initialized modules. if module_init_complete is false, 1286 * just call spdk_bdev_mgr_unregister_cb 1287 */ 1288 if (!g_bdev_mgr.module_init_complete) { 1289 bdev_mgr_unregister_cb(NULL); 1290 return; 1291 } 1292 1293 /* Start iterating from the last touched module */ 1294 if (!g_resume_bdev_module) { 1295 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 1296 } else { 1297 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 1298 internal.tailq); 1299 } 1300 1301 while (bdev_module) { 1302 if (bdev_module->async_fini) { 1303 /* Save our place so we can resume later. We must 1304 * save the variable here, before calling module_fini() 1305 * below, because in some cases the module may immediately 1306 * call spdk_bdev_module_finish_done() and re-enter 1307 * this function to continue iterating. */ 1308 g_resume_bdev_module = bdev_module; 1309 } 1310 1311 if (bdev_module->module_fini) { 1312 bdev_module->module_fini(); 1313 } 1314 1315 if (bdev_module->async_fini) { 1316 return; 1317 } 1318 1319 bdev_module = TAILQ_PREV(bdev_module, bdev_module_list, 1320 internal.tailq); 1321 } 1322 1323 g_resume_bdev_module = NULL; 1324 spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb); 1325 } 1326 1327 void 1328 spdk_bdev_module_finish_done(void) 1329 { 1330 if (spdk_get_thread() != g_fini_thread) { 1331 spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL); 1332 } else { 1333 bdev_module_finish_iter(NULL); 1334 } 1335 } 1336 1337 static void 1338 bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno) 1339 { 1340 struct spdk_bdev *bdev = cb_arg; 1341 1342 if (bdeverrno && bdev) { 1343 SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n", 1344 bdev->name); 1345 1346 /* 1347 * Since the call to spdk_bdev_unregister() failed, we have no way to free this 1348 * bdev; try to continue by manually removing this bdev from the list and continue 1349 * with the next bdev in the list. 1350 */ 1351 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 1352 } 1353 1354 if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) { 1355 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n"); 1356 /* 1357 * Bdev module finish need to be deferred as we might be in the middle of some context 1358 * (like bdev part free) that will use this bdev (or private bdev driver ctx data) 1359 * after returning. 
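	 *
	 * For example, a "part" vbdev being freed during its base bdev's unregister
	 * path may still dereference this bdev after we return, so the module finish
	 * sequence is handed off via the spdk_thread_send_msg() call below instead of
	 * running inline here.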
1360 */ 1361 spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL); 1362 return; 1363 } 1364 1365 /* 1366 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem 1367 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity 1368 * to detect clean shutdown as opposed to run-time hot removal of the underlying 1369 * base bdevs. 1370 * 1371 * Also, walk the list in the reverse order. 1372 */ 1373 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1374 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1375 if (bdev->internal.claim_module != NULL) { 1376 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n", 1377 bdev->name, bdev->internal.claim_module->name); 1378 continue; 1379 } 1380 1381 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name); 1382 spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev); 1383 return; 1384 } 1385 1386 /* 1387 * If any bdev fails to unclaim underlying bdev properly, we may face the 1388 * case of bdev list consisting of claimed bdevs only (if claims are managed 1389 * correctly, this would mean there's a loop in the claims graph which is 1390 * clearly impossible). Warn and unregister last bdev on the list then. 1391 */ 1392 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1393 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1394 SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1395 spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev); 1396 return; 1397 } 1398 } 1399 1400 void 1401 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1402 { 1403 struct spdk_bdev_module *m; 1404 1405 assert(cb_fn != NULL); 1406 1407 g_fini_thread = spdk_get_thread(); 1408 1409 g_fini_cb_fn = cb_fn; 1410 g_fini_cb_arg = cb_arg; 1411 1412 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1413 if (m->fini_start) { 1414 m->fini_start(); 1415 } 1416 } 1417 1418 bdev_finish_unregister_bdevs_iter(NULL, 0); 1419 } 1420 1421 struct spdk_bdev_io * 1422 bdev_channel_get_io(struct spdk_bdev_channel *channel) 1423 { 1424 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1425 struct spdk_bdev_io *bdev_io; 1426 1427 if (ch->per_thread_cache_count > 0) { 1428 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1429 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1430 ch->per_thread_cache_count--; 1431 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1432 /* 1433 * Don't try to look for bdev_ios in the global pool if there are 1434 * waiters on bdev_ios - we don't want this caller to jump the line. 
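	 *
	 * Callers that come up empty here (or see -ENOMEM from a submit call) are
	 * expected to register for a retry. A minimal sketch of that pattern, assuming
	 * a caller-owned "ctx" that embeds an spdk_bdev_io_wait_entry and a
	 * "my_resubmit_fn" callback (neither is part of this file):
	 *
	 *	ctx->bdev_io_wait.bdev = bdev;
	 *	ctx->bdev_io_wait.cb_fn = my_resubmit_fn;
	 *	ctx->bdev_io_wait.cb_arg = ctx;
	 *	spdk_bdev_queue_io_wait(bdev, io_ch, &ctx->bdev_io_wait);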
1435 */ 1436 bdev_io = NULL; 1437 } else { 1438 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1439 } 1440 1441 return bdev_io; 1442 } 1443 1444 void 1445 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1446 { 1447 struct spdk_bdev_mgmt_channel *ch; 1448 1449 assert(bdev_io != NULL); 1450 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1451 1452 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1453 1454 if (bdev_io->internal.buf != NULL) { 1455 bdev_io_put_buf(bdev_io); 1456 } 1457 1458 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1459 ch->per_thread_cache_count++; 1460 STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link); 1461 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1462 struct spdk_bdev_io_wait_entry *entry; 1463 1464 entry = TAILQ_FIRST(&ch->io_wait_queue); 1465 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1466 entry->cb_fn(entry->cb_arg); 1467 } 1468 } else { 1469 /* We should never have a full cache with entries on the io wait queue. */ 1470 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1471 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1472 } 1473 } 1474 1475 static bool 1476 bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1477 { 1478 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1479 1480 switch (limit) { 1481 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1482 return true; 1483 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1484 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1485 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1486 return false; 1487 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1488 default: 1489 return false; 1490 } 1491 } 1492 1493 static bool 1494 bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1495 { 1496 switch (bdev_io->type) { 1497 case SPDK_BDEV_IO_TYPE_NVME_IO: 1498 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1499 case SPDK_BDEV_IO_TYPE_READ: 1500 case SPDK_BDEV_IO_TYPE_WRITE: 1501 return true; 1502 case SPDK_BDEV_IO_TYPE_ZCOPY: 1503 if (bdev_io->u.bdev.zcopy.start) { 1504 return true; 1505 } else { 1506 return false; 1507 } 1508 default: 1509 return false; 1510 } 1511 } 1512 1513 static bool 1514 bdev_is_read_io(struct spdk_bdev_io *bdev_io) 1515 { 1516 switch (bdev_io->type) { 1517 case SPDK_BDEV_IO_TYPE_NVME_IO: 1518 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1519 /* Bit 1 (0x2) set for read operation */ 1520 if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) { 1521 return true; 1522 } else { 1523 return false; 1524 } 1525 case SPDK_BDEV_IO_TYPE_READ: 1526 return true; 1527 case SPDK_BDEV_IO_TYPE_ZCOPY: 1528 /* Populate to read from disk */ 1529 if (bdev_io->u.bdev.zcopy.populate) { 1530 return true; 1531 } else { 1532 return false; 1533 } 1534 default: 1535 return false; 1536 } 1537 } 1538 1539 static uint64_t 1540 bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1541 { 1542 struct spdk_bdev *bdev = bdev_io->bdev; 1543 1544 switch (bdev_io->type) { 1545 case SPDK_BDEV_IO_TYPE_NVME_IO: 1546 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1547 return bdev_io->u.nvme_passthru.nbytes; 1548 case SPDK_BDEV_IO_TYPE_READ: 1549 case SPDK_BDEV_IO_TYPE_WRITE: 1550 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1551 case SPDK_BDEV_IO_TYPE_ZCOPY: 1552 /* Track the data in the start phase only */ 1553 if (bdev_io->u.bdev.zcopy.start) { 1554 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1555 } else { 1556 return 0; 1557 } 1558 default: 1559 return 0; 1560 } 1561 } 1562 1563 static bool 1564 bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 
1565 { 1566 if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) { 1567 return true; 1568 } else { 1569 return false; 1570 } 1571 } 1572 1573 static bool 1574 bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1575 { 1576 if (bdev_is_read_io(io) == false) { 1577 return false; 1578 } 1579 1580 return bdev_qos_rw_queue_io(limit, io); 1581 } 1582 1583 static bool 1584 bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1585 { 1586 if (bdev_is_read_io(io) == true) { 1587 return false; 1588 } 1589 1590 return bdev_qos_rw_queue_io(limit, io); 1591 } 1592 1593 static void 1594 bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1595 { 1596 limit->remaining_this_timeslice--; 1597 } 1598 1599 static void 1600 bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1601 { 1602 limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io); 1603 } 1604 1605 static void 1606 bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1607 { 1608 if (bdev_is_read_io(io) == false) { 1609 return; 1610 } 1611 1612 return bdev_qos_rw_bps_update_quota(limit, io); 1613 } 1614 1615 static void 1616 bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1617 { 1618 if (bdev_is_read_io(io) == true) { 1619 return; 1620 } 1621 1622 return bdev_qos_rw_bps_update_quota(limit, io); 1623 } 1624 1625 static void 1626 bdev_qos_set_ops(struct spdk_bdev_qos *qos) 1627 { 1628 int i; 1629 1630 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1631 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1632 qos->rate_limits[i].queue_io = NULL; 1633 qos->rate_limits[i].update_quota = NULL; 1634 continue; 1635 } 1636 1637 switch (i) { 1638 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1639 qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io; 1640 qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota; 1641 break; 1642 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1643 qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io; 1644 qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota; 1645 break; 1646 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1647 qos->rate_limits[i].queue_io = bdev_qos_r_queue_io; 1648 qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota; 1649 break; 1650 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1651 qos->rate_limits[i].queue_io = bdev_qos_w_queue_io; 1652 qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota; 1653 break; 1654 default: 1655 break; 1656 } 1657 } 1658 } 1659 1660 static void 1661 _bdev_io_complete_in_submit(struct spdk_bdev_channel *bdev_ch, 1662 struct spdk_bdev_io *bdev_io, 1663 enum spdk_bdev_io_status status) 1664 { 1665 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1666 1667 bdev_io->internal.in_submit_request = true; 1668 bdev_ch->io_outstanding++; 1669 shared_resource->io_outstanding++; 1670 spdk_bdev_io_complete(bdev_io, status); 1671 bdev_io->internal.in_submit_request = false; 1672 } 1673 1674 static inline void 1675 bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io) 1676 { 1677 struct spdk_bdev *bdev = bdev_io->bdev; 1678 struct spdk_io_channel *ch = bdev_ch->channel; 1679 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1680 1681 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT)) { 1682 struct spdk_bdev_mgmt_channel *mgmt_channel = 
shared_resource->mgmt_ch; 1683 struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort; 1684 1685 if (bdev_abort_queued_io(&shared_resource->nomem_io, bio_to_abort) || 1686 bdev_abort_buf_io(&mgmt_channel->need_buf_small, bio_to_abort) || 1687 bdev_abort_buf_io(&mgmt_channel->need_buf_large, bio_to_abort)) { 1688 _bdev_io_complete_in_submit(bdev_ch, bdev_io, 1689 SPDK_BDEV_IO_STATUS_SUCCESS); 1690 return; 1691 } 1692 } 1693 1694 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1695 bdev_ch->io_outstanding++; 1696 shared_resource->io_outstanding++; 1697 bdev_io->internal.in_submit_request = true; 1698 bdev->fn_table->submit_request(ch, bdev_io); 1699 bdev_io->internal.in_submit_request = false; 1700 } else { 1701 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1702 } 1703 } 1704 1705 static int 1706 bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1707 { 1708 struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL; 1709 int i, submitted_ios = 0; 1710 1711 TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) { 1712 if (bdev_qos_io_to_limit(bdev_io) == true) { 1713 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1714 if (!qos->rate_limits[i].queue_io) { 1715 continue; 1716 } 1717 1718 if (qos->rate_limits[i].queue_io(&qos->rate_limits[i], 1719 bdev_io) == true) { 1720 return submitted_ios; 1721 } 1722 } 1723 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1724 if (!qos->rate_limits[i].update_quota) { 1725 continue; 1726 } 1727 1728 qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io); 1729 } 1730 } 1731 1732 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1733 bdev_io_do_submit(ch, bdev_io); 1734 submitted_ios++; 1735 } 1736 1737 return submitted_ios; 1738 } 1739 1740 static void 1741 bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1742 { 1743 int rc; 1744 1745 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1746 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1747 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1748 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1749 &bdev_io->internal.waitq_entry); 1750 if (rc != 0) { 1751 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1752 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1753 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1754 } 1755 } 1756 1757 static bool 1758 bdev_io_type_can_split(uint8_t type) 1759 { 1760 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1761 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1762 1763 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1764 * UNMAP could be split, but these types of I/O are typically much larger 1765 * in size (sometimes the size of the entire block device), and the bdev 1766 * module can more efficiently split these types of I/O. Plus those types 1767 * of I/O do not have a payload, which makes the splitting process simpler. 
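	 *
	 * Worked example of the boundary check in bdev_io_should_split() below
	 * (illustrative numbers): with optimal_io_boundary = 8 blocks, a 4-block
	 * write at offset 6 spans start_stripe = 6 / 8 = 0 and
	 * end_stripe = (6 + 4 - 1) / 8 = 1, so it crosses a boundary and is split.
	 * The same 4-block write at offset 8 stays within stripe 1 and is not split.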
1768 */ 1769 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1770 return true; 1771 } else { 1772 return false; 1773 } 1774 } 1775 1776 static bool 1777 bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1778 { 1779 uint64_t start_stripe, end_stripe; 1780 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1781 1782 if (io_boundary == 0) { 1783 return false; 1784 } 1785 1786 if (!bdev_io_type_can_split(bdev_io->type)) { 1787 return false; 1788 } 1789 1790 start_stripe = bdev_io->u.bdev.offset_blocks; 1791 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1792 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1793 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1794 start_stripe >>= spdk_u32log2(io_boundary); 1795 end_stripe >>= spdk_u32log2(io_boundary); 1796 } else { 1797 start_stripe /= io_boundary; 1798 end_stripe /= io_boundary; 1799 } 1800 return (start_stripe != end_stripe); 1801 } 1802 1803 static uint32_t 1804 _to_next_boundary(uint64_t offset, uint32_t boundary) 1805 { 1806 return (boundary - (offset % boundary)); 1807 } 1808 1809 static void 1810 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1811 1812 static void 1813 _bdev_io_split(void *_bdev_io) 1814 { 1815 struct spdk_bdev_io *bdev_io = _bdev_io; 1816 uint64_t current_offset, remaining; 1817 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes; 1818 struct iovec *parent_iov, *iov; 1819 uint64_t parent_iov_offset, iov_len; 1820 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1821 void *md_buf = NULL; 1822 int rc; 1823 1824 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1825 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1826 blocklen = bdev_io->bdev->blocklen; 1827 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1828 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1829 1830 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1831 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1832 if (parent_iov_offset < parent_iov->iov_len) { 1833 break; 1834 } 1835 parent_iov_offset -= parent_iov->iov_len; 1836 } 1837 1838 child_iovcnt = 0; 1839 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1840 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1841 to_next_boundary = spdk_min(remaining, to_next_boundary); 1842 to_next_boundary_bytes = to_next_boundary * blocklen; 1843 iov = &bdev_io->child_iov[child_iovcnt]; 1844 iovcnt = 0; 1845 1846 if (bdev_io->u.bdev.md_buf) { 1847 assert((parent_iov_offset % blocklen) > 0); 1848 md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) * 1849 spdk_bdev_get_md_size(bdev_io->bdev); 1850 } 1851 1852 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1853 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1854 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1855 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1856 to_next_boundary_bytes -= iov_len; 1857 1858 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1859 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1860 1861 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1862 parent_iov_offset += iov_len; 1863 } else { 1864 parent_iovpos++; 1865 parent_iov_offset = 0; 1866 } 1867 child_iovcnt++; 1868 iovcnt++; 1869 } 1870 1871 if 
(to_next_boundary_bytes > 0) { 1872 /* We had to stop this child I/O early because we ran out of 1873 * child_iov space. Ensure the iovs to be aligned with block 1874 * size and then adjust to_next_boundary before starting the 1875 * child I/O. 1876 */ 1877 assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV); 1878 to_last_block_bytes = to_next_boundary_bytes % blocklen; 1879 if (to_last_block_bytes != 0) { 1880 uint32_t child_iovpos = child_iovcnt - 1; 1881 /* don't decrease child_iovcnt so the loop will naturally end */ 1882 1883 to_last_block_bytes = blocklen - to_last_block_bytes; 1884 to_next_boundary_bytes += to_last_block_bytes; 1885 while (to_last_block_bytes > 0 && iovcnt > 0) { 1886 iov_len = spdk_min(to_last_block_bytes, 1887 bdev_io->child_iov[child_iovpos].iov_len); 1888 bdev_io->child_iov[child_iovpos].iov_len -= iov_len; 1889 if (bdev_io->child_iov[child_iovpos].iov_len == 0) { 1890 child_iovpos--; 1891 if (--iovcnt == 0) { 1892 return; 1893 } 1894 } 1895 to_last_block_bytes -= iov_len; 1896 } 1897 1898 assert(to_last_block_bytes == 0); 1899 } 1900 to_next_boundary -= to_next_boundary_bytes / blocklen; 1901 } 1902 1903 bdev_io->u.bdev.split_outstanding++; 1904 1905 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1906 rc = bdev_readv_blocks_with_md(bdev_io->internal.desc, 1907 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1908 iov, iovcnt, md_buf, current_offset, 1909 to_next_boundary, 1910 bdev_io_split_done, bdev_io); 1911 } else { 1912 rc = bdev_writev_blocks_with_md(bdev_io->internal.desc, 1913 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1914 iov, iovcnt, md_buf, current_offset, 1915 to_next_boundary, 1916 bdev_io_split_done, bdev_io); 1917 } 1918 1919 if (rc == 0) { 1920 current_offset += to_next_boundary; 1921 remaining -= to_next_boundary; 1922 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1923 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1924 } else { 1925 bdev_io->u.bdev.split_outstanding--; 1926 if (rc == -ENOMEM) { 1927 if (bdev_io->u.bdev.split_outstanding == 0) { 1928 /* No I/O is outstanding. Hence we should wait here. */ 1929 bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split); 1930 } 1931 } else { 1932 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1933 if (bdev_io->u.bdev.split_outstanding == 0) { 1934 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 1935 (uintptr_t)bdev_io, 0); 1936 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 1937 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1938 } 1939 } 1940 1941 return; 1942 } 1943 } 1944 } 1945 1946 static void 1947 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1948 { 1949 struct spdk_bdev_io *parent_io = cb_arg; 1950 1951 spdk_bdev_free_io(bdev_io); 1952 1953 if (!success) { 1954 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1955 } 1956 parent_io->u.bdev.split_outstanding--; 1957 if (parent_io->u.bdev.split_outstanding != 0) { 1958 return; 1959 } 1960 1961 /* 1962 * Parent I/O finishes when all blocks are consumed. 
1963 */ 1964 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 1965 assert(parent_io->internal.cb != bdev_io_split_done); 1966 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 1967 (uintptr_t)parent_io, 0); 1968 TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link); 1969 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1970 parent_io->internal.caller_ctx); 1971 return; 1972 } 1973 1974 /* 1975 * Continue with the splitting process. This function will complete the parent I/O if the 1976 * splitting is done. 1977 */ 1978 _bdev_io_split(parent_io); 1979 } 1980 1981 static void 1982 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success); 1983 1984 static void 1985 bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1986 { 1987 assert(bdev_io_type_can_split(bdev_io->type)); 1988 1989 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1990 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1991 bdev_io->u.bdev.split_outstanding = 0; 1992 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1993 1994 if (_is_buf_allocated(bdev_io->u.bdev.iovs)) { 1995 _bdev_io_split(bdev_io); 1996 } else { 1997 assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ); 1998 spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb, 1999 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 2000 } 2001 } 2002 2003 static void 2004 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 2005 { 2006 if (!success) { 2007 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2008 return; 2009 } 2010 2011 bdev_io_split(ch, bdev_io); 2012 } 2013 2014 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't 2015 * be inlined, at least on some compilers. 
2016 */ 2017 static inline void 2018 _bdev_io_submit(void *ctx) 2019 { 2020 struct spdk_bdev_io *bdev_io = ctx; 2021 struct spdk_bdev *bdev = bdev_io->bdev; 2022 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2023 uint64_t tsc; 2024 2025 tsc = spdk_get_ticks(); 2026 bdev_io->internal.submit_tsc = tsc; 2027 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 2028 2029 if (spdk_likely(bdev_ch->flags == 0)) { 2030 bdev_io_do_submit(bdev_ch, bdev_io); 2031 return; 2032 } 2033 2034 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 2035 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2036 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 2037 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) && 2038 bdev_abort_queued_io(&bdev->internal.qos->queued, bdev_io->u.abort.bio_to_abort)) { 2039 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 2040 } else { 2041 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 2042 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 2043 } 2044 } else { 2045 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 2046 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2047 } 2048 } 2049 2050 bool 2051 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2); 2052 2053 bool 2054 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2) 2055 { 2056 if (range1->length == 0 || range2->length == 0) { 2057 return false; 2058 } 2059 2060 if (range1->offset + range1->length <= range2->offset) { 2061 return false; 2062 } 2063 2064 if (range2->offset + range2->length <= range1->offset) { 2065 return false; 2066 } 2067 2068 return true; 2069 } 2070 2071 static bool 2072 bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range) 2073 { 2074 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2075 struct lba_range r; 2076 2077 switch (bdev_io->type) { 2078 case SPDK_BDEV_IO_TYPE_NVME_IO: 2079 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 2080 /* Don't try to decode the NVMe command - just assume worst-case and that 2081 * it overlaps a locked range. 2082 */ 2083 return true; 2084 case SPDK_BDEV_IO_TYPE_WRITE: 2085 case SPDK_BDEV_IO_TYPE_UNMAP: 2086 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2087 case SPDK_BDEV_IO_TYPE_ZCOPY: 2088 r.offset = bdev_io->u.bdev.offset_blocks; 2089 r.length = bdev_io->u.bdev.num_blocks; 2090 if (!bdev_lba_range_overlapped(range, &r)) { 2091 /* This I/O doesn't overlap the specified LBA range. */ 2092 return false; 2093 } else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) { 2094 /* This I/O overlaps, but the I/O is on the same channel that locked this 2095 * range, and the caller_ctx is the same as the locked_ctx. This means 2096 * that this I/O is associated with the lock, and is allowed to execute. 
2097 */ 2098 return false; 2099 } else { 2100 return true; 2101 } 2102 default: 2103 return false; 2104 } 2105 } 2106 2107 void 2108 bdev_io_submit(struct spdk_bdev_io *bdev_io) 2109 { 2110 struct spdk_bdev *bdev = bdev_io->bdev; 2111 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 2112 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2113 2114 assert(thread != NULL); 2115 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2116 2117 if (!TAILQ_EMPTY(&ch->locked_ranges)) { 2118 struct lba_range *range; 2119 2120 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 2121 if (bdev_io_range_is_locked(bdev_io, range)) { 2122 TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link); 2123 return; 2124 } 2125 } 2126 } 2127 2128 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 2129 2130 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bdev_io)) { 2131 bdev_io->internal.submit_tsc = spdk_get_ticks(); 2132 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 2133 (uintptr_t)bdev_io, bdev_io->type); 2134 bdev_io_split(NULL, bdev_io); 2135 return; 2136 } 2137 2138 if (ch->flags & BDEV_CH_QOS_ENABLED) { 2139 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 2140 _bdev_io_submit(bdev_io); 2141 } else { 2142 bdev_io->internal.io_submit_ch = ch; 2143 bdev_io->internal.ch = bdev->internal.qos->ch; 2144 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 2145 } 2146 } else { 2147 _bdev_io_submit(bdev_io); 2148 } 2149 } 2150 2151 static void 2152 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 2153 { 2154 struct spdk_bdev *bdev = bdev_io->bdev; 2155 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2156 struct spdk_io_channel *ch = bdev_ch->channel; 2157 2158 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2159 2160 bdev_io->internal.in_submit_request = true; 2161 bdev->fn_table->submit_request(ch, bdev_io); 2162 bdev_io->internal.in_submit_request = false; 2163 } 2164 2165 void 2166 bdev_io_init(struct spdk_bdev_io *bdev_io, 2167 struct spdk_bdev *bdev, void *cb_arg, 2168 spdk_bdev_io_completion_cb cb) 2169 { 2170 bdev_io->bdev = bdev; 2171 bdev_io->internal.caller_ctx = cb_arg; 2172 bdev_io->internal.cb = cb; 2173 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2174 bdev_io->internal.in_submit_request = false; 2175 bdev_io->internal.buf = NULL; 2176 bdev_io->internal.io_submit_ch = NULL; 2177 bdev_io->internal.orig_iovs = NULL; 2178 bdev_io->internal.orig_iovcnt = 0; 2179 bdev_io->internal.orig_md_buf = NULL; 2180 bdev_io->internal.error.nvme.cdw0 = 0; 2181 bdev_io->num_retries = 0; 2182 bdev_io->internal.get_buf_cb = NULL; 2183 bdev_io->internal.get_aux_buf_cb = NULL; 2184 } 2185 2186 static bool 2187 bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2188 { 2189 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 2190 } 2191 2192 bool 2193 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2194 { 2195 bool supported; 2196 2197 supported = bdev_io_type_supported(bdev, io_type); 2198 2199 if (!supported) { 2200 switch (io_type) { 2201 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2202 /* The bdev layer will emulate write zeroes as long as write is supported. 
*/ 2203 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2204 break; 2205 case SPDK_BDEV_IO_TYPE_ZCOPY: 2206 /* Zero copy can be emulated with regular read and write */ 2207 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 2208 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2209 break; 2210 default: 2211 break; 2212 } 2213 } 2214 2215 return supported; 2216 } 2217 2218 int 2219 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 2220 { 2221 if (bdev->fn_table->dump_info_json) { 2222 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 2223 } 2224 2225 return 0; 2226 } 2227 2228 static void 2229 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2230 { 2231 uint32_t max_per_timeslice = 0; 2232 int i; 2233 2234 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2235 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2236 qos->rate_limits[i].max_per_timeslice = 0; 2237 continue; 2238 } 2239 2240 max_per_timeslice = qos->rate_limits[i].limit * 2241 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2242 2243 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2244 qos->rate_limits[i].min_per_timeslice); 2245 2246 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2247 } 2248 2249 bdev_qos_set_ops(qos); 2250 } 2251 2252 static int 2253 bdev_channel_poll_qos(void *arg) 2254 { 2255 struct spdk_bdev_qos *qos = arg; 2256 uint64_t now = spdk_get_ticks(); 2257 int i; 2258 2259 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2260 /* We received our callback earlier than expected - return 2261 * immediately and wait to do accounting until at least one 2262 * timeslice has actually expired. This should never happen 2263 * with a well-behaved timer implementation. 2264 */ 2265 return 0; 2266 } 2267 2268 /* Reset for next round of rate limiting */ 2269 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2270 /* We may have allowed the IOs or bytes to slightly overrun in the last 2271 * timeslice. remaining_this_timeslice is signed, so if it's negative 2272 * here, we'll account for the overrun so that the next timeslice will 2273 * be appropriately reduced. 
2274 */ 2275 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2276 qos->rate_limits[i].remaining_this_timeslice = 0; 2277 } 2278 } 2279 2280 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2281 qos->last_timeslice += qos->timeslice_size; 2282 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2283 qos->rate_limits[i].remaining_this_timeslice += 2284 qos->rate_limits[i].max_per_timeslice; 2285 } 2286 } 2287 2288 return bdev_qos_io_submit(qos->ch, qos); 2289 } 2290 2291 static void 2292 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2293 { 2294 struct spdk_bdev_shared_resource *shared_resource; 2295 struct lba_range *range; 2296 2297 while (!TAILQ_EMPTY(&ch->locked_ranges)) { 2298 range = TAILQ_FIRST(&ch->locked_ranges); 2299 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 2300 free(range); 2301 } 2302 2303 spdk_put_io_channel(ch->channel); 2304 2305 shared_resource = ch->shared_resource; 2306 2307 assert(TAILQ_EMPTY(&ch->io_locked)); 2308 assert(TAILQ_EMPTY(&ch->io_submitted)); 2309 assert(ch->io_outstanding == 0); 2310 assert(shared_resource->ref > 0); 2311 shared_resource->ref--; 2312 if (shared_resource->ref == 0) { 2313 assert(shared_resource->io_outstanding == 0); 2314 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2315 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2316 free(shared_resource); 2317 } 2318 } 2319 2320 /* Caller must hold bdev->internal.mutex. */ 2321 static void 2322 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2323 { 2324 struct spdk_bdev_qos *qos = bdev->internal.qos; 2325 int i; 2326 2327 /* Rate limiting on this bdev enabled */ 2328 if (qos) { 2329 if (qos->ch == NULL) { 2330 struct spdk_io_channel *io_ch; 2331 2332 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2333 bdev->name, spdk_get_thread()); 2334 2335 /* No qos channel has been selected, so set one up */ 2336 2337 /* Take another reference to ch */ 2338 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2339 assert(io_ch != NULL); 2340 qos->ch = ch; 2341 2342 qos->thread = spdk_io_channel_get_thread(io_ch); 2343 2344 TAILQ_INIT(&qos->queued); 2345 2346 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2347 if (bdev_qos_is_iops_rate_limit(i) == true) { 2348 qos->rate_limits[i].min_per_timeslice = 2349 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2350 } else { 2351 qos->rate_limits[i].min_per_timeslice = 2352 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2353 } 2354 2355 if (qos->rate_limits[i].limit == 0) { 2356 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2357 } 2358 } 2359 bdev_qos_update_max_quota_per_timeslice(qos); 2360 qos->timeslice_size = 2361 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2362 qos->last_timeslice = spdk_get_ticks(); 2363 qos->poller = SPDK_POLLER_REGISTER(bdev_channel_poll_qos, 2364 qos, 2365 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2366 } 2367 2368 ch->flags |= BDEV_CH_QOS_ENABLED; 2369 } 2370 } 2371 2372 struct poll_timeout_ctx { 2373 struct spdk_bdev_desc *desc; 2374 uint64_t timeout_in_sec; 2375 spdk_bdev_io_timeout_cb cb_fn; 2376 void *cb_arg; 2377 }; 2378 2379 static void 2380 bdev_desc_free(struct spdk_bdev_desc *desc) 2381 { 2382 pthread_mutex_destroy(&desc->mutex); 2383 free(desc->media_events_buffer); 2384 free(desc); 2385 } 2386 2387 static void 2388 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2389 { 2390 struct poll_timeout_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 2391 struct spdk_bdev_desc *desc = ctx->desc; 2392 2393 free(ctx); 2394 2395 pthread_mutex_lock(&desc->mutex); 2396 desc->refs--; 2397 if (desc->closed == true && desc->refs == 0) { 2398 pthread_mutex_unlock(&desc->mutex); 2399 bdev_desc_free(desc); 2400 return; 2401 } 2402 pthread_mutex_unlock(&desc->mutex); 2403 } 2404 2405 static void 2406 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2407 { 2408 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2409 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2410 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2411 struct spdk_bdev_desc *desc = ctx->desc; 2412 struct spdk_bdev_io *bdev_io; 2413 uint64_t now; 2414 2415 pthread_mutex_lock(&desc->mutex); 2416 if (desc->closed == true) { 2417 pthread_mutex_unlock(&desc->mutex); 2418 spdk_for_each_channel_continue(i, -1); 2419 return; 2420 } 2421 pthread_mutex_unlock(&desc->mutex); 2422 2423 now = spdk_get_ticks(); 2424 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2425 /* Exclude any I/O that are generated via splitting. */ 2426 if (bdev_io->internal.cb == bdev_io_split_done) { 2427 continue; 2428 } 2429 2430 /* Once we find an I/O that has not timed out, we can immediately 2431 * exit the loop. 2432 */ 2433 if (now < (bdev_io->internal.submit_tsc + 2434 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2435 goto end; 2436 } 2437 2438 if (bdev_io->internal.desc == desc) { 2439 ctx->cb_fn(ctx->cb_arg, bdev_io); 2440 } 2441 } 2442 2443 end: 2444 spdk_for_each_channel_continue(i, 0); 2445 } 2446 2447 static int 2448 bdev_poll_timeout_io(void *arg) 2449 { 2450 struct spdk_bdev_desc *desc = arg; 2451 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2452 struct poll_timeout_ctx *ctx; 2453 2454 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2455 if (!ctx) { 2456 SPDK_ERRLOG("failed to allocate memory\n"); 2457 return 1; 2458 } 2459 ctx->desc = desc; 2460 ctx->cb_arg = desc->cb_arg; 2461 ctx->cb_fn = desc->cb_fn; 2462 ctx->timeout_in_sec = desc->timeout_in_sec; 2463 2464 /* Take a ref on the descriptor in case it gets closed while we are checking 2465 * all of the channels. 
2466 */ 2467 pthread_mutex_lock(&desc->mutex); 2468 desc->refs++; 2469 pthread_mutex_unlock(&desc->mutex); 2470 2471 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2472 bdev_channel_poll_timeout_io, 2473 ctx, 2474 bdev_channel_poll_timeout_io_done); 2475 2476 return 1; 2477 } 2478 2479 int 2480 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2481 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2482 { 2483 assert(desc->thread == spdk_get_thread()); 2484 2485 spdk_poller_unregister(&desc->io_timeout_poller); 2486 2487 if (timeout_in_sec) { 2488 assert(cb_fn != NULL); 2489 desc->io_timeout_poller = SPDK_POLLER_REGISTER(bdev_poll_timeout_io, 2490 desc, 2491 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2492 1000); 2493 if (desc->io_timeout_poller == NULL) { 2494 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2495 return -1; 2496 } 2497 } 2498 2499 desc->cb_fn = cb_fn; 2500 desc->cb_arg = cb_arg; 2501 desc->timeout_in_sec = timeout_in_sec; 2502 2503 return 0; 2504 } 2505 2506 static int 2507 bdev_channel_create(void *io_device, void *ctx_buf) 2508 { 2509 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2510 struct spdk_bdev_channel *ch = ctx_buf; 2511 struct spdk_io_channel *mgmt_io_ch; 2512 struct spdk_bdev_mgmt_channel *mgmt_ch; 2513 struct spdk_bdev_shared_resource *shared_resource; 2514 struct lba_range *range; 2515 2516 ch->bdev = bdev; 2517 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2518 if (!ch->channel) { 2519 return -1; 2520 } 2521 2522 assert(ch->histogram == NULL); 2523 if (bdev->internal.histogram_enabled) { 2524 ch->histogram = spdk_histogram_data_alloc(); 2525 if (ch->histogram == NULL) { 2526 SPDK_ERRLOG("Could not allocate histogram\n"); 2527 } 2528 } 2529 2530 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2531 if (!mgmt_io_ch) { 2532 spdk_put_io_channel(ch->channel); 2533 return -1; 2534 } 2535 2536 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2537 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2538 if (shared_resource->shared_ch == ch->channel) { 2539 spdk_put_io_channel(mgmt_io_ch); 2540 shared_resource->ref++; 2541 break; 2542 } 2543 } 2544 2545 if (shared_resource == NULL) { 2546 shared_resource = calloc(1, sizeof(*shared_resource)); 2547 if (shared_resource == NULL) { 2548 spdk_put_io_channel(ch->channel); 2549 spdk_put_io_channel(mgmt_io_ch); 2550 return -1; 2551 } 2552 2553 shared_resource->mgmt_ch = mgmt_ch; 2554 shared_resource->io_outstanding = 0; 2555 TAILQ_INIT(&shared_resource->nomem_io); 2556 shared_resource->nomem_threshold = 0; 2557 shared_resource->shared_ch = ch->channel; 2558 shared_resource->ref = 1; 2559 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2560 } 2561 2562 memset(&ch->stat, 0, sizeof(ch->stat)); 2563 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2564 ch->io_outstanding = 0; 2565 TAILQ_INIT(&ch->queued_resets); 2566 TAILQ_INIT(&ch->locked_ranges); 2567 ch->flags = 0; 2568 ch->shared_resource = shared_resource; 2569 2570 TAILQ_INIT(&ch->io_submitted); 2571 TAILQ_INIT(&ch->io_locked); 2572 2573 #ifdef SPDK_CONFIG_VTUNE 2574 { 2575 char *name; 2576 __itt_init_ittlib(NULL, 0); 2577 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2578 if (!name) { 2579 bdev_channel_destroy_resource(ch); 2580 return -1; 2581 } 2582 ch->handle = __itt_string_handle_create(name); 2583 free(name); 2584 ch->start_tsc = spdk_get_ticks(); 2585 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2586 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 
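	/*
	 * Example (illustrative sketch; my_timeout_cb, my_bdev_event_cb and "Malloc0" are
	 * placeholder names): registering an I/O timeout callback with spdk_bdev_set_timeout(),
	 * defined above. The call must be made on the thread that owns the descriptor.
	 *
	 *   static void
	 *   my_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
	 *   {
	 *           SPDK_ERRLOG("bdev I/O %p exceeded the timeout\n", bdev_io);
	 *   }
	 *
	 *   static void
	 *   my_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
	 *                    void *event_ctx)
	 *   {
	 *   }
	 *
	 *   struct spdk_bdev_desc *desc;
	 *
	 *   if (spdk_bdev_open_ext("Malloc0", true, my_bdev_event_cb, NULL, &desc) == 0) {
	 *           // Report any I/O on this descriptor outstanding longer than 30 seconds.
	 *           spdk_bdev_set_timeout(desc, 30, my_timeout_cb, NULL);
	 *   }
	 */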
2587 } 2588 #endif 2589 2590 pthread_mutex_lock(&bdev->internal.mutex); 2591 bdev_enable_qos(bdev, ch); 2592 2593 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 2594 struct lba_range *new_range; 2595 2596 new_range = calloc(1, sizeof(*new_range)); 2597 if (new_range == NULL) { 2598 pthread_mutex_unlock(&bdev->internal.mutex); 2599 bdev_channel_destroy_resource(ch); 2600 return -1; 2601 } 2602 new_range->length = range->length; 2603 new_range->offset = range->offset; 2604 new_range->locked_ctx = range->locked_ctx; 2605 TAILQ_INSERT_TAIL(&ch->locked_ranges, new_range, tailq); 2606 } 2607 2608 pthread_mutex_unlock(&bdev->internal.mutex); 2609 2610 return 0; 2611 } 2612 2613 /* 2614 * Abort I/O that are waiting on a data buffer. These types of I/O are 2615 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2616 */ 2617 static void 2618 bdev_abort_all_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2619 { 2620 bdev_io_stailq_t tmp; 2621 struct spdk_bdev_io *bdev_io; 2622 2623 STAILQ_INIT(&tmp); 2624 2625 while (!STAILQ_EMPTY(queue)) { 2626 bdev_io = STAILQ_FIRST(queue); 2627 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2628 if (bdev_io->internal.ch == ch) { 2629 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2630 } else { 2631 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2632 } 2633 } 2634 2635 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2636 } 2637 2638 /* 2639 * Abort I/O that are queued waiting for submission. These types of I/O are 2640 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2641 */ 2642 static void 2643 bdev_abort_all_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2644 { 2645 struct spdk_bdev_io *bdev_io, *tmp; 2646 2647 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2648 if (bdev_io->internal.ch == ch) { 2649 TAILQ_REMOVE(queue, bdev_io, internal.link); 2650 /* 2651 * spdk_bdev_io_complete() assumes that the completed I/O had 2652 * been submitted to the bdev module. Since in this case it 2653 * hadn't, bump io_outstanding to account for the decrement 2654 * that spdk_bdev_io_complete() will do. 
2655 */ 2656 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2657 ch->io_outstanding++; 2658 ch->shared_resource->io_outstanding++; 2659 } 2660 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2661 } 2662 } 2663 } 2664 2665 static bool 2666 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2667 { 2668 struct spdk_bdev_io *bdev_io; 2669 2670 TAILQ_FOREACH(bdev_io, queue, internal.link) { 2671 if (bdev_io == bio_to_abort) { 2672 TAILQ_REMOVE(queue, bio_to_abort, internal.link); 2673 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2674 return true; 2675 } 2676 } 2677 2678 return false; 2679 } 2680 2681 static bool 2682 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2683 { 2684 struct spdk_bdev_io *bdev_io; 2685 2686 STAILQ_FOREACH(bdev_io, queue, internal.buf_link) { 2687 if (bdev_io == bio_to_abort) { 2688 STAILQ_REMOVE(queue, bio_to_abort, spdk_bdev_io, internal.buf_link); 2689 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2690 return true; 2691 } 2692 } 2693 2694 return false; 2695 } 2696 2697 static void 2698 bdev_qos_channel_destroy(void *cb_arg) 2699 { 2700 struct spdk_bdev_qos *qos = cb_arg; 2701 2702 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2703 spdk_poller_unregister(&qos->poller); 2704 2705 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2706 2707 free(qos); 2708 } 2709 2710 static int 2711 bdev_qos_destroy(struct spdk_bdev *bdev) 2712 { 2713 int i; 2714 2715 /* 2716 * Cleanly shutting down the QoS poller is tricky, because 2717 * during the asynchronous operation the user could open 2718 * a new descriptor and create a new channel, spawning 2719 * a new QoS poller. 2720 * 2721 * The strategy is to create a new QoS structure here and swap it 2722 * in. The shutdown path then continues to refer to the old one 2723 * until it completes and then releases it. 2724 */ 2725 struct spdk_bdev_qos *new_qos, *old_qos; 2726 2727 old_qos = bdev->internal.qos; 2728 2729 new_qos = calloc(1, sizeof(*new_qos)); 2730 if (!new_qos) { 2731 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2732 return -ENOMEM; 2733 } 2734 2735 /* Copy the old QoS data into the newly allocated structure */ 2736 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2737 2738 /* Zero out the key parts of the QoS structure */ 2739 new_qos->ch = NULL; 2740 new_qos->thread = NULL; 2741 new_qos->poller = NULL; 2742 TAILQ_INIT(&new_qos->queued); 2743 /* 2744 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2745 * It will be used later for the new QoS structure. 2746 */ 2747 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2748 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2749 new_qos->rate_limits[i].min_per_timeslice = 0; 2750 new_qos->rate_limits[i].max_per_timeslice = 0; 2751 } 2752 2753 bdev->internal.qos = new_qos; 2754 2755 if (old_qos->thread == NULL) { 2756 free(old_qos); 2757 } else { 2758 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 2759 } 2760 2761 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2762 * been destroyed yet. The destruction path will end up waiting for the final 2763 * channel to be put before it releases resources. 
 */

	return 0;
}

static void
bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
{
	total->bytes_read += add->bytes_read;
	total->num_read_ops += add->num_read_ops;
	total->bytes_written += add->bytes_written;
	total->num_write_ops += add->num_write_ops;
	total->bytes_unmapped += add->bytes_unmapped;
	total->num_unmap_ops += add->num_unmap_ops;
	total->read_latency_ticks += add->read_latency_ticks;
	total->write_latency_ticks += add->write_latency_ticks;
	total->unmap_latency_ticks += add->unmap_latency_ticks;
}

static void
bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel *ch = ctx_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
		      spdk_get_thread());

	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
	pthread_mutex_lock(&ch->bdev->internal.mutex);
	bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
	pthread_mutex_unlock(&ch->bdev->internal.mutex);

	mgmt_ch = shared_resource->mgmt_ch;

	bdev_abort_all_queued_io(&ch->queued_resets, ch);
	bdev_abort_all_queued_io(&shared_resource->nomem_io, ch);
	bdev_abort_all_buf_io(&mgmt_ch->need_buf_small, ch);
	bdev_abort_all_buf_io(&mgmt_ch->need_buf_large, ch);

	if (ch->histogram) {
		spdk_histogram_data_free(ch->histogram);
	}

	bdev_channel_destroy_resource(ch);
}

int
spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	if (alias == NULL) {
		SPDK_ERRLOG("Empty alias passed\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(alias)) {
		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
		return -EEXIST;
	}

	tmp = calloc(1, sizeof(*tmp));
	if (tmp == NULL) {
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	tmp->alias = strdup(alias);
	if (tmp->alias == NULL) {
		free(tmp);
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);

	return 0;
}

int
spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (strcmp(alias, tmp->alias) == 0) {
			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
			free(tmp->alias);
			free(tmp);
			return 0;
		}
	}

	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);

	return -ENOENT;
}

void
spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *p, *tmp;

	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
		TAILQ_REMOVE(&bdev->aliases, p, tailq);
		free(p->alias);
		free(p);
	}
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc)));
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev
*bdev) 2889 { 2890 return bdev->product_name; 2891 } 2892 2893 const struct spdk_bdev_aliases_list * 2894 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2895 { 2896 return &bdev->aliases; 2897 } 2898 2899 uint32_t 2900 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2901 { 2902 return bdev->blocklen; 2903 } 2904 2905 uint32_t 2906 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 2907 { 2908 return bdev->write_unit_size; 2909 } 2910 2911 uint64_t 2912 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2913 { 2914 return bdev->blockcnt; 2915 } 2916 2917 const char * 2918 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2919 { 2920 return qos_rpc_type[type]; 2921 } 2922 2923 void 2924 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2925 { 2926 int i; 2927 2928 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2929 2930 pthread_mutex_lock(&bdev->internal.mutex); 2931 if (bdev->internal.qos) { 2932 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2933 if (bdev->internal.qos->rate_limits[i].limit != 2934 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2935 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2936 if (bdev_qos_is_iops_rate_limit(i) == false) { 2937 /* Change from Byte to Megabyte which is user visible. */ 2938 limits[i] = limits[i] / 1024 / 1024; 2939 } 2940 } 2941 } 2942 } 2943 pthread_mutex_unlock(&bdev->internal.mutex); 2944 } 2945 2946 size_t 2947 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2948 { 2949 return 1 << bdev->required_alignment; 2950 } 2951 2952 uint32_t 2953 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2954 { 2955 return bdev->optimal_io_boundary; 2956 } 2957 2958 bool 2959 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2960 { 2961 return bdev->write_cache; 2962 } 2963 2964 const struct spdk_uuid * 2965 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2966 { 2967 return &bdev->uuid; 2968 } 2969 2970 uint16_t 2971 spdk_bdev_get_acwu(const struct spdk_bdev *bdev) 2972 { 2973 return bdev->acwu; 2974 } 2975 2976 uint32_t 2977 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 2978 { 2979 return bdev->md_len; 2980 } 2981 2982 bool 2983 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 2984 { 2985 return (bdev->md_len != 0) && bdev->md_interleave; 2986 } 2987 2988 bool 2989 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 2990 { 2991 return (bdev->md_len != 0) && !bdev->md_interleave; 2992 } 2993 2994 bool 2995 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 2996 { 2997 return bdev->zoned; 2998 } 2999 3000 uint32_t 3001 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 3002 { 3003 if (spdk_bdev_is_md_interleaved(bdev)) { 3004 return bdev->blocklen - bdev->md_len; 3005 } else { 3006 return bdev->blocklen; 3007 } 3008 } 3009 3010 static uint32_t 3011 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 3012 { 3013 if (!spdk_bdev_is_md_interleaved(bdev)) { 3014 return bdev->blocklen + bdev->md_len; 3015 } else { 3016 return bdev->blocklen; 3017 } 3018 } 3019 3020 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 3021 { 3022 if (bdev->md_len != 0) { 3023 return bdev->dif_type; 3024 } else { 3025 return SPDK_DIF_DISABLE; 3026 } 3027 } 3028 3029 bool 3030 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 3031 { 3032 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 3033 return bdev->dif_is_head_of_md; 3034 } else { 3035 return false; 3036 } 3037 } 3038 3039 bool 3040 
spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 3041 enum spdk_dif_check_type check_type) 3042 { 3043 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 3044 return false; 3045 } 3046 3047 switch (check_type) { 3048 case SPDK_DIF_CHECK_TYPE_REFTAG: 3049 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 3050 case SPDK_DIF_CHECK_TYPE_APPTAG: 3051 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 3052 case SPDK_DIF_CHECK_TYPE_GUARD: 3053 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 3054 default: 3055 return false; 3056 } 3057 } 3058 3059 uint64_t 3060 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 3061 { 3062 return bdev->internal.measured_queue_depth; 3063 } 3064 3065 uint64_t 3066 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 3067 { 3068 return bdev->internal.period; 3069 } 3070 3071 uint64_t 3072 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 3073 { 3074 return bdev->internal.weighted_io_time; 3075 } 3076 3077 uint64_t 3078 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 3079 { 3080 return bdev->internal.io_time; 3081 } 3082 3083 static void 3084 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 3085 { 3086 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3087 3088 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 3089 3090 if (bdev->internal.measured_queue_depth) { 3091 bdev->internal.io_time += bdev->internal.period; 3092 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 3093 } 3094 } 3095 3096 static void 3097 _calculate_measured_qd(struct spdk_io_channel_iter *i) 3098 { 3099 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3100 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 3101 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 3102 3103 bdev->internal.temporary_queue_depth += ch->io_outstanding; 3104 spdk_for_each_channel_continue(i, 0); 3105 } 3106 3107 static int 3108 bdev_calculate_measured_queue_depth(void *ctx) 3109 { 3110 struct spdk_bdev *bdev = ctx; 3111 bdev->internal.temporary_queue_depth = 0; 3112 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 3113 _calculate_measured_qd_cpl); 3114 return 0; 3115 } 3116 3117 void 3118 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 3119 { 3120 bdev->internal.period = period; 3121 3122 if (bdev->internal.qd_poller != NULL) { 3123 spdk_poller_unregister(&bdev->internal.qd_poller); 3124 bdev->internal.measured_queue_depth = UINT64_MAX; 3125 } 3126 3127 if (period != 0) { 3128 bdev->internal.qd_poller = SPDK_POLLER_REGISTER(bdev_calculate_measured_queue_depth, bdev, 3129 period); 3130 } 3131 } 3132 3133 static void 3134 _resize_notify(void *arg) 3135 { 3136 struct spdk_bdev_desc *desc = arg; 3137 3138 pthread_mutex_lock(&desc->mutex); 3139 desc->refs--; 3140 if (!desc->closed) { 3141 pthread_mutex_unlock(&desc->mutex); 3142 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 3143 desc->bdev, 3144 desc->callback.ctx); 3145 return; 3146 } else if (0 == desc->refs) { 3147 /* This descriptor was closed after this resize_notify message was sent. 3148 * spdk_bdev_close() could not free the descriptor since this message was 3149 * in flight, so we free it now using bdev_desc_free(). 
3150 */ 3151 pthread_mutex_unlock(&desc->mutex); 3152 bdev_desc_free(desc); 3153 return; 3154 } 3155 pthread_mutex_unlock(&desc->mutex); 3156 } 3157 3158 int 3159 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 3160 { 3161 struct spdk_bdev_desc *desc; 3162 int ret; 3163 3164 pthread_mutex_lock(&bdev->internal.mutex); 3165 3166 /* bdev has open descriptors */ 3167 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 3168 bdev->blockcnt > size) { 3169 ret = -EBUSY; 3170 } else { 3171 bdev->blockcnt = size; 3172 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 3173 pthread_mutex_lock(&desc->mutex); 3174 if (desc->callback.open_with_ext && !desc->closed) { 3175 desc->refs++; 3176 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 3177 } 3178 pthread_mutex_unlock(&desc->mutex); 3179 } 3180 ret = 0; 3181 } 3182 3183 pthread_mutex_unlock(&bdev->internal.mutex); 3184 3185 return ret; 3186 } 3187 3188 /* 3189 * Convert I/O offset and length from bytes to blocks. 3190 * 3191 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 3192 */ 3193 static uint64_t 3194 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 3195 uint64_t num_bytes, uint64_t *num_blocks) 3196 { 3197 uint32_t block_size = bdev->blocklen; 3198 uint8_t shift_cnt; 3199 3200 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 3201 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 3202 shift_cnt = spdk_u32log2(block_size); 3203 *offset_blocks = offset_bytes >> shift_cnt; 3204 *num_blocks = num_bytes >> shift_cnt; 3205 return (offset_bytes - (*offset_blocks << shift_cnt)) | 3206 (num_bytes - (*num_blocks << shift_cnt)); 3207 } else { 3208 *offset_blocks = offset_bytes / block_size; 3209 *num_blocks = num_bytes / block_size; 3210 return (offset_bytes % block_size) | (num_bytes % block_size); 3211 } 3212 } 3213 3214 static bool 3215 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 3216 { 3217 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 3218 * has been an overflow and hence the offset has been wrapped around */ 3219 if (offset_blocks + num_blocks < offset_blocks) { 3220 return false; 3221 } 3222 3223 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 3224 if (offset_blocks + num_blocks > bdev->blockcnt) { 3225 return false; 3226 } 3227 3228 return true; 3229 } 3230 3231 static bool 3232 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 3233 { 3234 return _is_buf_allocated(iovs) == (md_buf != NULL); 3235 } 3236 3237 static int 3238 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 3239 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3240 spdk_bdev_io_completion_cb cb, void *cb_arg) 3241 { 3242 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3243 struct spdk_bdev_io *bdev_io; 3244 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3245 3246 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3247 return -EINVAL; 3248 } 3249 3250 bdev_io = bdev_channel_get_io(channel); 3251 if (!bdev_io) { 3252 return -ENOMEM; 3253 } 3254 3255 bdev_io->internal.ch = channel; 3256 bdev_io->internal.desc = desc; 3257 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3258 bdev_io->u.bdev.iovs = &bdev_io->iov; 3259 bdev_io->u.bdev.iovs[0].iov_base = buf; 3260 bdev_io->u.bdev.iovs[0].iov_len = num_blocks 
* bdev->blocklen; 3261 bdev_io->u.bdev.iovcnt = 1; 3262 bdev_io->u.bdev.md_buf = md_buf; 3263 bdev_io->u.bdev.num_blocks = num_blocks; 3264 bdev_io->u.bdev.offset_blocks = offset_blocks; 3265 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3266 3267 bdev_io_submit(bdev_io); 3268 return 0; 3269 } 3270 3271 int 3272 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3273 void *buf, uint64_t offset, uint64_t nbytes, 3274 spdk_bdev_io_completion_cb cb, void *cb_arg) 3275 { 3276 uint64_t offset_blocks, num_blocks; 3277 3278 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3279 nbytes, &num_blocks) != 0) { 3280 return -EINVAL; 3281 } 3282 3283 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3284 } 3285 3286 int 3287 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3288 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3289 spdk_bdev_io_completion_cb cb, void *cb_arg) 3290 { 3291 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 3292 } 3293 3294 int 3295 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3296 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3297 spdk_bdev_io_completion_cb cb, void *cb_arg) 3298 { 3299 struct iovec iov = { 3300 .iov_base = buf, 3301 }; 3302 3303 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3304 return -EINVAL; 3305 } 3306 3307 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3308 return -EINVAL; 3309 } 3310 3311 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3312 cb, cb_arg); 3313 } 3314 3315 int 3316 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3317 struct iovec *iov, int iovcnt, 3318 uint64_t offset, uint64_t nbytes, 3319 spdk_bdev_io_completion_cb cb, void *cb_arg) 3320 { 3321 uint64_t offset_blocks, num_blocks; 3322 3323 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3324 nbytes, &num_blocks) != 0) { 3325 return -EINVAL; 3326 } 3327 3328 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3329 } 3330 3331 static int 3332 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3333 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3334 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3335 { 3336 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3337 struct spdk_bdev_io *bdev_io; 3338 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3339 3340 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3341 return -EINVAL; 3342 } 3343 3344 bdev_io = bdev_channel_get_io(channel); 3345 if (!bdev_io) { 3346 return -ENOMEM; 3347 } 3348 3349 bdev_io->internal.ch = channel; 3350 bdev_io->internal.desc = desc; 3351 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3352 bdev_io->u.bdev.iovs = iov; 3353 bdev_io->u.bdev.iovcnt = iovcnt; 3354 bdev_io->u.bdev.md_buf = md_buf; 3355 bdev_io->u.bdev.num_blocks = num_blocks; 3356 bdev_io->u.bdev.offset_blocks = offset_blocks; 3357 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3358 3359 bdev_io_submit(bdev_io); 3360 return 0; 3361 } 3362 3363 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3364 struct iovec *iov, int iovcnt, 3365 uint64_t offset_blocks, uint64_t num_blocks, 3366 spdk_bdev_io_completion_cb cb, void *cb_arg) 3367 { 3368 return 
bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3369 num_blocks, cb, cb_arg); 3370 } 3371 3372 int 3373 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3374 struct iovec *iov, int iovcnt, void *md_buf, 3375 uint64_t offset_blocks, uint64_t num_blocks, 3376 spdk_bdev_io_completion_cb cb, void *cb_arg) 3377 { 3378 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3379 return -EINVAL; 3380 } 3381 3382 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3383 return -EINVAL; 3384 } 3385 3386 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3387 num_blocks, cb, cb_arg); 3388 } 3389 3390 static int 3391 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3392 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3393 spdk_bdev_io_completion_cb cb, void *cb_arg) 3394 { 3395 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3396 struct spdk_bdev_io *bdev_io; 3397 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3398 3399 if (!desc->write) { 3400 return -EBADF; 3401 } 3402 3403 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3404 return -EINVAL; 3405 } 3406 3407 bdev_io = bdev_channel_get_io(channel); 3408 if (!bdev_io) { 3409 return -ENOMEM; 3410 } 3411 3412 bdev_io->internal.ch = channel; 3413 bdev_io->internal.desc = desc; 3414 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3415 bdev_io->u.bdev.iovs = &bdev_io->iov; 3416 bdev_io->u.bdev.iovs[0].iov_base = buf; 3417 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3418 bdev_io->u.bdev.iovcnt = 1; 3419 bdev_io->u.bdev.md_buf = md_buf; 3420 bdev_io->u.bdev.num_blocks = num_blocks; 3421 bdev_io->u.bdev.offset_blocks = offset_blocks; 3422 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3423 3424 bdev_io_submit(bdev_io); 3425 return 0; 3426 } 3427 3428 int 3429 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3430 void *buf, uint64_t offset, uint64_t nbytes, 3431 spdk_bdev_io_completion_cb cb, void *cb_arg) 3432 { 3433 uint64_t offset_blocks, num_blocks; 3434 3435 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3436 nbytes, &num_blocks) != 0) { 3437 return -EINVAL; 3438 } 3439 3440 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3441 } 3442 3443 int 3444 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3445 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3446 spdk_bdev_io_completion_cb cb, void *cb_arg) 3447 { 3448 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3449 cb, cb_arg); 3450 } 3451 3452 int 3453 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3454 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3455 spdk_bdev_io_completion_cb cb, void *cb_arg) 3456 { 3457 struct iovec iov = { 3458 .iov_base = buf, 3459 }; 3460 3461 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3462 return -EINVAL; 3463 } 3464 3465 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3466 return -EINVAL; 3467 } 3468 3469 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3470 cb, cb_arg); 3471 } 3472 3473 static int 3474 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3475 struct iovec *iov, int iovcnt, void *md_buf, 3476 uint64_t offset_blocks, uint64_t num_blocks, 3477 spdk_bdev_io_completion_cb cb, 
void *cb_arg) 3478 { 3479 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3480 struct spdk_bdev_io *bdev_io; 3481 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3482 3483 if (!desc->write) { 3484 return -EBADF; 3485 } 3486 3487 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3488 return -EINVAL; 3489 } 3490 3491 bdev_io = bdev_channel_get_io(channel); 3492 if (!bdev_io) { 3493 return -ENOMEM; 3494 } 3495 3496 bdev_io->internal.ch = channel; 3497 bdev_io->internal.desc = desc; 3498 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3499 bdev_io->u.bdev.iovs = iov; 3500 bdev_io->u.bdev.iovcnt = iovcnt; 3501 bdev_io->u.bdev.md_buf = md_buf; 3502 bdev_io->u.bdev.num_blocks = num_blocks; 3503 bdev_io->u.bdev.offset_blocks = offset_blocks; 3504 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3505 3506 bdev_io_submit(bdev_io); 3507 return 0; 3508 } 3509 3510 int 3511 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3512 struct iovec *iov, int iovcnt, 3513 uint64_t offset, uint64_t len, 3514 spdk_bdev_io_completion_cb cb, void *cb_arg) 3515 { 3516 uint64_t offset_blocks, num_blocks; 3517 3518 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3519 len, &num_blocks) != 0) { 3520 return -EINVAL; 3521 } 3522 3523 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3524 } 3525 3526 int 3527 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3528 struct iovec *iov, int iovcnt, 3529 uint64_t offset_blocks, uint64_t num_blocks, 3530 spdk_bdev_io_completion_cb cb, void *cb_arg) 3531 { 3532 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3533 num_blocks, cb, cb_arg); 3534 } 3535 3536 int 3537 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3538 struct iovec *iov, int iovcnt, void *md_buf, 3539 uint64_t offset_blocks, uint64_t num_blocks, 3540 spdk_bdev_io_completion_cb cb, void *cb_arg) 3541 { 3542 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3543 return -EINVAL; 3544 } 3545 3546 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3547 return -EINVAL; 3548 } 3549 3550 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3551 num_blocks, cb, cb_arg); 3552 } 3553 3554 static void 3555 bdev_compare_do_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3556 { 3557 struct spdk_bdev_io *parent_io = cb_arg; 3558 uint8_t *read_buf = bdev_io->u.bdev.iovs[0].iov_base; 3559 int i, rc = 0; 3560 3561 if (!success) { 3562 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3563 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3564 spdk_bdev_free_io(bdev_io); 3565 return; 3566 } 3567 3568 for (i = 0; i < parent_io->u.bdev.iovcnt; i++) { 3569 rc = memcmp(read_buf, 3570 parent_io->u.bdev.iovs[i].iov_base, 3571 parent_io->u.bdev.iovs[i].iov_len); 3572 if (rc) { 3573 break; 3574 } 3575 read_buf += parent_io->u.bdev.iovs[i].iov_len; 3576 } 3577 3578 spdk_bdev_free_io(bdev_io); 3579 3580 if (rc == 0) { 3581 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3582 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3583 } else { 3584 parent_io->internal.status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 3585 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3586 } 3587 } 3588 3589 static void 3590 bdev_compare_do_read(void *_bdev_io) 3591 { 3592 struct spdk_bdev_io *bdev_io = _bdev_io; 
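	/* COMPARE emulation: this path is reached when the bdev module does not support
	 * SPDK_BDEV_IO_TYPE_COMPARE natively. The LBA range is read back (the read is issued
	 * with a NULL buf, so a data buffer is obtained through the usual get-buf path) and
	 * bdev_compare_do_read_done() then memcmp()s it against the caller's iovecs,
	 * completing the parent I/O with SPDK_BDEV_IO_STATUS_MISCOMPARE on any difference.
	 */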
3593 int rc; 3594 3595 rc = spdk_bdev_read_blocks(bdev_io->internal.desc, 3596 spdk_io_channel_from_ctx(bdev_io->internal.ch), NULL, 3597 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3598 bdev_compare_do_read_done, bdev_io); 3599 3600 if (rc == -ENOMEM) { 3601 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_do_read); 3602 } else if (rc != 0) { 3603 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3604 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3605 } 3606 } 3607 3608 static int 3609 bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3610 struct iovec *iov, int iovcnt, void *md_buf, 3611 uint64_t offset_blocks, uint64_t num_blocks, 3612 spdk_bdev_io_completion_cb cb, void *cb_arg) 3613 { 3614 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3615 struct spdk_bdev_io *bdev_io; 3616 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3617 3618 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3619 return -EINVAL; 3620 } 3621 3622 bdev_io = bdev_channel_get_io(channel); 3623 if (!bdev_io) { 3624 return -ENOMEM; 3625 } 3626 3627 bdev_io->internal.ch = channel; 3628 bdev_io->internal.desc = desc; 3629 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3630 bdev_io->u.bdev.iovs = iov; 3631 bdev_io->u.bdev.iovcnt = iovcnt; 3632 bdev_io->u.bdev.md_buf = md_buf; 3633 bdev_io->u.bdev.num_blocks = num_blocks; 3634 bdev_io->u.bdev.offset_blocks = offset_blocks; 3635 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3636 3637 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3638 bdev_io_submit(bdev_io); 3639 return 0; 3640 } 3641 3642 bdev_compare_do_read(bdev_io); 3643 3644 return 0; 3645 } 3646 3647 int 3648 spdk_bdev_comparev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3649 struct iovec *iov, int iovcnt, 3650 uint64_t offset_blocks, uint64_t num_blocks, 3651 spdk_bdev_io_completion_cb cb, void *cb_arg) 3652 { 3653 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3654 num_blocks, cb, cb_arg); 3655 } 3656 3657 int 3658 spdk_bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3659 struct iovec *iov, int iovcnt, void *md_buf, 3660 uint64_t offset_blocks, uint64_t num_blocks, 3661 spdk_bdev_io_completion_cb cb, void *cb_arg) 3662 { 3663 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3664 return -EINVAL; 3665 } 3666 3667 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3668 return -EINVAL; 3669 } 3670 3671 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3672 num_blocks, cb, cb_arg); 3673 } 3674 3675 static int 3676 bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3677 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3678 spdk_bdev_io_completion_cb cb, void *cb_arg) 3679 { 3680 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3681 struct spdk_bdev_io *bdev_io; 3682 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3683 3684 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3685 return -EINVAL; 3686 } 3687 3688 bdev_io = bdev_channel_get_io(channel); 3689 if (!bdev_io) { 3690 return -ENOMEM; 3691 } 3692 3693 bdev_io->internal.ch = channel; 3694 bdev_io->internal.desc = desc; 3695 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3696 bdev_io->u.bdev.iovs = &bdev_io->iov; 3697 bdev_io->u.bdev.iovs[0].iov_base = buf; 3698 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3699 
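	/* A single contiguous compare buffer is described with the bdev_io's embedded
	 * iovec, mirroring the single-buffer read and write paths above.
	 */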
bdev_io->u.bdev.iovcnt = 1; 3700 bdev_io->u.bdev.md_buf = md_buf; 3701 bdev_io->u.bdev.num_blocks = num_blocks; 3702 bdev_io->u.bdev.offset_blocks = offset_blocks; 3703 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3704 3705 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3706 bdev_io_submit(bdev_io); 3707 return 0; 3708 } 3709 3710 bdev_compare_do_read(bdev_io); 3711 3712 return 0; 3713 } 3714 3715 int 3716 spdk_bdev_compare_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3717 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3718 spdk_bdev_io_completion_cb cb, void *cb_arg) 3719 { 3720 return bdev_compare_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3721 cb, cb_arg); 3722 } 3723 3724 int 3725 spdk_bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3726 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3727 spdk_bdev_io_completion_cb cb, void *cb_arg) 3728 { 3729 struct iovec iov = { 3730 .iov_base = buf, 3731 }; 3732 3733 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3734 return -EINVAL; 3735 } 3736 3737 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3738 return -EINVAL; 3739 } 3740 3741 return bdev_compare_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3742 cb, cb_arg); 3743 } 3744 3745 static void 3746 bdev_comparev_and_writev_blocks_unlocked(void *ctx, int unlock_status) 3747 { 3748 struct spdk_bdev_io *bdev_io = ctx; 3749 3750 if (unlock_status) { 3751 SPDK_ERRLOG("LBA range unlock failed\n"); 3752 } 3753 3754 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS ? true : 3755 false, bdev_io->internal.caller_ctx); 3756 } 3757 3758 static void 3759 bdev_comparev_and_writev_blocks_unlock(struct spdk_bdev_io *bdev_io, int status) 3760 { 3761 bdev_io->internal.status = status; 3762 3763 bdev_unlock_lba_range(bdev_io->internal.desc, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3764 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3765 bdev_comparev_and_writev_blocks_unlocked, bdev_io); 3766 } 3767 3768 static void 3769 bdev_compare_and_write_do_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3770 { 3771 struct spdk_bdev_io *parent_io = cb_arg; 3772 3773 if (!success) { 3774 SPDK_ERRLOG("Compare and write operation failed\n"); 3775 } 3776 3777 spdk_bdev_free_io(bdev_io); 3778 3779 bdev_comparev_and_writev_blocks_unlock(parent_io, 3780 success ? 
SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED); 3781 } 3782 3783 static void 3784 bdev_compare_and_write_do_write(void *_bdev_io) 3785 { 3786 struct spdk_bdev_io *bdev_io = _bdev_io; 3787 int rc; 3788 3789 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 3790 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3791 bdev_io->u.bdev.fused_iovs, bdev_io->u.bdev.fused_iovcnt, 3792 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3793 bdev_compare_and_write_do_write_done, bdev_io); 3794 3795 3796 if (rc == -ENOMEM) { 3797 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_write); 3798 } else if (rc != 0) { 3799 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 3800 } 3801 } 3802 3803 static void 3804 bdev_compare_and_write_do_compare_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3805 { 3806 struct spdk_bdev_io *parent_io = cb_arg; 3807 3808 spdk_bdev_free_io(bdev_io); 3809 3810 if (!success) { 3811 bdev_comparev_and_writev_blocks_unlock(parent_io, SPDK_BDEV_IO_STATUS_MISCOMPARE); 3812 return; 3813 } 3814 3815 bdev_compare_and_write_do_write(parent_io); 3816 } 3817 3818 static void 3819 bdev_compare_and_write_do_compare(void *_bdev_io) 3820 { 3821 struct spdk_bdev_io *bdev_io = _bdev_io; 3822 int rc; 3823 3824 rc = spdk_bdev_comparev_blocks(bdev_io->internal.desc, 3825 spdk_io_channel_from_ctx(bdev_io->internal.ch), bdev_io->u.bdev.iovs, 3826 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3827 bdev_compare_and_write_do_compare_done, bdev_io); 3828 3829 if (rc == -ENOMEM) { 3830 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_compare); 3831 } else if (rc != 0) { 3832 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED); 3833 } 3834 } 3835 3836 static void 3837 bdev_comparev_and_writev_blocks_locked(void *ctx, int status) 3838 { 3839 struct spdk_bdev_io *bdev_io = ctx; 3840 3841 if (status) { 3842 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED; 3843 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); return; 3844 } 3845 3846 bdev_compare_and_write_do_compare(bdev_io); 3847 } 3848 3849 int 3850 spdk_bdev_comparev_and_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3851 struct iovec *compare_iov, int compare_iovcnt, 3852 struct iovec *write_iov, int write_iovcnt, 3853 uint64_t offset_blocks, uint64_t num_blocks, 3854 spdk_bdev_io_completion_cb cb, void *cb_arg) 3855 { 3856 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3857 struct spdk_bdev_io *bdev_io; 3858 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3859 3860 if (!desc->write) { 3861 return -EBADF; 3862 } 3863 3864 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3865 return -EINVAL; 3866 } 3867 3868 if (num_blocks > bdev->acwu) { 3869 return -EINVAL; 3870 } 3871 3872 bdev_io = bdev_channel_get_io(channel); 3873 if (!bdev_io) { 3874 return -ENOMEM; 3875 } 3876 3877 bdev_io->internal.ch = channel; 3878 bdev_io->internal.desc = desc; 3879 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE; 3880 bdev_io->u.bdev.iovs = compare_iov; 3881 bdev_io->u.bdev.iovcnt = compare_iovcnt; 3882 bdev_io->u.bdev.fused_iovs = write_iov; 3883 bdev_io->u.bdev.fused_iovcnt = write_iovcnt; 3884 bdev_io->u.bdev.md_buf = NULL; 3885 bdev_io->u.bdev.num_blocks = num_blocks; 3886 bdev_io->u.bdev.offset_blocks = offset_blocks; 3887 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3888 3889 if (bdev_io_type_supported(bdev,
SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE)) { 3890 bdev_io_submit(bdev_io); 3891 return 0; 3892 } 3893 3894 return bdev_lock_lba_range(desc, ch, offset_blocks, num_blocks, 3895 bdev_comparev_and_writev_blocks_locked, bdev_io); 3896 } 3897 3898 static void 3899 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 3900 { 3901 if (!success) { 3902 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3903 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 3904 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3905 return; 3906 } 3907 3908 if (bdev_io->u.bdev.zcopy.populate) { 3909 /* Read the real data into the buffer */ 3910 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3911 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3912 bdev_io_submit(bdev_io); 3913 return; 3914 } 3915 3916 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3917 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3918 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3919 } 3920 3921 int 3922 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3923 uint64_t offset_blocks, uint64_t num_blocks, 3924 bool populate, 3925 spdk_bdev_io_completion_cb cb, void *cb_arg) 3926 { 3927 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3928 struct spdk_bdev_io *bdev_io; 3929 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3930 3931 if (!desc->write) { 3932 return -EBADF; 3933 } 3934 3935 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3936 return -EINVAL; 3937 } 3938 3939 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3940 return -ENOTSUP; 3941 } 3942 3943 bdev_io = bdev_channel_get_io(channel); 3944 if (!bdev_io) { 3945 return -ENOMEM; 3946 } 3947 3948 bdev_io->internal.ch = channel; 3949 bdev_io->internal.desc = desc; 3950 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3951 bdev_io->u.bdev.num_blocks = num_blocks; 3952 bdev_io->u.bdev.offset_blocks = offset_blocks; 3953 bdev_io->u.bdev.iovs = NULL; 3954 bdev_io->u.bdev.iovcnt = 0; 3955 bdev_io->u.bdev.md_buf = NULL; 3956 bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0; 3957 bdev_io->u.bdev.zcopy.commit = 0; 3958 bdev_io->u.bdev.zcopy.start = 1; 3959 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3960 3961 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3962 bdev_io_submit(bdev_io); 3963 } else { 3964 /* Emulate zcopy by allocating a buffer */ 3965 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 3966 bdev_io->u.bdev.num_blocks * bdev->blocklen); 3967 } 3968 3969 return 0; 3970 } 3971 3972 int 3973 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 3974 spdk_bdev_io_completion_cb cb, void *cb_arg) 3975 { 3976 struct spdk_bdev *bdev = bdev_io->bdev; 3977 3978 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 3979 /* This can happen if the zcopy was emulated in start */ 3980 if (bdev_io->u.bdev.zcopy.start != 1) { 3981 return -EINVAL; 3982 } 3983 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3984 } 3985 3986 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 3987 return -EINVAL; 3988 } 3989 3990 bdev_io->u.bdev.zcopy.commit = commit ? 
1 : 0; 3991 bdev_io->u.bdev.zcopy.start = 0; 3992 bdev_io->internal.caller_ctx = cb_arg; 3993 bdev_io->internal.cb = cb; 3994 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3995 3996 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3997 bdev_io_submit(bdev_io); 3998 return 0; 3999 } 4000 4001 if (!bdev_io->u.bdev.zcopy.commit) { 4002 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4003 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4004 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 4005 return 0; 4006 } 4007 4008 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 4009 bdev_io_submit(bdev_io); 4010 4011 return 0; 4012 } 4013 4014 int 4015 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4016 uint64_t offset, uint64_t len, 4017 spdk_bdev_io_completion_cb cb, void *cb_arg) 4018 { 4019 uint64_t offset_blocks, num_blocks; 4020 4021 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4022 len, &num_blocks) != 0) { 4023 return -EINVAL; 4024 } 4025 4026 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4027 } 4028 4029 int 4030 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4031 uint64_t offset_blocks, uint64_t num_blocks, 4032 spdk_bdev_io_completion_cb cb, void *cb_arg) 4033 { 4034 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4035 struct spdk_bdev_io *bdev_io; 4036 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4037 4038 if (!desc->write) { 4039 return -EBADF; 4040 } 4041 4042 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4043 return -EINVAL; 4044 } 4045 4046 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 4047 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 4048 return -ENOTSUP; 4049 } 4050 4051 bdev_io = bdev_channel_get_io(channel); 4052 4053 if (!bdev_io) { 4054 return -ENOMEM; 4055 } 4056 4057 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 4058 bdev_io->internal.ch = channel; 4059 bdev_io->internal.desc = desc; 4060 bdev_io->u.bdev.offset_blocks = offset_blocks; 4061 bdev_io->u.bdev.num_blocks = num_blocks; 4062 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4063 4064 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 4065 bdev_io_submit(bdev_io); 4066 return 0; 4067 } 4068 4069 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 4070 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 4071 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 4072 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 4073 bdev_write_zero_buffer_next(bdev_io); 4074 4075 return 0; 4076 } 4077 4078 int 4079 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4080 uint64_t offset, uint64_t nbytes, 4081 spdk_bdev_io_completion_cb cb, void *cb_arg) 4082 { 4083 uint64_t offset_blocks, num_blocks; 4084 4085 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4086 nbytes, &num_blocks) != 0) { 4087 return -EINVAL; 4088 } 4089 4090 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4091 } 4092 4093 int 4094 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4095 uint64_t offset_blocks, uint64_t num_blocks, 4096 spdk_bdev_io_completion_cb cb, void *cb_arg) 4097 { 4098 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4099 struct spdk_bdev_io *bdev_io; 4100 struct 
spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4101 4102 if (!desc->write) { 4103 return -EBADF; 4104 } 4105 4106 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4107 return -EINVAL; 4108 } 4109 4110 if (num_blocks == 0) { 4111 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 4112 return -EINVAL; 4113 } 4114 4115 bdev_io = bdev_channel_get_io(channel); 4116 if (!bdev_io) { 4117 return -ENOMEM; 4118 } 4119 4120 bdev_io->internal.ch = channel; 4121 bdev_io->internal.desc = desc; 4122 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 4123 4124 bdev_io->u.bdev.iovs = &bdev_io->iov; 4125 bdev_io->u.bdev.iovs[0].iov_base = NULL; 4126 bdev_io->u.bdev.iovs[0].iov_len = 0; 4127 bdev_io->u.bdev.iovcnt = 1; 4128 4129 bdev_io->u.bdev.offset_blocks = offset_blocks; 4130 bdev_io->u.bdev.num_blocks = num_blocks; 4131 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4132 4133 bdev_io_submit(bdev_io); 4134 return 0; 4135 } 4136 4137 int 4138 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4139 uint64_t offset, uint64_t length, 4140 spdk_bdev_io_completion_cb cb, void *cb_arg) 4141 { 4142 uint64_t offset_blocks, num_blocks; 4143 4144 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4145 length, &num_blocks) != 0) { 4146 return -EINVAL; 4147 } 4148 4149 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4150 } 4151 4152 int 4153 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4154 uint64_t offset_blocks, uint64_t num_blocks, 4155 spdk_bdev_io_completion_cb cb, void *cb_arg) 4156 { 4157 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4158 struct spdk_bdev_io *bdev_io; 4159 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4160 4161 if (!desc->write) { 4162 return -EBADF; 4163 } 4164 4165 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4166 return -EINVAL; 4167 } 4168 4169 bdev_io = bdev_channel_get_io(channel); 4170 if (!bdev_io) { 4171 return -ENOMEM; 4172 } 4173 4174 bdev_io->internal.ch = channel; 4175 bdev_io->internal.desc = desc; 4176 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 4177 bdev_io->u.bdev.iovs = NULL; 4178 bdev_io->u.bdev.iovcnt = 0; 4179 bdev_io->u.bdev.offset_blocks = offset_blocks; 4180 bdev_io->u.bdev.num_blocks = num_blocks; 4181 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4182 4183 bdev_io_submit(bdev_io); 4184 return 0; 4185 } 4186 4187 static void 4188 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 4189 { 4190 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 4191 struct spdk_bdev_io *bdev_io; 4192 4193 bdev_io = TAILQ_FIRST(&ch->queued_resets); 4194 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 4195 bdev_io_submit_reset(bdev_io); 4196 } 4197 4198 static void 4199 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 4200 { 4201 struct spdk_io_channel *ch; 4202 struct spdk_bdev_channel *channel; 4203 struct spdk_bdev_mgmt_channel *mgmt_channel; 4204 struct spdk_bdev_shared_resource *shared_resource; 4205 bdev_io_tailq_t tmp_queued; 4206 4207 TAILQ_INIT(&tmp_queued); 4208 4209 ch = spdk_io_channel_iter_get_channel(i); 4210 channel = spdk_io_channel_get_ctx(ch); 4211 shared_resource = channel->shared_resource; 4212 mgmt_channel = shared_resource->mgmt_ch; 4213 4214 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 4215 4216 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 4217 /* The QoS object is always valid and readable while 4218 * the channel flag is set, so the lock here should not 4219 * be 
necessary. We're not in the fast path though, so 4220 * just take it anyway. */ 4221 pthread_mutex_lock(&channel->bdev->internal.mutex); 4222 if (channel->bdev->internal.qos->ch == channel) { 4223 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 4224 } 4225 pthread_mutex_unlock(&channel->bdev->internal.mutex); 4226 } 4227 4228 bdev_abort_all_queued_io(&shared_resource->nomem_io, channel); 4229 bdev_abort_all_buf_io(&mgmt_channel->need_buf_small, channel); 4230 bdev_abort_all_buf_io(&mgmt_channel->need_buf_large, channel); 4231 bdev_abort_all_queued_io(&tmp_queued, channel); 4232 4233 spdk_for_each_channel_continue(i, 0); 4234 } 4235 4236 static void 4237 bdev_start_reset(void *ctx) 4238 { 4239 struct spdk_bdev_channel *ch = ctx; 4240 4241 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 4242 ch, bdev_reset_dev); 4243 } 4244 4245 static void 4246 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 4247 { 4248 struct spdk_bdev *bdev = ch->bdev; 4249 4250 assert(!TAILQ_EMPTY(&ch->queued_resets)); 4251 4252 pthread_mutex_lock(&bdev->internal.mutex); 4253 if (bdev->internal.reset_in_progress == NULL) { 4254 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 4255 /* 4256 * Take a channel reference for the target bdev for the life of this 4257 * reset. This guards against the channel getting destroyed while 4258 * spdk_for_each_channel() calls related to this reset IO are in 4259 * progress. We will release the reference when this reset is 4260 * completed. 4261 */ 4262 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 4263 bdev_start_reset(ch); 4264 } 4265 pthread_mutex_unlock(&bdev->internal.mutex); 4266 } 4267 4268 int 4269 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4270 spdk_bdev_io_completion_cb cb, void *cb_arg) 4271 { 4272 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4273 struct spdk_bdev_io *bdev_io; 4274 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4275 4276 bdev_io = bdev_channel_get_io(channel); 4277 if (!bdev_io) { 4278 return -ENOMEM; 4279 } 4280 4281 bdev_io->internal.ch = channel; 4282 bdev_io->internal.desc = desc; 4283 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4284 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 4285 bdev_io->u.reset.ch_ref = NULL; 4286 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4287 4288 pthread_mutex_lock(&bdev->internal.mutex); 4289 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 4290 pthread_mutex_unlock(&bdev->internal.mutex); 4291 4292 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 4293 internal.ch_link); 4294 4295 bdev_channel_start_reset(channel); 4296 4297 return 0; 4298 } 4299 4300 void 4301 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4302 struct spdk_bdev_io_stat *stat) 4303 { 4304 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4305 4306 *stat = channel->stat; 4307 } 4308 4309 static void 4310 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 4311 { 4312 void *io_device = spdk_io_channel_iter_get_io_device(i); 4313 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4314 4315 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 4316 bdev_iostat_ctx->cb_arg, 0); 4317 free(bdev_iostat_ctx); 4318 } 4319 4320 static void 4321 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 4322 { 4323 struct 
spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4324 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4325 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4326 4327 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 4328 spdk_for_each_channel_continue(i, 0); 4329 } 4330 4331 void 4332 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 4333 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 4334 { 4335 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 4336 4337 assert(bdev != NULL); 4338 assert(stat != NULL); 4339 assert(cb != NULL); 4340 4341 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 4342 if (bdev_iostat_ctx == NULL) { 4343 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 4344 cb(bdev, stat, cb_arg, -ENOMEM); 4345 return; 4346 } 4347 4348 bdev_iostat_ctx->stat = stat; 4349 bdev_iostat_ctx->cb = cb; 4350 bdev_iostat_ctx->cb_arg = cb_arg; 4351 4352 /* Start with the statistics from previously deleted channels. */ 4353 pthread_mutex_lock(&bdev->internal.mutex); 4354 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 4355 pthread_mutex_unlock(&bdev->internal.mutex); 4356 4357 /* Then iterate and add the statistics from each existing channel. */ 4358 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4359 bdev_get_each_channel_stat, 4360 bdev_iostat_ctx, 4361 bdev_get_device_stat_done); 4362 } 4363 4364 int 4365 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4366 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4367 spdk_bdev_io_completion_cb cb, void *cb_arg) 4368 { 4369 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4370 struct spdk_bdev_io *bdev_io; 4371 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4372 4373 if (!desc->write) { 4374 return -EBADF; 4375 } 4376 4377 bdev_io = bdev_channel_get_io(channel); 4378 if (!bdev_io) { 4379 return -ENOMEM; 4380 } 4381 4382 bdev_io->internal.ch = channel; 4383 bdev_io->internal.desc = desc; 4384 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 4385 bdev_io->u.nvme_passthru.cmd = *cmd; 4386 bdev_io->u.nvme_passthru.buf = buf; 4387 bdev_io->u.nvme_passthru.nbytes = nbytes; 4388 bdev_io->u.nvme_passthru.md_buf = NULL; 4389 bdev_io->u.nvme_passthru.md_len = 0; 4390 4391 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4392 4393 bdev_io_submit(bdev_io); 4394 return 0; 4395 } 4396 4397 int 4398 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4399 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4400 spdk_bdev_io_completion_cb cb, void *cb_arg) 4401 { 4402 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4403 struct spdk_bdev_io *bdev_io; 4404 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4405 4406 if (!desc->write) { 4407 /* 4408 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4409 * to easily determine if the command is a read or write, but for now just 4410 * do not allow io_passthru with a read-only descriptor. 
4411 */ 4412 return -EBADF; 4413 } 4414 4415 bdev_io = bdev_channel_get_io(channel); 4416 if (!bdev_io) { 4417 return -ENOMEM; 4418 } 4419 4420 bdev_io->internal.ch = channel; 4421 bdev_io->internal.desc = desc; 4422 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 4423 bdev_io->u.nvme_passthru.cmd = *cmd; 4424 bdev_io->u.nvme_passthru.buf = buf; 4425 bdev_io->u.nvme_passthru.nbytes = nbytes; 4426 bdev_io->u.nvme_passthru.md_buf = NULL; 4427 bdev_io->u.nvme_passthru.md_len = 0; 4428 4429 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4430 4431 bdev_io_submit(bdev_io); 4432 return 0; 4433 } 4434 4435 int 4436 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4437 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 4438 spdk_bdev_io_completion_cb cb, void *cb_arg) 4439 { 4440 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4441 struct spdk_bdev_io *bdev_io; 4442 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4443 4444 if (!desc->write) { 4445 /* 4446 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4447 * to easily determine if the command is a read or write, but for now just 4448 * do not allow io_passthru with a read-only descriptor. 4449 */ 4450 return -EBADF; 4451 } 4452 4453 bdev_io = bdev_channel_get_io(channel); 4454 if (!bdev_io) { 4455 return -ENOMEM; 4456 } 4457 4458 bdev_io->internal.ch = channel; 4459 bdev_io->internal.desc = desc; 4460 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 4461 bdev_io->u.nvme_passthru.cmd = *cmd; 4462 bdev_io->u.nvme_passthru.buf = buf; 4463 bdev_io->u.nvme_passthru.nbytes = nbytes; 4464 bdev_io->u.nvme_passthru.md_buf = md_buf; 4465 bdev_io->u.nvme_passthru.md_len = md_len; 4466 4467 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4468 4469 bdev_io_submit(bdev_io); 4470 return 0; 4471 } 4472 4473 static void bdev_abort_retry(void *ctx); 4474 4475 static void 4476 bdev_abort_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4477 { 4478 struct spdk_bdev_channel *channel = bdev_io->internal.ch; 4479 struct spdk_bdev_io *parent_io = cb_arg; 4480 struct spdk_bdev_io *bio_to_abort, *tmp_io; 4481 4482 bio_to_abort = bdev_io->u.abort.bio_to_abort; 4483 4484 spdk_bdev_free_io(bdev_io); 4485 4486 if (!success) { 4487 /* Check if the target I/O completed in the meantime. */ 4488 TAILQ_FOREACH(tmp_io, &channel->io_submitted, internal.ch_link) { 4489 if (tmp_io == bio_to_abort) { 4490 break; 4491 } 4492 } 4493 4494 /* If the target I/O still exists, set the parent to failed. */ 4495 if (tmp_io != NULL) { 4496 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4497 } 4498 } 4499 4500 parent_io->u.bdev.split_outstanding--; 4501 if (parent_io->u.bdev.split_outstanding == 0) { 4502 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4503 bdev_abort_retry(parent_io); 4504 } else { 4505 bdev_io_complete(parent_io); 4506 } 4507 } 4508 } 4509 4510 static int 4511 bdev_abort_io(struct spdk_bdev_desc *desc, struct spdk_bdev_channel *channel, 4512 struct spdk_bdev_io *bio_to_abort, 4513 spdk_bdev_io_completion_cb cb, void *cb_arg) 4514 { 4515 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4516 struct spdk_bdev_io *bdev_io; 4517 4518 if (bio_to_abort->type == SPDK_BDEV_IO_TYPE_ABORT || 4519 bio_to_abort->type == SPDK_BDEV_IO_TYPE_RESET) { 4520 /* TODO: Abort reset or abort request. 
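* Neither case is handled yet, so such requests are simply rejected with -ENOTSUP below.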
*/ 4521 return -ENOTSUP; 4522 } 4523 4524 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bio_to_abort)) { 4525 return -ENOTSUP; 4526 } 4527 4528 bdev_io = bdev_channel_get_io(channel); 4529 if (bdev_io == NULL) { 4530 return -ENOMEM; 4531 } 4532 4533 bdev_io->internal.ch = channel; 4534 bdev_io->internal.desc = desc; 4535 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4536 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4537 4538 bdev_io->u.abort.bio_to_abort = bio_to_abort; 4539 4540 /* Submit the abort request to the underlying bdev module. */ 4541 bdev_io_submit(bdev_io); 4542 4543 return 0; 4544 } 4545 4546 static uint32_t 4547 _bdev_abort(struct spdk_bdev_io *parent_io) 4548 { 4549 struct spdk_bdev_desc *desc = parent_io->internal.desc; 4550 struct spdk_bdev_channel *channel = parent_io->internal.ch; 4551 void *bio_cb_arg; 4552 struct spdk_bdev_io *bio_to_abort; 4553 uint32_t matched_ios; 4554 int rc; 4555 4556 bio_cb_arg = parent_io->u.bdev.abort.bio_cb_arg; 4557 4558 /* matched_ios is returned and will be kept by the caller. 4559 * 4560 * This function will be used for two cases: 1) the same cb_arg is used for 4561 * multiple I/Os, 2) a single large I/O is split into smaller ones. 4562 * Incrementing split_outstanding directly here may confuse readers especially 4563 * for the 1st case. 4564 * 4565 * Completion of I/O abort is processed after stack unwinding. Hence this trick 4566 * works as expected. 4567 */ 4568 matched_ios = 0; 4569 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4570 4571 TAILQ_FOREACH(bio_to_abort, &channel->io_submitted, internal.ch_link) { 4572 if (bio_to_abort->internal.caller_ctx != bio_cb_arg) { 4573 continue; 4574 } 4575 4576 if (bio_to_abort->internal.submit_tsc > parent_io->internal.submit_tsc) { 4577 /* Any I/O which was submitted after this abort command should be excluded. */ 4578 continue; 4579 } 4580 4581 rc = bdev_abort_io(desc, channel, bio_to_abort, bdev_abort_io_done, parent_io); 4582 if (rc != 0) { 4583 if (rc == -ENOMEM) { 4584 parent_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4585 } else { 4586 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4587 } 4588 break; 4589 } 4590 matched_ios++; 4591 } 4592 4593 return matched_ios; 4594 } 4595 4596 static void 4597 bdev_abort_retry(void *ctx) 4598 { 4599 struct spdk_bdev_io *parent_io = ctx; 4600 uint32_t matched_ios; 4601 4602 matched_ios = _bdev_abort(parent_io); 4603 4604 if (matched_ios == 0) { 4605 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4606 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4607 } else { 4608 /* For retry, the case that no target I/O was found is a success 4609 * because it means target I/Os completed in the meantime. 4610 */ 4611 bdev_io_complete(parent_io); 4612 } 4613 return; 4614 } 4615 4616 /* Use split_outstanding to manage the progress of aborting I/Os. */ 4617 parent_io->u.bdev.split_outstanding = matched_ios; 4618 } 4619 4620 static void 4621 bdev_abort(struct spdk_bdev_io *parent_io) 4622 { 4623 uint32_t matched_ios; 4624 4625 matched_ios = _bdev_abort(parent_io); 4626 4627 if (matched_ios == 0) { 4628 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4629 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4630 } else { 4631 /* The case where no target I/O was found is a failure. */ 4632 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4633 bdev_io_complete(parent_io); 4634 } 4635 return; 4636 } 4637 4638 /* Use split_outstanding to manage the progress of aborting I/Os.
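* Setting the counter only after _bdev_abort() has returned is safe because the individual abort completions are not processed until this call stack unwinds (see the comment in _bdev_abort()).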
*/ 4639 parent_io->u.bdev.split_outstanding = matched_ios; 4640 } 4641 4642 int 4643 spdk_bdev_abort(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4644 void *bio_cb_arg, 4645 spdk_bdev_io_completion_cb cb, void *cb_arg) 4646 { 4647 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4648 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4649 struct spdk_bdev_io *bdev_io; 4650 4651 if (bio_cb_arg == NULL) { 4652 return -EINVAL; 4653 } 4654 4655 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ABORT)) { 4656 return -ENOTSUP; 4657 } 4658 4659 bdev_io = bdev_channel_get_io(channel); 4660 if (bdev_io == NULL) { 4661 return -ENOMEM; 4662 } 4663 4664 bdev_io->internal.ch = channel; 4665 bdev_io->internal.desc = desc; 4666 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4667 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4668 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4669 4670 bdev_io->u.bdev.abort.bio_cb_arg = bio_cb_arg; 4671 4672 /* Parent abort request is not submitted directly, but to manage its execution, 4673 * add it to the submitted list here. 4674 */ 4675 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4676 4677 bdev_abort(bdev_io); 4678 4679 return 0; 4680 } 4681 4682 int 4683 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4684 struct spdk_bdev_io_wait_entry *entry) 4685 { 4686 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4687 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 4688 4689 if (bdev != entry->bdev) { 4690 SPDK_ERRLOG("bdevs do not match\n"); 4691 return -EINVAL; 4692 } 4693 4694 if (mgmt_ch->per_thread_cache_count > 0) { 4695 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 4696 return -EINVAL; 4697 } 4698 4699 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 4700 return 0; 4701 } 4702 4703 static void 4704 bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 4705 { 4706 struct spdk_bdev *bdev = bdev_ch->bdev; 4707 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4708 struct spdk_bdev_io *bdev_io; 4709 4710 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 4711 /* 4712 * Allow some more I/O to complete before retrying the nomem_io queue. 4713 * Some drivers (such as nvme) cannot immediately take a new I/O in 4714 * the context of a completion, because the resources for the I/O are 4715 * not released until control returns to the bdev poller. Also, we 4716 * may require several small I/O to complete before a larger I/O 4717 * (that requires splitting) can be submitted. 
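* The nomem_threshold consulted above is recalculated in spdk_bdev_io_complete() each time an I/O completes with SPDK_BDEV_IO_STATUS_NOMEM.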
4718 */ 4719 return; 4720 } 4721 4722 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 4723 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 4724 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 4725 bdev_io->internal.ch->io_outstanding++; 4726 shared_resource->io_outstanding++; 4727 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4728 bdev_io->internal.error.nvme.cdw0 = 0; 4729 bdev_io->num_retries++; 4730 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 4731 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4732 break; 4733 } 4734 } 4735 } 4736 4737 static inline void 4738 bdev_io_complete(void *ctx) 4739 { 4740 struct spdk_bdev_io *bdev_io = ctx; 4741 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4742 uint64_t tsc, tsc_diff; 4743 4744 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 4745 /* 4746 * Send the completion to the thread that originally submitted the I/O, 4747 * which may not be the current thread in the case of QoS. 4748 */ 4749 if (bdev_io->internal.io_submit_ch) { 4750 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4751 bdev_io->internal.io_submit_ch = NULL; 4752 } 4753 4754 /* 4755 * Defer completion to avoid potential infinite recursion if the 4756 * user's completion callback issues a new I/O. 4757 */ 4758 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4759 bdev_io_complete, bdev_io); 4760 return; 4761 } 4762 4763 tsc = spdk_get_ticks(); 4764 tsc_diff = tsc - bdev_io->internal.submit_tsc; 4765 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 4766 4767 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link); 4768 4769 if (bdev_io->internal.ch->histogram) { 4770 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 4771 } 4772 4773 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4774 switch (bdev_io->type) { 4775 case SPDK_BDEV_IO_TYPE_READ: 4776 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4777 bdev_io->internal.ch->stat.num_read_ops++; 4778 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4779 break; 4780 case SPDK_BDEV_IO_TYPE_WRITE: 4781 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4782 bdev_io->internal.ch->stat.num_write_ops++; 4783 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4784 break; 4785 case SPDK_BDEV_IO_TYPE_UNMAP: 4786 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4787 bdev_io->internal.ch->stat.num_unmap_ops++; 4788 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 4789 break; 4790 case SPDK_BDEV_IO_TYPE_ZCOPY: 4791 /* Track the data in the start phase only */ 4792 if (bdev_io->u.bdev.zcopy.start) { 4793 if (bdev_io->u.bdev.zcopy.populate) { 4794 bdev_io->internal.ch->stat.bytes_read += 4795 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4796 bdev_io->internal.ch->stat.num_read_ops++; 4797 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4798 } else { 4799 bdev_io->internal.ch->stat.bytes_written += 4800 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4801 bdev_io->internal.ch->stat.num_write_ops++; 4802 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4803 } 4804 } 4805 break; 4806 default: 4807 break; 4808 } 4809 } 4810 4811 #ifdef SPDK_CONFIG_VTUNE 4812 uint64_t now_tsc = spdk_get_ticks(); 4813 if (now_tsc > 
(bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 4814 uint64_t data[5]; 4815 4816 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 4817 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 4818 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 4819 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 4820 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 4821 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 4822 4823 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 4824 __itt_metadata_u64, 5, data); 4825 4826 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 4827 bdev_io->internal.ch->start_tsc = now_tsc; 4828 } 4829 #endif 4830 4831 assert(bdev_io->internal.cb != NULL); 4832 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 4833 4834 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 4835 bdev_io->internal.caller_ctx); 4836 } 4837 4838 static void 4839 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 4840 { 4841 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4842 4843 if (bdev_io->u.reset.ch_ref != NULL) { 4844 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 4845 bdev_io->u.reset.ch_ref = NULL; 4846 } 4847 4848 bdev_io_complete(bdev_io); 4849 } 4850 4851 static void 4852 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 4853 { 4854 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4855 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4856 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4857 struct spdk_bdev_io *queued_reset; 4858 4859 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 4860 while (!TAILQ_EMPTY(&ch->queued_resets)) { 4861 queued_reset = TAILQ_FIRST(&ch->queued_resets); 4862 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 4863 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 4864 } 4865 4866 spdk_for_each_channel_continue(i, 0); 4867 } 4868 4869 void 4870 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 4871 { 4872 struct spdk_bdev *bdev = bdev_io->bdev; 4873 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4874 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4875 4876 bdev_io->internal.status = status; 4877 4878 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 4879 bool unlock_channels = false; 4880 4881 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 4882 SPDK_ERRLOG("NOMEM returned for reset\n"); 4883 } 4884 pthread_mutex_lock(&bdev->internal.mutex); 4885 if (bdev_io == bdev->internal.reset_in_progress) { 4886 bdev->internal.reset_in_progress = NULL; 4887 unlock_channels = true; 4888 } 4889 pthread_mutex_unlock(&bdev->internal.mutex); 4890 4891 if (unlock_channels) { 4892 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 4893 bdev_io, bdev_reset_complete); 4894 return; 4895 } 4896 } else { 4897 _bdev_io_unset_bounce_buf(bdev_io); 4898 4899 assert(bdev_ch->io_outstanding > 0); 4900 assert(shared_resource->io_outstanding > 0); 4901 bdev_ch->io_outstanding--; 4902 shared_resource->io_outstanding--; 4903 4904 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 4905 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 4906 
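/* Worked example for the threshold computed just below, using NOMEM_THRESHOLD_COUNT == 8: with 64 I/O still outstanding the threshold becomes spdk_max(64 / 2, 64 - 8) = 56, so roughly 8 completions must occur before the nomem_io queue is retried; with only 10 outstanding it becomes spdk_max(5, 2) = 5, i.e. about half of them must complete first. */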
/* 4907 * Wait for some of the outstanding I/O to complete before we 4908 * retry any of the nomem_io. Normally we will wait for 4909 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 4910 * depth channels we will instead wait for half to complete. 4911 */ 4912 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 4913 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 4914 return; 4915 } 4916 4917 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 4918 bdev_ch_retry_io(bdev_ch); 4919 } 4920 } 4921 4922 bdev_io_complete(bdev_io); 4923 } 4924 4925 void 4926 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 4927 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 4928 { 4929 if (sc == SPDK_SCSI_STATUS_GOOD) { 4930 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4931 } else { 4932 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 4933 bdev_io->internal.error.scsi.sc = sc; 4934 bdev_io->internal.error.scsi.sk = sk; 4935 bdev_io->internal.error.scsi.asc = asc; 4936 bdev_io->internal.error.scsi.ascq = ascq; 4937 } 4938 4939 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 4940 } 4941 4942 void 4943 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 4944 int *sc, int *sk, int *asc, int *ascq) 4945 { 4946 assert(sc != NULL); 4947 assert(sk != NULL); 4948 assert(asc != NULL); 4949 assert(ascq != NULL); 4950 4951 switch (bdev_io->internal.status) { 4952 case SPDK_BDEV_IO_STATUS_SUCCESS: 4953 *sc = SPDK_SCSI_STATUS_GOOD; 4954 *sk = SPDK_SCSI_SENSE_NO_SENSE; 4955 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 4956 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 4957 break; 4958 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 4959 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 4960 break; 4961 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 4962 *sc = bdev_io->internal.error.scsi.sc; 4963 *sk = bdev_io->internal.error.scsi.sk; 4964 *asc = bdev_io->internal.error.scsi.asc; 4965 *ascq = bdev_io->internal.error.scsi.ascq; 4966 break; 4967 default: 4968 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 4969 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 4970 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 4971 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 4972 break; 4973 } 4974 } 4975 4976 void 4977 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 4978 { 4979 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 4980 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4981 } else { 4982 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 4983 } 4984 4985 bdev_io->internal.error.nvme.cdw0 = cdw0; 4986 bdev_io->internal.error.nvme.sct = sct; 4987 bdev_io->internal.error.nvme.sc = sc; 4988 4989 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 4990 } 4991 4992 void 4993 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 4994 { 4995 assert(sct != NULL); 4996 assert(sc != NULL); 4997 assert(cdw0 != NULL); 4998 4999 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5000 *sct = bdev_io->internal.error.nvme.sct; 5001 *sc = bdev_io->internal.error.nvme.sc; 5002 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5003 *sct = SPDK_NVME_SCT_GENERIC; 5004 *sc = SPDK_NVME_SC_SUCCESS; 5005 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_ABORTED) { 5006 *sct = SPDK_NVME_SCT_GENERIC; 5007 *sc = SPDK_NVME_SC_ABORTED_BY_REQUEST; 5008 } else { 5009 *sct 
= SPDK_NVME_SCT_GENERIC; 5010 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5011 } 5012 5013 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5014 } 5015 5016 void 5017 spdk_bdev_io_get_nvme_fused_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, 5018 int *first_sct, int *first_sc, int *second_sct, int *second_sc) 5019 { 5020 assert(first_sct != NULL); 5021 assert(first_sc != NULL); 5022 assert(second_sct != NULL); 5023 assert(second_sc != NULL); 5024 assert(cdw0 != NULL); 5025 5026 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5027 if (bdev_io->internal.error.nvme.sct == SPDK_NVME_SCT_MEDIA_ERROR && 5028 bdev_io->internal.error.nvme.sc == SPDK_NVME_SC_COMPARE_FAILURE) { 5029 *first_sct = bdev_io->internal.error.nvme.sct; 5030 *first_sc = bdev_io->internal.error.nvme.sc; 5031 *second_sct = SPDK_NVME_SCT_GENERIC; 5032 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5033 } else { 5034 *first_sct = SPDK_NVME_SCT_GENERIC; 5035 *first_sc = SPDK_NVME_SC_SUCCESS; 5036 *second_sct = bdev_io->internal.error.nvme.sct; 5037 *second_sc = bdev_io->internal.error.nvme.sc; 5038 } 5039 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5040 *first_sct = SPDK_NVME_SCT_GENERIC; 5041 *first_sc = SPDK_NVME_SC_SUCCESS; 5042 *second_sct = SPDK_NVME_SCT_GENERIC; 5043 *second_sc = SPDK_NVME_SC_SUCCESS; 5044 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED) { 5045 *first_sct = SPDK_NVME_SCT_GENERIC; 5046 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5047 *second_sct = SPDK_NVME_SCT_GENERIC; 5048 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5049 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_MISCOMPARE) { 5050 *first_sct = SPDK_NVME_SCT_MEDIA_ERROR; 5051 *first_sc = SPDK_NVME_SC_COMPARE_FAILURE; 5052 *second_sct = SPDK_NVME_SCT_GENERIC; 5053 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5054 } else { 5055 *first_sct = SPDK_NVME_SCT_GENERIC; 5056 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5057 *second_sct = SPDK_NVME_SCT_GENERIC; 5058 *second_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5059 } 5060 5061 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5062 } 5063 5064 struct spdk_thread * 5065 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 5066 { 5067 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 5068 } 5069 5070 struct spdk_io_channel * 5071 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 5072 { 5073 return bdev_io->internal.ch->channel; 5074 } 5075 5076 static void 5077 bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 5078 { 5079 uint64_t min_qos_set; 5080 int i; 5081 5082 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5083 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5084 break; 5085 } 5086 } 5087 5088 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 5089 SPDK_ERRLOG("Invalid rate limits set.\n"); 5090 return; 5091 } 5092 5093 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5094 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5095 continue; 5096 } 5097 5098 if (bdev_qos_is_iops_rate_limit(i) == true) { 5099 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 5100 } else { 5101 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 5102 } 5103 5104 if (limits[i] == 0 || limits[i] % min_qos_set) { 5105 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 5106 limits[i], bdev->name, min_qos_set); 5107 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 5108 return; 5109 } 5110 } 5111 5112 if (!bdev->internal.qos) { 5113 
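/* Allocate the QoS tracking object lazily, the first time valid limits are configured for this bdev. */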
bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 5114 if (!bdev->internal.qos) { 5115 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 5116 return; 5117 } 5118 } 5119 5120 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5121 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5122 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 5123 bdev->name, i, limits[i]); 5124 } 5125 5126 return; 5127 } 5128 5129 static void 5130 bdev_qos_config(struct spdk_bdev *bdev) 5131 { 5132 struct spdk_conf_section *sp = NULL; 5133 const char *val = NULL; 5134 int i = 0, j = 0; 5135 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 5136 bool config_qos = false; 5137 5138 sp = spdk_conf_find_section(NULL, "QoS"); 5139 if (!sp) { 5140 return; 5141 } 5142 5143 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 5144 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 5145 5146 i = 0; 5147 while (true) { 5148 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 5149 if (!val) { 5150 break; 5151 } 5152 5153 if (strcmp(bdev->name, val) != 0) { 5154 i++; 5155 continue; 5156 } 5157 5158 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 5159 if (val) { 5160 if (bdev_qos_is_iops_rate_limit(j) == true) { 5161 limits[j] = strtoull(val, NULL, 10); 5162 } else { 5163 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 5164 } 5165 config_qos = true; 5166 } 5167 5168 break; 5169 } 5170 5171 j++; 5172 } 5173 5174 if (config_qos == true) { 5175 bdev_qos_config_limit(bdev, limits); 5176 } 5177 5178 return; 5179 } 5180 5181 static int 5182 bdev_init(struct spdk_bdev *bdev) 5183 { 5184 char *bdev_name; 5185 5186 assert(bdev->module != NULL); 5187 5188 if (!bdev->name) { 5189 SPDK_ERRLOG("Bdev name is NULL\n"); 5190 return -EINVAL; 5191 } 5192 5193 if (!strlen(bdev->name)) { 5194 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 5195 return -EINVAL; 5196 } 5197 5198 if (spdk_bdev_get_by_name(bdev->name)) { 5199 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 5200 return -EEXIST; 5201 } 5202 5203 /* Users often register their own I/O devices using the bdev name. In 5204 * order to avoid conflicts, prepend bdev_. */ 5205 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 5206 if (!bdev_name) { 5207 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 5208 return -ENOMEM; 5209 } 5210 5211 bdev->internal.status = SPDK_BDEV_STATUS_READY; 5212 bdev->internal.measured_queue_depth = UINT64_MAX; 5213 bdev->internal.claim_module = NULL; 5214 bdev->internal.qd_poller = NULL; 5215 bdev->internal.qos = NULL; 5216 5217 /* If the user didn't specify a uuid, generate one. */ 5218 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 5219 spdk_uuid_generate(&bdev->uuid); 5220 } 5221 5222 if (spdk_bdev_get_buf_align(bdev) > 1) { 5223 if (bdev->split_on_optimal_io_boundary) { 5224 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 5225 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 5226 } else { 5227 bdev->split_on_optimal_io_boundary = true; 5228 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 5229 } 5230 } 5231 5232 /* If the user didn't specify a write unit size, set it to one. 
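* A write unit size of one block means writes carry no additional size or alignment requirement beyond the block size itself.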
*/ 5233 if (bdev->write_unit_size == 0) { 5234 bdev->write_unit_size = 1; 5235 } 5236 5237 /* Set ACWU value to 1 if bdev module did not set it (does not support it natively) */ 5238 if (bdev->acwu == 0) { 5239 bdev->acwu = 1; 5240 } 5241 5242 TAILQ_INIT(&bdev->internal.open_descs); 5243 TAILQ_INIT(&bdev->internal.locked_ranges); 5244 TAILQ_INIT(&bdev->internal.pending_locked_ranges); 5245 5246 TAILQ_INIT(&bdev->aliases); 5247 5248 bdev->internal.reset_in_progress = NULL; 5249 5250 bdev_qos_config(bdev); 5251 5252 spdk_io_device_register(__bdev_to_io_dev(bdev), 5253 bdev_channel_create, bdev_channel_destroy, 5254 sizeof(struct spdk_bdev_channel), 5255 bdev_name); 5256 5257 free(bdev_name); 5258 5259 pthread_mutex_init(&bdev->internal.mutex, NULL); 5260 return 0; 5261 } 5262 5263 static void 5264 bdev_destroy_cb(void *io_device) 5265 { 5266 int rc; 5267 struct spdk_bdev *bdev; 5268 spdk_bdev_unregister_cb cb_fn; 5269 void *cb_arg; 5270 5271 bdev = __bdev_from_io_dev(io_device); 5272 cb_fn = bdev->internal.unregister_cb; 5273 cb_arg = bdev->internal.unregister_ctx; 5274 5275 rc = bdev->fn_table->destruct(bdev->ctxt); 5276 if (rc < 0) { 5277 SPDK_ERRLOG("destruct failed\n"); 5278 } 5279 if (rc <= 0 && cb_fn != NULL) { 5280 cb_fn(cb_arg, rc); 5281 } 5282 } 5283 5284 5285 static void 5286 bdev_fini(struct spdk_bdev *bdev) 5287 { 5288 pthread_mutex_destroy(&bdev->internal.mutex); 5289 5290 free(bdev->internal.qos); 5291 5292 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 5293 } 5294 5295 static void 5296 bdev_start(struct spdk_bdev *bdev) 5297 { 5298 struct spdk_bdev_module *module; 5299 uint32_t action; 5300 5301 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 5302 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 5303 5304 /* Examine configuration before initializing I/O */ 5305 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5306 if (module->examine_config && bdev_ok_to_examine(bdev)) { 5307 action = module->internal.action_in_progress; 5308 module->internal.action_in_progress++; 5309 module->examine_config(bdev); 5310 if (action != module->internal.action_in_progress) { 5311 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 5312 module->name); 5313 } 5314 } 5315 } 5316 5317 if (bdev->internal.claim_module && bdev_ok_to_examine(bdev)) { 5318 if (bdev->internal.claim_module->examine_disk) { 5319 bdev->internal.claim_module->internal.action_in_progress++; 5320 bdev->internal.claim_module->examine_disk(bdev); 5321 } 5322 return; 5323 } 5324 5325 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5326 if (module->examine_disk && bdev_ok_to_examine(bdev)) { 5327 module->internal.action_in_progress++; 5328 module->examine_disk(bdev); 5329 } 5330 } 5331 } 5332 5333 int 5334 spdk_bdev_register(struct spdk_bdev *bdev) 5335 { 5336 int rc = bdev_init(bdev); 5337 5338 if (rc == 0) { 5339 bdev_start(bdev); 5340 } 5341 5342 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 5343 return rc; 5344 } 5345 5346 int 5347 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 5348 { 5349 SPDK_ERRLOG("This function is deprecated. 
Use spdk_bdev_register() instead.\n"); 5350 return spdk_bdev_register(vbdev); 5351 } 5352 5353 void 5354 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 5355 { 5356 if (bdev->internal.unregister_cb != NULL) { 5357 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 5358 } 5359 } 5360 5361 static void 5362 _remove_notify(void *arg) 5363 { 5364 struct spdk_bdev_desc *desc = arg; 5365 5366 pthread_mutex_lock(&desc->mutex); 5367 desc->refs--; 5368 5369 if (!desc->closed) { 5370 pthread_mutex_unlock(&desc->mutex); 5371 if (desc->callback.open_with_ext) { 5372 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 5373 } else { 5374 desc->callback.remove_fn(desc->callback.ctx); 5375 } 5376 return; 5377 } else if (0 == desc->refs) { 5378 /* This descriptor was closed after this remove_notify message was sent. 5379 * spdk_bdev_close() could not free the descriptor since this message was 5380 * in flight, so we free it now using bdev_desc_free(). 5381 */ 5382 pthread_mutex_unlock(&desc->mutex); 5383 bdev_desc_free(desc); 5384 return; 5385 } 5386 pthread_mutex_unlock(&desc->mutex); 5387 } 5388 5389 /* Must be called while holding bdev->internal.mutex. 5390 * returns: 0 - bdev removed and ready to be destructed. 5391 * -EBUSY - bdev can't be destructed yet. */ 5392 static int 5393 bdev_unregister_unsafe(struct spdk_bdev *bdev) 5394 { 5395 struct spdk_bdev_desc *desc, *tmp; 5396 int rc = 0; 5397 5398 /* Notify each descriptor about hotremoval */ 5399 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 5400 rc = -EBUSY; 5401 pthread_mutex_lock(&desc->mutex); 5402 /* 5403 * Defer invocation of the event_cb to a separate message that will 5404 * run later on its thread. This ensures this context unwinds and 5405 * we don't recursively unregister this bdev again if the event_cb 5406 * immediately closes its descriptor. 5407 */ 5408 desc->refs++; 5409 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 5410 pthread_mutex_unlock(&desc->mutex); 5411 } 5412 5413 /* If there are no descriptors, proceed removing the bdev */ 5414 if (rc == 0) { 5415 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 5416 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 5417 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 5418 } 5419 5420 return rc; 5421 } 5422 5423 void 5424 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 5425 { 5426 struct spdk_thread *thread; 5427 int rc; 5428 5429 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 5430 5431 thread = spdk_get_thread(); 5432 if (!thread) { 5433 /* The user called this from a non-SPDK thread. */ 5434 if (cb_fn != NULL) { 5435 cb_fn(cb_arg, -ENOTSUP); 5436 } 5437 return; 5438 } 5439 5440 pthread_mutex_lock(&g_bdev_mgr.mutex); 5441 pthread_mutex_lock(&bdev->internal.mutex); 5442 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5443 pthread_mutex_unlock(&bdev->internal.mutex); 5444 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5445 if (cb_fn) { 5446 cb_fn(cb_arg, -EBUSY); 5447 } 5448 return; 5449 } 5450 5451 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 5452 bdev->internal.unregister_cb = cb_fn; 5453 bdev->internal.unregister_ctx = cb_arg; 5454 5455 /* Call under lock. 
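* bdev_unregister_unsafe() walks bdev->internal.open_descs, which the open and close paths also modify while holding bdev->internal.mutex.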
*/ 5456 rc = bdev_unregister_unsafe(bdev); 5457 pthread_mutex_unlock(&bdev->internal.mutex); 5458 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5459 5460 if (rc == 0) { 5461 bdev_fini(bdev); 5462 } 5463 } 5464 5465 static void 5466 bdev_dummy_event_cb(void *remove_ctx) 5467 { 5468 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev remove event received with no remove callback specified"); 5469 } 5470 5471 static int 5472 bdev_start_qos(struct spdk_bdev *bdev) 5473 { 5474 struct set_qos_limit_ctx *ctx; 5475 5476 /* Enable QoS */ 5477 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 5478 ctx = calloc(1, sizeof(*ctx)); 5479 if (ctx == NULL) { 5480 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 5481 return -ENOMEM; 5482 } 5483 ctx->bdev = bdev; 5484 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5485 bdev_enable_qos_msg, ctx, 5486 bdev_enable_qos_done); 5487 } 5488 5489 return 0; 5490 } 5491 5492 static int 5493 bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 5494 { 5495 struct spdk_thread *thread; 5496 int rc = 0; 5497 5498 thread = spdk_get_thread(); 5499 if (!thread) { 5500 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 5501 return -ENOTSUP; 5502 } 5503 5504 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5505 spdk_get_thread()); 5506 5507 desc->bdev = bdev; 5508 desc->thread = thread; 5509 desc->write = write; 5510 5511 pthread_mutex_lock(&bdev->internal.mutex); 5512 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5513 pthread_mutex_unlock(&bdev->internal.mutex); 5514 return -ENODEV; 5515 } 5516 5517 if (write && bdev->internal.claim_module) { 5518 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 5519 bdev->name, bdev->internal.claim_module->name); 5520 pthread_mutex_unlock(&bdev->internal.mutex); 5521 return -EPERM; 5522 } 5523 5524 rc = bdev_start_qos(bdev); 5525 if (rc != 0) { 5526 SPDK_ERRLOG("Failed to start QoS on bdev %s\n", bdev->name); 5527 pthread_mutex_unlock(&bdev->internal.mutex); 5528 return rc; 5529 } 5530 5531 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 5532 5533 pthread_mutex_unlock(&bdev->internal.mutex); 5534 5535 return 0; 5536 } 5537 5538 int 5539 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 5540 void *remove_ctx, struct spdk_bdev_desc **_desc) 5541 { 5542 struct spdk_bdev_desc *desc; 5543 int rc; 5544 5545 desc = calloc(1, sizeof(*desc)); 5546 if (desc == NULL) { 5547 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5548 return -ENOMEM; 5549 } 5550 5551 if (remove_cb == NULL) { 5552 remove_cb = bdev_dummy_event_cb; 5553 } 5554 5555 TAILQ_INIT(&desc->pending_media_events); 5556 TAILQ_INIT(&desc->free_media_events); 5557 5558 desc->callback.open_with_ext = false; 5559 desc->callback.remove_fn = remove_cb; 5560 desc->callback.ctx = remove_ctx; 5561 pthread_mutex_init(&desc->mutex, NULL); 5562 5563 pthread_mutex_lock(&g_bdev_mgr.mutex); 5564 5565 rc = bdev_open(bdev, write, desc); 5566 if (rc != 0) { 5567 bdev_desc_free(desc); 5568 desc = NULL; 5569 } 5570 5571 *_desc = desc; 5572 5573 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5574 5575 return rc; 5576 } 5577 5578 int 5579 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 5580 void *event_ctx, struct spdk_bdev_desc **_desc) 5581 { 5582 struct spdk_bdev_desc *desc; 5583 struct spdk_bdev *bdev; 5584 unsigned int event_id; 5585 int rc; 5586 5587 if (event_cb == NULL) { 5588 SPDK_ERRLOG("Missing event 
callback function\n"); 5589 return -EINVAL; 5590 } 5591 5592 pthread_mutex_lock(&g_bdev_mgr.mutex); 5593 5594 bdev = spdk_bdev_get_by_name(bdev_name); 5595 5596 if (bdev == NULL) { 5597 SPDK_ERRLOG("Failed to find bdev with name: %s\n", bdev_name); 5598 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5599 return -EINVAL; 5600 } 5601 5602 desc = calloc(1, sizeof(*desc)); 5603 if (desc == NULL) { 5604 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5605 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5606 return -ENOMEM; 5607 } 5608 5609 TAILQ_INIT(&desc->pending_media_events); 5610 TAILQ_INIT(&desc->free_media_events); 5611 5612 desc->callback.open_with_ext = true; 5613 desc->callback.event_fn = event_cb; 5614 desc->callback.ctx = event_ctx; 5615 pthread_mutex_init(&desc->mutex, NULL); 5616 5617 if (bdev->media_events) { 5618 desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE, 5619 sizeof(*desc->media_events_buffer)); 5620 if (desc->media_events_buffer == NULL) { 5621 SPDK_ERRLOG("Failed to initialize media event pool\n"); 5622 bdev_desc_free(desc); 5623 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5624 return -ENOMEM; 5625 } 5626 5627 for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) { 5628 TAILQ_INSERT_TAIL(&desc->free_media_events, 5629 &desc->media_events_buffer[event_id], tailq); 5630 } 5631 } 5632 5633 rc = bdev_open(bdev, write, desc); 5634 if (rc != 0) { 5635 bdev_desc_free(desc); 5636 desc = NULL; 5637 } 5638 5639 *_desc = desc; 5640 5641 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5642 5643 return rc; 5644 } 5645 5646 void 5647 spdk_bdev_close(struct spdk_bdev_desc *desc) 5648 { 5649 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 5650 int rc; 5651 5652 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5653 spdk_get_thread()); 5654 5655 assert(desc->thread == spdk_get_thread()); 5656 5657 spdk_poller_unregister(&desc->io_timeout_poller); 5658 5659 pthread_mutex_lock(&bdev->internal.mutex); 5660 pthread_mutex_lock(&desc->mutex); 5661 5662 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 5663 5664 desc->closed = true; 5665 5666 if (0 == desc->refs) { 5667 pthread_mutex_unlock(&desc->mutex); 5668 bdev_desc_free(desc); 5669 } else { 5670 pthread_mutex_unlock(&desc->mutex); 5671 } 5672 5673 /* If no more descriptors, kill QoS channel */ 5674 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5675 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 5676 bdev->name, spdk_get_thread()); 5677 5678 if (bdev_qos_destroy(bdev)) { 5679 /* There isn't anything we can do to recover here. Just let the 5680 * old QoS poller keep running. The QoS handling won't change 5681 * cores when the user allocates a new channel, but it won't break. */ 5682 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 5683 } 5684 } 5685 5686 spdk_bdev_set_qd_sampling_period(bdev, 0); 5687 5688 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5689 rc = bdev_unregister_unsafe(bdev); 5690 pthread_mutex_unlock(&bdev->internal.mutex); 5691 5692 if (rc == 0) { 5693 bdev_fini(bdev); 5694 } 5695 } else { 5696 pthread_mutex_unlock(&bdev->internal.mutex); 5697 } 5698 } 5699 5700 int 5701 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 5702 struct spdk_bdev_module *module) 5703 { 5704 if (bdev->internal.claim_module != NULL) { 5705 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 5706 bdev->internal.claim_module->name); 5707 return -EPERM; 5708 } 5709 5710 if (desc && !desc->write) { 5711 desc->write = true; 5712 } 5713 5714 bdev->internal.claim_module = module; 5715 return 0; 5716 } 5717 5718 void 5719 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 5720 { 5721 assert(bdev->internal.claim_module != NULL); 5722 bdev->internal.claim_module = NULL; 5723 } 5724 5725 struct spdk_bdev * 5726 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 5727 { 5728 assert(desc != NULL); 5729 return desc->bdev; 5730 } 5731 5732 void 5733 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 5734 { 5735 struct iovec *iovs; 5736 int iovcnt; 5737 5738 if (bdev_io == NULL) { 5739 return; 5740 } 5741 5742 switch (bdev_io->type) { 5743 case SPDK_BDEV_IO_TYPE_READ: 5744 case SPDK_BDEV_IO_TYPE_WRITE: 5745 case SPDK_BDEV_IO_TYPE_ZCOPY: 5746 iovs = bdev_io->u.bdev.iovs; 5747 iovcnt = bdev_io->u.bdev.iovcnt; 5748 break; 5749 default: 5750 iovs = NULL; 5751 iovcnt = 0; 5752 break; 5753 } 5754 5755 if (iovp) { 5756 *iovp = iovs; 5757 } 5758 if (iovcntp) { 5759 *iovcntp = iovcnt; 5760 } 5761 } 5762 5763 void * 5764 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 5765 { 5766 if (bdev_io == NULL) { 5767 return NULL; 5768 } 5769 5770 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 5771 return NULL; 5772 } 5773 5774 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 5775 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 5776 return bdev_io->u.bdev.md_buf; 5777 } 5778 5779 return NULL; 5780 } 5781 5782 void 5783 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 5784 { 5785 5786 if (spdk_bdev_module_list_find(bdev_module->name)) { 5787 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 5788 assert(false); 5789 } 5790 5791 /* 5792 * Modules with examine callbacks must be initialized first, so they are 5793 * ready to handle examine callbacks from later modules that will 5794 * register physical bdevs. 
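 * That ordering is achieved below: modules providing examine_config or
 * examine_disk are inserted at the head of g_bdev_mgr.bdev_modules, while all
 * other modules are appended at the tail.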
5795 */ 5796 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 5797 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5798 } else { 5799 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5800 } 5801 } 5802 5803 struct spdk_bdev_module * 5804 spdk_bdev_module_list_find(const char *name) 5805 { 5806 struct spdk_bdev_module *bdev_module; 5807 5808 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5809 if (strcmp(name, bdev_module->name) == 0) { 5810 break; 5811 } 5812 } 5813 5814 return bdev_module; 5815 } 5816 5817 static void 5818 bdev_write_zero_buffer_next(void *_bdev_io) 5819 { 5820 struct spdk_bdev_io *bdev_io = _bdev_io; 5821 uint64_t num_bytes, num_blocks; 5822 void *md_buf = NULL; 5823 int rc; 5824 5825 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 5826 bdev_io->u.bdev.split_remaining_num_blocks, 5827 ZERO_BUFFER_SIZE); 5828 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 5829 5830 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 5831 md_buf = (char *)g_bdev_mgr.zero_buffer + 5832 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 5833 } 5834 5835 rc = bdev_write_blocks_with_md(bdev_io->internal.desc, 5836 spdk_io_channel_from_ctx(bdev_io->internal.ch), 5837 g_bdev_mgr.zero_buffer, md_buf, 5838 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 5839 bdev_write_zero_buffer_done, bdev_io); 5840 if (rc == 0) { 5841 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 5842 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 5843 } else if (rc == -ENOMEM) { 5844 bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next); 5845 } else { 5846 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5847 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 5848 } 5849 } 5850 5851 static void 5852 bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 5853 { 5854 struct spdk_bdev_io *parent_io = cb_arg; 5855 5856 spdk_bdev_free_io(bdev_io); 5857 5858 if (!success) { 5859 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5860 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 5861 return; 5862 } 5863 5864 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 5865 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5866 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 5867 return; 5868 } 5869 5870 bdev_write_zero_buffer_next(parent_io); 5871 } 5872 5873 static void 5874 bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 5875 { 5876 pthread_mutex_lock(&ctx->bdev->internal.mutex); 5877 ctx->bdev->internal.qos_mod_in_progress = false; 5878 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 5879 5880 if (ctx->cb_fn) { 5881 ctx->cb_fn(ctx->cb_arg, status); 5882 } 5883 free(ctx); 5884 } 5885 5886 static void 5887 bdev_disable_qos_done(void *cb_arg) 5888 { 5889 struct set_qos_limit_ctx *ctx = cb_arg; 5890 struct spdk_bdev *bdev = ctx->bdev; 5891 struct spdk_bdev_io *bdev_io; 5892 struct spdk_bdev_qos *qos; 5893 5894 pthread_mutex_lock(&bdev->internal.mutex); 5895 qos = bdev->internal.qos; 5896 bdev->internal.qos = NULL; 5897 pthread_mutex_unlock(&bdev->internal.mutex); 5898 5899 while (!TAILQ_EMPTY(&qos->queued)) { 5900 /* Send queued I/O back to their original thread for resubmission. 
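 * spdk_bdev_io_get_thread() resolves the thread owning the I/O's original
 * channel (restored from io_submit_ch below), and _bdev_io_submit re-drives
 * the I/O through the normal submission path now that bdev->internal.qos has
 * been detached from the bdev.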
*/ 5901 bdev_io = TAILQ_FIRST(&qos->queued); 5902 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 5903 5904 if (bdev_io->internal.io_submit_ch) { 5905 /* 5906 * Channel was changed when sending it to the QoS thread - change it back 5907 * before sending it back to the original thread. 5908 */ 5909 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 5910 bdev_io->internal.io_submit_ch = NULL; 5911 } 5912 5913 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 5914 _bdev_io_submit, bdev_io); 5915 } 5916 5917 if (qos->thread != NULL) { 5918 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 5919 spdk_poller_unregister(&qos->poller); 5920 } 5921 5922 free(qos); 5923 5924 bdev_set_qos_limit_done(ctx, 0); 5925 } 5926 5927 static void 5928 bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 5929 { 5930 void *io_device = spdk_io_channel_iter_get_io_device(i); 5931 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5932 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5933 struct spdk_thread *thread; 5934 5935 pthread_mutex_lock(&bdev->internal.mutex); 5936 thread = bdev->internal.qos->thread; 5937 pthread_mutex_unlock(&bdev->internal.mutex); 5938 5939 if (thread != NULL) { 5940 spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx); 5941 } else { 5942 bdev_disable_qos_done(ctx); 5943 } 5944 } 5945 5946 static void 5947 bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 5948 { 5949 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5950 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5951 5952 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 5953 5954 spdk_for_each_channel_continue(i, 0); 5955 } 5956 5957 static void 5958 bdev_update_qos_rate_limit_msg(void *cb_arg) 5959 { 5960 struct set_qos_limit_ctx *ctx = cb_arg; 5961 struct spdk_bdev *bdev = ctx->bdev; 5962 5963 pthread_mutex_lock(&bdev->internal.mutex); 5964 bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 5965 pthread_mutex_unlock(&bdev->internal.mutex); 5966 5967 bdev_set_qos_limit_done(ctx, 0); 5968 } 5969 5970 static void 5971 bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 5972 { 5973 void *io_device = spdk_io_channel_iter_get_io_device(i); 5974 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5975 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5976 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5977 5978 pthread_mutex_lock(&bdev->internal.mutex); 5979 bdev_enable_qos(bdev, bdev_ch); 5980 pthread_mutex_unlock(&bdev->internal.mutex); 5981 spdk_for_each_channel_continue(i, 0); 5982 } 5983 5984 static void 5985 bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 5986 { 5987 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5988 5989 bdev_set_qos_limit_done(ctx, status); 5990 } 5991 5992 static void 5993 bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 5994 { 5995 int i; 5996 5997 assert(bdev->internal.qos != NULL); 5998 5999 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6000 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6001 bdev->internal.qos->rate_limits[i].limit = limits[i]; 6002 6003 if (limits[i] == 0) { 6004 bdev->internal.qos->rate_limits[i].limit = 6005 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 6006 } 6007 } 6008 } 6009 } 6010 6011 void 6012 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 6013 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 6014 { 6015 struct set_qos_limit_ctx *ctx; 6016 uint32_t 
limit_set_complement; 6017 uint64_t min_limit_per_sec; 6018 int i; 6019 bool disable_rate_limit = true; 6020 6021 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6022 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6023 continue; 6024 } 6025 6026 if (limits[i] > 0) { 6027 disable_rate_limit = false; 6028 } 6029 6030 if (bdev_qos_is_iops_rate_limit(i) == true) { 6031 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 6032 } else { 6033 /* Change from megabyte to byte rate limit */ 6034 limits[i] = limits[i] * 1024 * 1024; 6035 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 6036 } 6037 6038 limit_set_complement = limits[i] % min_limit_per_sec; 6039 if (limit_set_complement) { 6040 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 6041 limits[i], min_limit_per_sec); 6042 limits[i] += min_limit_per_sec - limit_set_complement; 6043 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 6044 } 6045 } 6046 6047 ctx = calloc(1, sizeof(*ctx)); 6048 if (ctx == NULL) { 6049 cb_fn(cb_arg, -ENOMEM); 6050 return; 6051 } 6052 6053 ctx->cb_fn = cb_fn; 6054 ctx->cb_arg = cb_arg; 6055 ctx->bdev = bdev; 6056 6057 pthread_mutex_lock(&bdev->internal.mutex); 6058 if (bdev->internal.qos_mod_in_progress) { 6059 pthread_mutex_unlock(&bdev->internal.mutex); 6060 free(ctx); 6061 cb_fn(cb_arg, -EAGAIN); 6062 return; 6063 } 6064 bdev->internal.qos_mod_in_progress = true; 6065 6066 if (disable_rate_limit == true && bdev->internal.qos) { 6067 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6068 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 6069 (bdev->internal.qos->rate_limits[i].limit > 0 && 6070 bdev->internal.qos->rate_limits[i].limit != 6071 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 6072 disable_rate_limit = false; 6073 break; 6074 } 6075 } 6076 } 6077 6078 if (disable_rate_limit == false) { 6079 if (bdev->internal.qos == NULL) { 6080 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 6081 if (!bdev->internal.qos) { 6082 pthread_mutex_unlock(&bdev->internal.mutex); 6083 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 6084 bdev_set_qos_limit_done(ctx, -ENOMEM); 6085 return; 6086 } 6087 } 6088 6089 if (bdev->internal.qos->thread == NULL) { 6090 /* Enabling */ 6091 bdev_set_qos_rate_limits(bdev, limits); 6092 6093 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6094 bdev_enable_qos_msg, ctx, 6095 bdev_enable_qos_done); 6096 } else { 6097 /* Updating */ 6098 bdev_set_qos_rate_limits(bdev, limits); 6099 6100 spdk_thread_send_msg(bdev->internal.qos->thread, 6101 bdev_update_qos_rate_limit_msg, ctx); 6102 } 6103 } else { 6104 if (bdev->internal.qos != NULL) { 6105 bdev_set_qos_rate_limits(bdev, limits); 6106 6107 /* Disabling */ 6108 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6109 bdev_disable_qos_msg, ctx, 6110 bdev_disable_qos_msg_done); 6111 } else { 6112 pthread_mutex_unlock(&bdev->internal.mutex); 6113 bdev_set_qos_limit_done(ctx, 0); 6114 return; 6115 } 6116 } 6117 6118 pthread_mutex_unlock(&bdev->internal.mutex); 6119 } 6120 6121 struct spdk_bdev_histogram_ctx { 6122 spdk_bdev_histogram_status_cb cb_fn; 6123 void *cb_arg; 6124 struct spdk_bdev *bdev; 6125 int status; 6126 }; 6127 6128 static void 6129 bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 6130 { 6131 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6132 6133 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6134 ctx->bdev->internal.histogram_in_progress = false; 6135 
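	/* With the in-progress flag cleared, a new histogram enable/disable
	 * request can be accepted as soon as the mutex is released below. */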
pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6136 ctx->cb_fn(ctx->cb_arg, ctx->status); 6137 free(ctx); 6138 } 6139 6140 static void 6141 bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 6142 { 6143 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6144 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6145 6146 if (ch->histogram != NULL) { 6147 spdk_histogram_data_free(ch->histogram); 6148 ch->histogram = NULL; 6149 } 6150 spdk_for_each_channel_continue(i, 0); 6151 } 6152 6153 static void 6154 bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 6155 { 6156 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6157 6158 if (status != 0) { 6159 ctx->status = status; 6160 ctx->bdev->internal.histogram_enabled = false; 6161 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx, 6162 bdev_histogram_disable_channel_cb); 6163 } else { 6164 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6165 ctx->bdev->internal.histogram_in_progress = false; 6166 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6167 ctx->cb_fn(ctx->cb_arg, ctx->status); 6168 free(ctx); 6169 } 6170 } 6171 6172 static void 6173 bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 6174 { 6175 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6176 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6177 int status = 0; 6178 6179 if (ch->histogram == NULL) { 6180 ch->histogram = spdk_histogram_data_alloc(); 6181 if (ch->histogram == NULL) { 6182 status = -ENOMEM; 6183 } 6184 } 6185 6186 spdk_for_each_channel_continue(i, status); 6187 } 6188 6189 void 6190 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 6191 void *cb_arg, bool enable) 6192 { 6193 struct spdk_bdev_histogram_ctx *ctx; 6194 6195 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 6196 if (ctx == NULL) { 6197 cb_fn(cb_arg, -ENOMEM); 6198 return; 6199 } 6200 6201 ctx->bdev = bdev; 6202 ctx->status = 0; 6203 ctx->cb_fn = cb_fn; 6204 ctx->cb_arg = cb_arg; 6205 6206 pthread_mutex_lock(&bdev->internal.mutex); 6207 if (bdev->internal.histogram_in_progress) { 6208 pthread_mutex_unlock(&bdev->internal.mutex); 6209 free(ctx); 6210 cb_fn(cb_arg, -EAGAIN); 6211 return; 6212 } 6213 6214 bdev->internal.histogram_in_progress = true; 6215 pthread_mutex_unlock(&bdev->internal.mutex); 6216 6217 bdev->internal.histogram_enabled = enable; 6218 6219 if (enable) { 6220 /* Allocate histogram for each channel */ 6221 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx, 6222 bdev_histogram_enable_channel_cb); 6223 } else { 6224 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx, 6225 bdev_histogram_disable_channel_cb); 6226 } 6227 } 6228 6229 struct spdk_bdev_histogram_data_ctx { 6230 spdk_bdev_histogram_data_cb cb_fn; 6231 void *cb_arg; 6232 struct spdk_bdev *bdev; 6233 /** merged histogram data from all channels */ 6234 struct spdk_histogram_data *histogram; 6235 }; 6236 6237 static void 6238 bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 6239 { 6240 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6241 6242 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 6243 free(ctx); 6244 } 6245 6246 static void 6247 bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 6248 { 6249 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6250 struct spdk_bdev_channel *ch = 
spdk_io_channel_get_ctx(_ch); 6251 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6252 int status = 0; 6253 6254 if (ch->histogram == NULL) { 6255 status = -EFAULT; 6256 } else { 6257 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 6258 } 6259 6260 spdk_for_each_channel_continue(i, status); 6261 } 6262 6263 void 6264 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 6265 spdk_bdev_histogram_data_cb cb_fn, 6266 void *cb_arg) 6267 { 6268 struct spdk_bdev_histogram_data_ctx *ctx; 6269 6270 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 6271 if (ctx == NULL) { 6272 cb_fn(cb_arg, -ENOMEM, NULL); 6273 return; 6274 } 6275 6276 ctx->bdev = bdev; 6277 ctx->cb_fn = cb_fn; 6278 ctx->cb_arg = cb_arg; 6279 6280 ctx->histogram = histogram; 6281 6282 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx, 6283 bdev_histogram_get_channel_cb); 6284 } 6285 6286 size_t 6287 spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events, 6288 size_t max_events) 6289 { 6290 struct media_event_entry *entry; 6291 size_t num_events = 0; 6292 6293 for (; num_events < max_events; ++num_events) { 6294 entry = TAILQ_FIRST(&desc->pending_media_events); 6295 if (entry == NULL) { 6296 break; 6297 } 6298 6299 events[num_events] = entry->event; 6300 TAILQ_REMOVE(&desc->pending_media_events, entry, tailq); 6301 TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq); 6302 } 6303 6304 return num_events; 6305 } 6306 6307 int 6308 spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events, 6309 size_t num_events) 6310 { 6311 struct spdk_bdev_desc *desc; 6312 struct media_event_entry *entry; 6313 size_t event_id; 6314 int rc = 0; 6315 6316 assert(bdev->media_events); 6317 6318 pthread_mutex_lock(&bdev->internal.mutex); 6319 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6320 if (desc->write) { 6321 break; 6322 } 6323 } 6324 6325 if (desc == NULL || desc->media_events_buffer == NULL) { 6326 rc = -ENODEV; 6327 goto out; 6328 } 6329 6330 for (event_id = 0; event_id < num_events; ++event_id) { 6331 entry = TAILQ_FIRST(&desc->free_media_events); 6332 if (entry == NULL) { 6333 break; 6334 } 6335 6336 TAILQ_REMOVE(&desc->free_media_events, entry, tailq); 6337 TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq); 6338 entry->event = events[event_id]; 6339 } 6340 6341 rc = event_id; 6342 out: 6343 pthread_mutex_unlock(&bdev->internal.mutex); 6344 return rc; 6345 } 6346 6347 void 6348 spdk_bdev_notify_media_management(struct spdk_bdev *bdev) 6349 { 6350 struct spdk_bdev_desc *desc; 6351 6352 pthread_mutex_lock(&bdev->internal.mutex); 6353 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6354 if (!TAILQ_EMPTY(&desc->pending_media_events)) { 6355 desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev, 6356 desc->callback.ctx); 6357 } 6358 } 6359 pthread_mutex_unlock(&bdev->internal.mutex); 6360 } 6361 6362 struct locked_lba_range_ctx { 6363 struct lba_range range; 6364 struct spdk_bdev *bdev; 6365 struct lba_range *current_range; 6366 struct lba_range *owner_range; 6367 struct spdk_poller *poller; 6368 lock_range_cb cb_fn; 6369 void *cb_arg; 6370 }; 6371 6372 static void 6373 bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status) 6374 { 6375 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6376 6377 ctx->cb_fn(ctx->cb_arg, -ENOMEM); 6378 free(ctx); 6379 } 6380 6381 static void 6382 
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i); 6383 6384 static void 6385 bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6386 { 6387 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6388 struct spdk_bdev *bdev = ctx->bdev; 6389 6390 if (status == -ENOMEM) { 6391 /* One of the channels could not allocate a range object. 6392 * So we have to go back and clean up any ranges that were 6393 * allocated successfully before we return error status to 6394 * the caller. We can reuse the unlock function to do that 6395 * clean up. 6396 */ 6397 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6398 bdev_unlock_lba_range_get_channel, ctx, 6399 bdev_lock_error_cleanup_cb); 6400 return; 6401 } 6402 6403 /* All channels have locked this range and no I/O overlapping the range 6404 * are outstanding! Set the owner_ch for the range object for the 6405 * locking channel, so that this channel will know that it is allowed 6406 * to write to this range. 6407 */ 6408 ctx->owner_range->owner_ch = ctx->range.owner_ch; 6409 ctx->cb_fn(ctx->cb_arg, status); 6410 6411 /* Don't free the ctx here. Its range is in the bdev's global list of 6412 * locked ranges still, and will be removed and freed when this range 6413 * is later unlocked. 6414 */ 6415 } 6416 6417 static int 6418 bdev_lock_lba_range_check_io(void *_i) 6419 { 6420 struct spdk_io_channel_iter *i = _i; 6421 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6422 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6423 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6424 struct lba_range *range = ctx->current_range; 6425 struct spdk_bdev_io *bdev_io; 6426 6427 spdk_poller_unregister(&ctx->poller); 6428 6429 /* The range is now in the locked_ranges, so no new IO can be submitted to this 6430 * range. But we need to wait until any outstanding IO overlapping with this range 6431 * are completed. 6432 */ 6433 TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) { 6434 if (bdev_io_range_is_locked(bdev_io, range)) { 6435 ctx->poller = SPDK_POLLER_REGISTER(bdev_lock_lba_range_check_io, i, 100); 6436 return 1; 6437 } 6438 } 6439 6440 spdk_for_each_channel_continue(i, 0); 6441 return 1; 6442 } 6443 6444 static void 6445 bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6446 { 6447 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6448 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6449 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6450 struct lba_range *range; 6451 6452 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6453 if (range->length == ctx->range.length && 6454 range->offset == ctx->range.offset && 6455 range->locked_ctx == ctx->range.locked_ctx) { 6456 /* This range already exists on this channel, so don't add 6457 * it again. This can happen when a new channel is created 6458 * while the for_each_channel operation is in progress. 6459 * Do not check for outstanding I/O in that case, since the 6460 * range was locked before any I/O could be submitted to the 6461 * new channel. 
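 * (The range was already inserted into bdev->internal.locked_ranges before
 * this for_each_channel started, and newly created channels copy that list,
 * which is how a channel created mid-operation can already hold the range.)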
6462 */ 6463 spdk_for_each_channel_continue(i, 0); 6464 return; 6465 } 6466 } 6467 6468 range = calloc(1, sizeof(*range)); 6469 if (range == NULL) { 6470 spdk_for_each_channel_continue(i, -ENOMEM); 6471 return; 6472 } 6473 6474 range->length = ctx->range.length; 6475 range->offset = ctx->range.offset; 6476 range->locked_ctx = ctx->range.locked_ctx; 6477 ctx->current_range = range; 6478 if (ctx->range.owner_ch == ch) { 6479 /* This is the range object for the channel that will hold 6480 * the lock. Store it in the ctx object so that we can easily 6481 * set its owner_ch after the lock is finally acquired. 6482 */ 6483 ctx->owner_range = range; 6484 } 6485 TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq); 6486 bdev_lock_lba_range_check_io(i); 6487 } 6488 6489 static void 6490 bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx) 6491 { 6492 assert(spdk_get_thread() == ctx->range.owner_ch->channel->thread); 6493 6494 /* We will add a copy of this range to each channel now. */ 6495 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx, 6496 bdev_lock_lba_range_cb); 6497 } 6498 6499 static bool 6500 bdev_lba_range_overlaps_tailq(struct lba_range *range, lba_range_tailq_t *tailq) 6501 { 6502 struct lba_range *r; 6503 6504 TAILQ_FOREACH(r, tailq, tailq) { 6505 if (bdev_lba_range_overlapped(range, r)) { 6506 return true; 6507 } 6508 } 6509 return false; 6510 } 6511 6512 static int 6513 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6514 uint64_t offset, uint64_t length, 6515 lock_range_cb cb_fn, void *cb_arg) 6516 { 6517 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6518 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6519 struct locked_lba_range_ctx *ctx; 6520 6521 if (cb_arg == NULL) { 6522 SPDK_ERRLOG("cb_arg must not be NULL\n"); 6523 return -EINVAL; 6524 } 6525 6526 ctx = calloc(1, sizeof(*ctx)); 6527 if (ctx == NULL) { 6528 return -ENOMEM; 6529 } 6530 6531 ctx->range.offset = offset; 6532 ctx->range.length = length; 6533 ctx->range.owner_ch = ch; 6534 ctx->range.locked_ctx = cb_arg; 6535 ctx->bdev = bdev; 6536 ctx->cb_fn = cb_fn; 6537 ctx->cb_arg = cb_arg; 6538 6539 pthread_mutex_lock(&bdev->internal.mutex); 6540 if (bdev_lba_range_overlaps_tailq(&ctx->range, &bdev->internal.locked_ranges)) { 6541 /* There is an active lock overlapping with this range. 6542 * Put it on the pending list until this range no 6543 * longer overlaps with another. 6544 */ 6545 TAILQ_INSERT_TAIL(&bdev->internal.pending_locked_ranges, &ctx->range, tailq); 6546 } else { 6547 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, &ctx->range, tailq); 6548 bdev_lock_lba_range_ctx(bdev, ctx); 6549 } 6550 pthread_mutex_unlock(&bdev->internal.mutex); 6551 return 0; 6552 } 6553 6554 static void 6555 bdev_lock_lba_range_ctx_msg(void *_ctx) 6556 { 6557 struct locked_lba_range_ctx *ctx = _ctx; 6558 6559 bdev_lock_lba_range_ctx(ctx->bdev, ctx); 6560 } 6561 6562 static void 6563 bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6564 { 6565 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6566 struct locked_lba_range_ctx *pending_ctx; 6567 struct spdk_bdev_channel *ch = ctx->range.owner_ch; 6568 struct spdk_bdev *bdev = ch->bdev; 6569 struct lba_range *range, *tmp; 6570 6571 pthread_mutex_lock(&bdev->internal.mutex); 6572 /* Check if there are any pending locked ranges that overlap with this range 6573 * that was just unlocked. 
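 * (Such ranges were deferred in bdev_lock_lba_range() because they overlapped
 * a range that was locked at the time.)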
If there are, check that it doesn't overlap with any 6574 * other locked ranges before calling bdev_lock_lba_range_ctx which will start 6575 * the lock process. 6576 */ 6577 TAILQ_FOREACH_SAFE(range, &bdev->internal.pending_locked_ranges, tailq, tmp) { 6578 if (bdev_lba_range_overlapped(range, &ctx->range) && 6579 !bdev_lba_range_overlaps_tailq(range, &bdev->internal.locked_ranges)) { 6580 TAILQ_REMOVE(&bdev->internal.pending_locked_ranges, range, tailq); 6581 pending_ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6582 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, range, tailq); 6583 spdk_thread_send_msg(pending_ctx->range.owner_ch->channel->thread, 6584 bdev_lock_lba_range_ctx_msg, pending_ctx); 6585 } 6586 } 6587 pthread_mutex_unlock(&bdev->internal.mutex); 6588 6589 ctx->cb_fn(ctx->cb_arg, status); 6590 free(ctx); 6591 } 6592 6593 static void 6594 bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6595 { 6596 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6597 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6598 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6599 TAILQ_HEAD(, spdk_bdev_io) io_locked; 6600 struct spdk_bdev_io *bdev_io; 6601 struct lba_range *range; 6602 6603 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6604 if (ctx->range.offset == range->offset && 6605 ctx->range.length == range->length && 6606 ctx->range.locked_ctx == range->locked_ctx) { 6607 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 6608 free(range); 6609 break; 6610 } 6611 } 6612 6613 /* Note: we should almost always be able to assert that the range specified 6614 * was found. But there are some very rare corner cases where a new channel 6615 * gets created simultaneously with a range unlock, where this function 6616 * would execute on that new channel and wouldn't have the range. 6617 * We also use this to clean up range allocations when a later allocation 6618 * fails in the locking path. 6619 * So we can't actually assert() here. 6620 */ 6621 6622 /* Swap the locked IO into a temporary list, and then try to submit them again. 6623 * We could hyper-optimize this to only resubmit locked I/O that overlap 6624 * with the range that was just unlocked, but this isn't a performance path so 6625 * we go for simplicity here. 6626 */ 6627 TAILQ_INIT(&io_locked); 6628 TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link); 6629 while (!TAILQ_EMPTY(&io_locked)) { 6630 bdev_io = TAILQ_FIRST(&io_locked); 6631 TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link); 6632 bdev_io_submit(bdev_io); 6633 } 6634 6635 spdk_for_each_channel_continue(i, 0); 6636 } 6637 6638 static int 6639 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6640 uint64_t offset, uint64_t length, 6641 lock_range_cb cb_fn, void *cb_arg) 6642 { 6643 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6644 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6645 struct locked_lba_range_ctx *ctx; 6646 struct lba_range *range; 6647 bool range_found = false; 6648 6649 /* Let's make sure the specified channel actually has a lock on 6650 * the specified range. Note that the range must match exactly. 
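 * The (offset, length) pair and the cb_arg must be identical to the values
 * passed to bdev_lock_lba_range(), and the lock must be owned by this channel.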
 */
	TAILQ_FOREACH(range, &ch->locked_ranges, tailq) {
		if (range->offset == offset && range->length == length &&
		    range->owner_ch == ch && range->locked_ctx == cb_arg) {
			range_found = true;
			break;
		}
	}

	if (!range_found) {
		return -EINVAL;
	}

	pthread_mutex_lock(&bdev->internal.mutex);
	/* We confirmed that this channel has locked the specified range.  To
	 * start the unlock process, we find the range in the bdev's locked_ranges
	 * and remove it.  This ensures new channels don't inherit the locked range.
	 * Then we will send a message to each channel (including the one specified
	 * here) to remove the range from its per-channel list.
	 */
	TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) {
		if (range->offset == offset && range->length == length &&
		    range->locked_ctx == cb_arg) {
			break;
		}
	}
	if (range == NULL) {
		assert(false);
		pthread_mutex_unlock(&bdev->internal.mutex);
		return -EINVAL;
	}
	TAILQ_REMOVE(&bdev->internal.locked_ranges, range, tailq);
	ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range);
	pthread_mutex_unlock(&bdev->internal.mutex);

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx,
			      bdev_unlock_lba_range_cb);
	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)

SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
{
	spdk_trace_register_owner(OWNER_BDEV, 'b');
	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
	spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV,
					OBJECT_BDEV_IO, 1, 0, "type: ");
	spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV,
					OBJECT_BDEV_IO, 0, 0, "");
}
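/*
 * Illustrative usage sketch (not part of the implementation above): the way a
 * consumer is expected to drive the descriptor API defined in this file.  The
 * names g_my_desc, my_event_cb, my_open and "Malloc0" are placeholders.  The
 * open must be performed on an SPDK thread, and spdk_bdev_close() must be
 * called on the same thread that opened the descriptor.
 *
 *	static struct spdk_bdev_desc *g_my_desc;
 *
 *	static void
 *	my_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
 *	{
 *		if (type == SPDK_BDEV_EVENT_REMOVE) {
 *			// Closing the descriptor allows spdk_bdev_unregister() to complete.
 *			spdk_bdev_close(g_my_desc);
 *			g_my_desc = NULL;
 *		}
 *	}
 *
 *	static int
 *	my_open(void)
 *	{
 *		return spdk_bdev_open_ext("Malloc0", true, my_event_cb, NULL, &g_my_desc);
 *	}
 */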