1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. All rights reserved. 5 * Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/thread.h" 42 #include "spdk/likely.h" 43 #include "spdk/queue.h" 44 #include "spdk/nvme_spec.h" 45 #include "spdk/scsi_spec.h" 46 #include "spdk/notify.h" 47 #include "spdk/util.h" 48 #include "spdk/trace.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 #include "spdk/string.h" 53 54 #include "bdev_internal.h" 55 56 #ifdef SPDK_CONFIG_VTUNE 57 #include "ittnotify.h" 58 #include "ittnotify_types.h" 59 int __itt_init_ittlib(const char *, __itt_group_id); 60 #endif 61 62 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024 - 1) 63 #define SPDK_BDEV_IO_CACHE_SIZE 256 64 #define SPDK_BDEV_AUTO_EXAMINE true 65 #define BUF_SMALL_POOL_SIZE 8191 66 #define BUF_LARGE_POOL_SIZE 1023 67 #define NOMEM_THRESHOLD_COUNT 8 68 #define ZERO_BUFFER_SIZE 0x100000 69 70 #define OWNER_BDEV 0x2 71 72 #define OBJECT_BDEV_IO 0x2 73 74 #define TRACE_GROUP_BDEV 0x3 75 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 76 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 77 78 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 79 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 80 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 81 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 1000 82 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (1024 * 1024) 83 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 84 #define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC 1000 85 86 #define SPDK_BDEV_POOL_ALIGNMENT 512 87 88 static const char *qos_conf_type[] = {"Limit_IOPS", 89 "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS" 90 }; 91 static const char *qos_rpc_type[] = {"rw_ios_per_sec", 92 "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec" 93 }; 94 95 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 96 97 struct spdk_bdev_mgr { 98 struct spdk_mempool *bdev_io_pool; 99 100 struct spdk_mempool *buf_small_pool; 101 struct spdk_mempool *buf_large_pool; 102 103 void *zero_buffer; 104 105 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 106 107 struct spdk_bdev_list bdevs; 108 109 bool init_complete; 110 bool module_init_complete; 111 112 pthread_mutex_t mutex; 113 114 #ifdef SPDK_CONFIG_VTUNE 115 __itt_domain *domain; 116 #endif 117 }; 118 119 static struct spdk_bdev_mgr g_bdev_mgr = { 120 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 121 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 122 .init_complete = false, 123 .module_init_complete = false, 124 .mutex = PTHREAD_MUTEX_INITIALIZER, 125 }; 126 127 typedef void (*lock_range_cb)(void *ctx, int status); 128 129 struct lba_range { 130 uint64_t offset; 131 uint64_t length; 132 void *locked_ctx; 133 struct spdk_bdev_channel *owner_ch; 134 TAILQ_ENTRY(lba_range) tailq; 135 }; 136 137 static struct spdk_bdev_opts g_bdev_opts = { 138 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 139 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 140 .bdev_auto_examine = SPDK_BDEV_AUTO_EXAMINE, 141 }; 142 143 static spdk_bdev_init_cb g_init_cb_fn = NULL; 144 static void *g_init_cb_arg = NULL; 145 146 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 147 static void *g_fini_cb_arg = NULL; 148 static struct spdk_thread *g_fini_thread = NULL; 149 150 struct spdk_bdev_qos_limit { 151 /** IOs or bytes allowed per second (i.e., 1s). */ 152 uint64_t limit; 153 154 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 155 * For remaining bytes, allowed to run negative if an I/O is submitted when 156 * some bytes are remaining, but the I/O is bigger than that amount. The 157 * excess will be deducted from the next timeslice. 158 */ 159 int64_t remaining_this_timeslice; 160 161 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 162 uint32_t min_per_timeslice; 163 164 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 165 uint32_t max_per_timeslice; 166 167 /** Function to check whether to queue the IO. */ 168 bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 169 170 /** Function to update for the submitted IO. */ 171 void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 172 }; 173 174 struct spdk_bdev_qos { 175 /** Types of structure of rate limits. */ 176 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 177 178 /** The channel that all I/O are funneled through. */ 179 struct spdk_bdev_channel *ch; 180 181 /** The thread on which the poller is running. */ 182 struct spdk_thread *thread; 183 184 /** Queue of I/O waiting to be issued. */ 185 bdev_io_tailq_t queued; 186 187 /** Size of a timeslice in tsc ticks. */ 188 uint64_t timeslice_size; 189 190 /** Timestamp of start of last timeslice. */ 191 uint64_t last_timeslice; 192 193 /** Poller that processes queued I/O commands each time slice. */ 194 struct spdk_poller *poller; 195 }; 196 197 struct spdk_bdev_mgmt_channel { 198 bdev_io_stailq_t need_buf_small; 199 bdev_io_stailq_t need_buf_large; 200 201 /* 202 * Each thread keeps a cache of bdev_io - this allows 203 * bdev threads which are *not* DPDK threads to still 204 * benefit from a per-thread bdev_io cache. Without 205 * this, non-DPDK threads fetching from the mempool 206 * incur a cmpxchg on get and put. 207 */ 208 bdev_io_stailq_t per_thread_cache; 209 uint32_t per_thread_cache_count; 210 uint32_t bdev_io_cache_size; 211 212 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 213 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 214 }; 215 216 /* 217 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 218 * will queue here their IO that awaits retry. It makes it possible to retry sending 219 * IO to one bdev after IO from other bdev completes. 220 */ 221 struct spdk_bdev_shared_resource { 222 /* The bdev management channel */ 223 struct spdk_bdev_mgmt_channel *mgmt_ch; 224 225 /* 226 * Count of I/O submitted to bdev module and waiting for completion. 227 * Incremented before submit_request() is called on an spdk_bdev_io. 228 */ 229 uint64_t io_outstanding; 230 231 /* 232 * Queue of IO awaiting retry because of a previous NOMEM status returned 233 * on this channel. 234 */ 235 bdev_io_tailq_t nomem_io; 236 237 /* 238 * Threshold which io_outstanding must drop to before retrying nomem_io. 239 */ 240 uint64_t nomem_threshold; 241 242 /* I/O channel allocated by a bdev module */ 243 struct spdk_io_channel *shared_ch; 244 245 /* Refcount of bdev channels using this resource */ 246 uint32_t ref; 247 248 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 249 }; 250 251 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 252 #define BDEV_CH_QOS_ENABLED (1 << 1) 253 254 struct spdk_bdev_channel { 255 struct spdk_bdev *bdev; 256 257 /* The channel for the underlying device */ 258 struct spdk_io_channel *channel; 259 260 /* Per io_device per thread data */ 261 struct spdk_bdev_shared_resource *shared_resource; 262 263 struct spdk_bdev_io_stat stat; 264 265 /* 266 * Count of I/O submitted to the underlying dev module through this channel 267 * and waiting for completion. 268 */ 269 uint64_t io_outstanding; 270 271 /* 272 * List of spdk_bdev_io directly associated with a call to the public bdev API. 273 * It does not include any spdk_bdev_io that are generated via splitting. 274 */ 275 bdev_io_tailq_t io_submitted; 276 277 /* 278 * List of spdk_bdev_io that are currently queued because they write to a locked 279 * LBA range. 280 */ 281 bdev_io_tailq_t io_locked; 282 283 uint32_t flags; 284 285 struct spdk_histogram_data *histogram; 286 287 #ifdef SPDK_CONFIG_VTUNE 288 uint64_t start_tsc; 289 uint64_t interval_tsc; 290 __itt_string_handle *handle; 291 struct spdk_bdev_io_stat prev_stat; 292 #endif 293 294 bdev_io_tailq_t queued_resets; 295 296 lba_range_tailq_t locked_ranges; 297 }; 298 299 struct media_event_entry { 300 struct spdk_bdev_media_event event; 301 TAILQ_ENTRY(media_event_entry) tailq; 302 }; 303 304 #define MEDIA_EVENT_POOL_SIZE 64 305 306 struct spdk_bdev_desc { 307 struct spdk_bdev *bdev; 308 struct spdk_thread *thread; 309 struct { 310 bool open_with_ext; 311 union { 312 spdk_bdev_remove_cb_t remove_fn; 313 spdk_bdev_event_cb_t event_fn; 314 }; 315 void *ctx; 316 } callback; 317 bool closed; 318 bool write; 319 pthread_mutex_t mutex; 320 uint32_t refs; 321 TAILQ_HEAD(, media_event_entry) pending_media_events; 322 TAILQ_HEAD(, media_event_entry) free_media_events; 323 struct media_event_entry *media_events_buffer; 324 TAILQ_ENTRY(spdk_bdev_desc) link; 325 326 uint64_t timeout_in_sec; 327 spdk_bdev_io_timeout_cb cb_fn; 328 void *cb_arg; 329 struct spdk_poller *io_timeout_poller; 330 }; 331 332 struct spdk_bdev_iostat_ctx { 333 struct spdk_bdev_io_stat *stat; 334 spdk_bdev_get_device_stat_cb cb; 335 void *cb_arg; 336 }; 337 338 struct set_qos_limit_ctx { 339 void (*cb_fn)(void *cb_arg, int status); 340 void *cb_arg; 341 struct spdk_bdev *bdev; 342 }; 343 344 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 345 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 346 347 static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 348 static void bdev_write_zero_buffer_next(void *_bdev_io); 349 350 static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i); 351 static void bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status); 352 353 static int 354 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 355 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 356 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg); 357 static int 358 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 359 struct iovec *iov, int iovcnt, void *md_buf, 360 uint64_t offset_blocks, uint64_t num_blocks, 361 spdk_bdev_io_completion_cb cb, void *cb_arg); 362 363 static int 364 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 365 uint64_t offset, uint64_t length, 366 lock_range_cb cb_fn, void *cb_arg); 367 368 static int 369 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 370 uint64_t offset, uint64_t length, 371 lock_range_cb cb_fn, void *cb_arg); 372 373 static inline void bdev_io_complete(void *ctx); 374 375 static bool bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort); 376 377 void 378 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 379 { 380 *opts = g_bdev_opts; 381 } 382 383 int 384 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 385 { 386 uint32_t min_pool_size; 387 388 /* 389 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 390 * initialization. A second mgmt_ch will be created on the same thread when the application starts 391 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 392 */ 393 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 394 if (opts->bdev_io_pool_size < min_pool_size) { 395 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 396 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 397 spdk_thread_get_count()); 398 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 399 return -1; 400 } 401 402 g_bdev_opts = *opts; 403 return 0; 404 } 405 406 /* 407 * Will implement the whitelist in the furture 408 */ 409 static inline bool 410 bdev_in_examine_whitelist(struct spdk_bdev *bdev) 411 { 412 return false; 413 } 414 415 static inline bool 416 bdev_ok_to_examine(struct spdk_bdev *bdev) 417 { 418 if (g_bdev_opts.bdev_auto_examine) { 419 return true; 420 } else { 421 return bdev_in_examine_whitelist(bdev); 422 } 423 } 424 425 struct spdk_bdev * 426 spdk_bdev_first(void) 427 { 428 struct spdk_bdev *bdev; 429 430 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 431 if (bdev) { 432 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 433 } 434 435 return bdev; 436 } 437 438 struct spdk_bdev * 439 spdk_bdev_next(struct spdk_bdev *prev) 440 { 441 struct spdk_bdev *bdev; 442 443 bdev = TAILQ_NEXT(prev, internal.link); 444 if (bdev) { 445 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 446 } 447 448 return bdev; 449 } 450 451 static struct spdk_bdev * 452 _bdev_next_leaf(struct spdk_bdev *bdev) 453 { 454 while (bdev != NULL) { 455 if (bdev->internal.claim_module == NULL) { 456 return bdev; 457 } else { 458 bdev = TAILQ_NEXT(bdev, internal.link); 459 } 460 } 461 462 return bdev; 463 } 464 465 struct spdk_bdev * 466 spdk_bdev_first_leaf(void) 467 { 468 struct spdk_bdev *bdev; 469 470 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 471 472 if (bdev) { 473 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 474 } 475 476 return bdev; 477 } 478 479 struct spdk_bdev * 480 spdk_bdev_next_leaf(struct spdk_bdev *prev) 481 { 482 struct spdk_bdev *bdev; 483 484 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 485 486 if (bdev) { 487 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 488 } 489 490 return bdev; 491 } 492 493 struct spdk_bdev * 494 spdk_bdev_get_by_name(const char *bdev_name) 495 { 496 struct spdk_bdev_alias *tmp; 497 struct spdk_bdev *bdev = spdk_bdev_first(); 498 499 while (bdev != NULL) { 500 if (strcmp(bdev_name, bdev->name) == 0) { 501 return bdev; 502 } 503 504 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 505 if (strcmp(bdev_name, tmp->alias) == 0) { 506 return bdev; 507 } 508 } 509 510 bdev = spdk_bdev_next(bdev); 511 } 512 513 return NULL; 514 } 515 516 void 517 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 518 { 519 struct iovec *iovs; 520 521 if (bdev_io->u.bdev.iovs == NULL) { 522 bdev_io->u.bdev.iovs = &bdev_io->iov; 523 bdev_io->u.bdev.iovcnt = 1; 524 } 525 526 iovs = bdev_io->u.bdev.iovs; 527 528 assert(iovs != NULL); 529 assert(bdev_io->u.bdev.iovcnt >= 1); 530 531 iovs[0].iov_base = buf; 532 iovs[0].iov_len = len; 533 } 534 535 void 536 spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len) 537 { 538 assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks); 539 bdev_io->u.bdev.md_buf = md_buf; 540 } 541 542 static bool 543 _is_buf_allocated(const struct iovec *iovs) 544 { 545 if (iovs == NULL) { 546 return false; 547 } 548 549 return iovs[0].iov_base != NULL; 550 } 551 552 static bool 553 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 554 { 555 int i; 556 uintptr_t iov_base; 557 558 if (spdk_likely(alignment == 1)) { 559 return true; 560 } 561 562 for (i = 0; i < iovcnt; i++) { 563 iov_base = (uintptr_t)iovs[i].iov_base; 564 if ((iov_base & (alignment - 1)) != 0) { 565 return false; 566 } 567 } 568 569 return true; 570 } 571 572 static void 573 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 574 { 575 int i; 576 size_t len; 577 578 for (i = 0; i < iovcnt; i++) { 579 len = spdk_min(iovs[i].iov_len, buf_len); 580 memcpy(buf, iovs[i].iov_base, len); 581 buf += len; 582 buf_len -= len; 583 } 584 } 585 586 static void 587 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 588 { 589 int i; 590 size_t len; 591 592 for (i = 0; i < iovcnt; i++) { 593 len = spdk_min(iovs[i].iov_len, buf_len); 594 memcpy(iovs[i].iov_base, buf, len); 595 buf += len; 596 buf_len -= len; 597 } 598 } 599 600 static void 601 _bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 602 { 603 /* save original iovec */ 604 bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs; 605 bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt; 606 /* set bounce iov */ 607 bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov; 608 bdev_io->u.bdev.iovcnt = 1; 609 /* set bounce buffer for this operation */ 610 bdev_io->u.bdev.iovs[0].iov_base = buf; 611 bdev_io->u.bdev.iovs[0].iov_len = len; 612 /* if this is write path, copy data from original buffer to bounce buffer */ 613 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 614 _copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt); 615 } 616 } 617 618 static void 619 _bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len) 620 { 621 /* save original md_buf */ 622 bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf; 623 /* set bounce md_buf */ 624 bdev_io->u.bdev.md_buf = md_buf; 625 626 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 627 memcpy(md_buf, bdev_io->internal.orig_md_buf, len); 628 } 629 } 630 631 static void 632 bdev_io_get_buf_complete(struct spdk_bdev_io *bdev_io, void *buf, bool status) 633 { 634 struct spdk_io_channel *ch = spdk_bdev_io_get_io_channel(bdev_io); 635 636 if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) { 637 bdev_io->internal.get_aux_buf_cb(ch, bdev_io, buf); 638 bdev_io->internal.get_aux_buf_cb = NULL; 639 } else { 640 assert(bdev_io->internal.get_buf_cb != NULL); 641 bdev_io->internal.buf = buf; 642 bdev_io->internal.get_buf_cb(ch, bdev_io, status); 643 bdev_io->internal.get_buf_cb = NULL; 644 } 645 } 646 647 static void 648 _bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len) 649 { 650 struct spdk_bdev *bdev = bdev_io->bdev; 651 bool buf_allocated; 652 uint64_t md_len, alignment; 653 void *aligned_buf; 654 655 if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) { 656 bdev_io_get_buf_complete(bdev_io, buf, true); 657 return; 658 } 659 660 alignment = spdk_bdev_get_buf_align(bdev); 661 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 662 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 663 664 if (buf_allocated) { 665 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 666 } else { 667 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 668 } 669 670 if (spdk_bdev_is_md_separate(bdev)) { 671 aligned_buf = (char *)aligned_buf + len; 672 md_len = bdev_io->u.bdev.num_blocks * bdev->md_len; 673 674 assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0); 675 676 if (bdev_io->u.bdev.md_buf != NULL) { 677 _bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len); 678 } else { 679 spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len); 680 } 681 } 682 bdev_io_get_buf_complete(bdev_io, buf, true); 683 } 684 685 static void 686 _bdev_io_put_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t buf_len) 687 { 688 struct spdk_bdev *bdev = bdev_io->bdev; 689 struct spdk_mempool *pool; 690 struct spdk_bdev_io *tmp; 691 bdev_io_stailq_t *stailq; 692 struct spdk_bdev_mgmt_channel *ch; 693 uint64_t md_len, alignment; 694 695 md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0; 696 alignment = spdk_bdev_get_buf_align(bdev); 697 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 698 699 if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 700 SPDK_BDEV_POOL_ALIGNMENT) { 701 pool = g_bdev_mgr.buf_small_pool; 702 stailq = &ch->need_buf_small; 703 } else { 704 pool = g_bdev_mgr.buf_large_pool; 705 stailq = &ch->need_buf_large; 706 } 707 708 if (STAILQ_EMPTY(stailq)) { 709 spdk_mempool_put(pool, buf); 710 } else { 711 tmp = STAILQ_FIRST(stailq); 712 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 713 _bdev_io_set_buf(tmp, buf, tmp->internal.buf_len); 714 } 715 } 716 717 static void 718 bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 719 { 720 assert(bdev_io->internal.buf != NULL); 721 _bdev_io_put_buf(bdev_io, bdev_io->internal.buf, bdev_io->internal.buf_len); 722 bdev_io->internal.buf = NULL; 723 } 724 725 void 726 spdk_bdev_io_put_aux_buf(struct spdk_bdev_io *bdev_io, void *buf) 727 { 728 uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 729 730 assert(buf != NULL); 731 _bdev_io_put_buf(bdev_io, buf, len); 732 } 733 734 static void 735 _bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io) 736 { 737 if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) { 738 assert(bdev_io->internal.orig_md_buf == NULL); 739 return; 740 } 741 742 /* if this is read path, copy data from bounce buffer to original buffer */ 743 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 744 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 745 _copy_buf_to_iovs(bdev_io->internal.orig_iovs, 746 bdev_io->internal.orig_iovcnt, 747 bdev_io->internal.bounce_iov.iov_base, 748 bdev_io->internal.bounce_iov.iov_len); 749 } 750 /* set original buffer for this io */ 751 bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt; 752 bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs; 753 /* disable bouncing buffer for this io */ 754 bdev_io->internal.orig_iovcnt = 0; 755 bdev_io->internal.orig_iovs = NULL; 756 757 /* do the same for metadata buffer */ 758 if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) { 759 assert(spdk_bdev_is_md_separate(bdev_io->bdev)); 760 761 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 762 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 763 memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf, 764 bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev)); 765 } 766 767 bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf; 768 bdev_io->internal.orig_md_buf = NULL; 769 } 770 771 /* We want to free the bounce buffer here since we know we're done with it (as opposed 772 * to waiting for the conditional free of internal.buf in spdk_bdev_free_io()). 773 */ 774 bdev_io_put_buf(bdev_io); 775 } 776 777 static void 778 bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len) 779 { 780 struct spdk_bdev *bdev = bdev_io->bdev; 781 struct spdk_mempool *pool; 782 bdev_io_stailq_t *stailq; 783 struct spdk_bdev_mgmt_channel *mgmt_ch; 784 uint64_t alignment, md_len; 785 void *buf; 786 787 alignment = spdk_bdev_get_buf_align(bdev); 788 md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0; 789 790 if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 791 SPDK_BDEV_POOL_ALIGNMENT) { 792 SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n", 793 len + alignment); 794 bdev_io_get_buf_complete(bdev_io, NULL, false); 795 return; 796 } 797 798 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 799 800 bdev_io->internal.buf_len = len; 801 802 if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 803 SPDK_BDEV_POOL_ALIGNMENT) { 804 pool = g_bdev_mgr.buf_small_pool; 805 stailq = &mgmt_ch->need_buf_small; 806 } else { 807 pool = g_bdev_mgr.buf_large_pool; 808 stailq = &mgmt_ch->need_buf_large; 809 } 810 811 buf = spdk_mempool_get(pool); 812 if (!buf) { 813 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 814 } else { 815 _bdev_io_set_buf(bdev_io, buf, len); 816 } 817 } 818 819 void 820 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 821 { 822 struct spdk_bdev *bdev = bdev_io->bdev; 823 uint64_t alignment; 824 825 assert(cb != NULL); 826 bdev_io->internal.get_buf_cb = cb; 827 828 alignment = spdk_bdev_get_buf_align(bdev); 829 830 if (_is_buf_allocated(bdev_io->u.bdev.iovs) && 831 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 832 /* Buffer already present and aligned */ 833 cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true); 834 return; 835 } 836 837 bdev_io_get_buf(bdev_io, len); 838 } 839 840 void 841 spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb) 842 { 843 uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 844 845 assert(cb != NULL); 846 assert(bdev_io->internal.get_aux_buf_cb == NULL); 847 bdev_io->internal.get_aux_buf_cb = cb; 848 bdev_io_get_buf(bdev_io, len); 849 } 850 851 static int 852 bdev_module_get_max_ctx_size(void) 853 { 854 struct spdk_bdev_module *bdev_module; 855 int max_bdev_module_size = 0; 856 857 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 858 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 859 max_bdev_module_size = bdev_module->get_ctx_size(); 860 } 861 } 862 863 return max_bdev_module_size; 864 } 865 866 void 867 spdk_bdev_config_text(FILE *fp) 868 { 869 struct spdk_bdev_module *bdev_module; 870 871 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 872 if (bdev_module->config_text) { 873 bdev_module->config_text(fp); 874 } 875 } 876 } 877 878 static void 879 bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 880 { 881 int i; 882 struct spdk_bdev_qos *qos = bdev->internal.qos; 883 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 884 885 if (!qos) { 886 return; 887 } 888 889 spdk_bdev_get_qos_rate_limits(bdev, limits); 890 891 spdk_json_write_object_begin(w); 892 spdk_json_write_named_string(w, "method", "bdev_set_qos_limit"); 893 894 spdk_json_write_named_object_begin(w, "params"); 895 spdk_json_write_named_string(w, "name", bdev->name); 896 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 897 if (limits[i] > 0) { 898 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 899 } 900 } 901 spdk_json_write_object_end(w); 902 903 spdk_json_write_object_end(w); 904 } 905 906 void 907 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 908 { 909 struct spdk_bdev_module *bdev_module; 910 struct spdk_bdev *bdev; 911 912 assert(w != NULL); 913 914 spdk_json_write_array_begin(w); 915 916 spdk_json_write_object_begin(w); 917 spdk_json_write_named_string(w, "method", "bdev_set_options"); 918 spdk_json_write_named_object_begin(w, "params"); 919 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 920 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 921 spdk_json_write_named_bool(w, "bdev_auto_examine", g_bdev_opts.bdev_auto_examine); 922 spdk_json_write_object_end(w); 923 spdk_json_write_object_end(w); 924 925 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 926 if (bdev_module->config_json) { 927 bdev_module->config_json(w); 928 } 929 } 930 931 pthread_mutex_lock(&g_bdev_mgr.mutex); 932 933 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 934 if (bdev->fn_table->write_config_json) { 935 bdev->fn_table->write_config_json(bdev, w); 936 } 937 938 bdev_qos_config_json(bdev, w); 939 } 940 941 pthread_mutex_unlock(&g_bdev_mgr.mutex); 942 943 spdk_json_write_array_end(w); 944 } 945 946 static int 947 bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 948 { 949 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 950 struct spdk_bdev_io *bdev_io; 951 uint32_t i; 952 953 STAILQ_INIT(&ch->need_buf_small); 954 STAILQ_INIT(&ch->need_buf_large); 955 956 STAILQ_INIT(&ch->per_thread_cache); 957 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 958 959 /* Pre-populate bdev_io cache to ensure this thread cannot be starved. */ 960 ch->per_thread_cache_count = 0; 961 for (i = 0; i < ch->bdev_io_cache_size; i++) { 962 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 963 assert(bdev_io != NULL); 964 ch->per_thread_cache_count++; 965 STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link); 966 } 967 968 TAILQ_INIT(&ch->shared_resources); 969 TAILQ_INIT(&ch->io_wait_queue); 970 971 return 0; 972 } 973 974 static void 975 bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf) 976 { 977 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 978 struct spdk_bdev_io *bdev_io; 979 980 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) { 981 SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n"); 982 } 983 984 if (!TAILQ_EMPTY(&ch->shared_resources)) { 985 SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n"); 986 } 987 988 while (!STAILQ_EMPTY(&ch->per_thread_cache)) { 989 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 990 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 991 ch->per_thread_cache_count--; 992 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 993 } 994 995 assert(ch->per_thread_cache_count == 0); 996 } 997 998 static void 999 bdev_init_complete(int rc) 1000 { 1001 spdk_bdev_init_cb cb_fn = g_init_cb_fn; 1002 void *cb_arg = g_init_cb_arg; 1003 struct spdk_bdev_module *m; 1004 1005 g_bdev_mgr.init_complete = true; 1006 g_init_cb_fn = NULL; 1007 g_init_cb_arg = NULL; 1008 1009 /* 1010 * For modules that need to know when subsystem init is complete, 1011 * inform them now. 1012 */ 1013 if (rc == 0) { 1014 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1015 if (m->init_complete) { 1016 m->init_complete(); 1017 } 1018 } 1019 } 1020 1021 cb_fn(cb_arg, rc); 1022 } 1023 1024 static void 1025 bdev_module_action_complete(void) 1026 { 1027 struct spdk_bdev_module *m; 1028 1029 /* 1030 * Don't finish bdev subsystem initialization if 1031 * module pre-initialization is still in progress, or 1032 * the subsystem been already initialized. 1033 */ 1034 if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) { 1035 return; 1036 } 1037 1038 /* 1039 * Check all bdev modules for inits/examinations in progress. If any 1040 * exist, return immediately since we cannot finish bdev subsystem 1041 * initialization until all are completed. 1042 */ 1043 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1044 if (m->internal.action_in_progress > 0) { 1045 return; 1046 } 1047 } 1048 1049 /* 1050 * Modules already finished initialization - now that all 1051 * the bdev modules have finished their asynchronous I/O 1052 * processing, the entire bdev layer can be marked as complete. 1053 */ 1054 bdev_init_complete(0); 1055 } 1056 1057 static void 1058 bdev_module_action_done(struct spdk_bdev_module *module) 1059 { 1060 assert(module->internal.action_in_progress > 0); 1061 module->internal.action_in_progress--; 1062 bdev_module_action_complete(); 1063 } 1064 1065 void 1066 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 1067 { 1068 bdev_module_action_done(module); 1069 } 1070 1071 void 1072 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 1073 { 1074 bdev_module_action_done(module); 1075 } 1076 1077 /** The last initialized bdev module */ 1078 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 1079 1080 static void 1081 bdev_init_failed(void *cb_arg) 1082 { 1083 struct spdk_bdev_module *module = cb_arg; 1084 1085 module->internal.action_in_progress--; 1086 bdev_init_complete(-1); 1087 } 1088 1089 static int 1090 bdev_modules_init(void) 1091 { 1092 struct spdk_bdev_module *module; 1093 int rc = 0; 1094 1095 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 1096 g_resume_bdev_module = module; 1097 if (module->async_init) { 1098 module->internal.action_in_progress = 1; 1099 } 1100 rc = module->module_init(); 1101 if (rc != 0) { 1102 /* Bump action_in_progress to prevent other modules from completion of modules_init 1103 * Send message to defer application shutdown until resources are cleaned up */ 1104 module->internal.action_in_progress = 1; 1105 spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module); 1106 return rc; 1107 } 1108 } 1109 1110 g_resume_bdev_module = NULL; 1111 return 0; 1112 } 1113 1114 void 1115 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 1116 { 1117 struct spdk_conf_section *sp; 1118 struct spdk_bdev_opts bdev_opts; 1119 int32_t bdev_io_pool_size, bdev_io_cache_size; 1120 int cache_size; 1121 int rc = 0; 1122 char mempool_name[32]; 1123 1124 assert(cb_fn != NULL); 1125 1126 sp = spdk_conf_find_section(NULL, "Bdev"); 1127 if (sp != NULL) { 1128 spdk_bdev_get_opts(&bdev_opts); 1129 1130 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 1131 if (bdev_io_pool_size >= 0) { 1132 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 1133 } 1134 1135 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 1136 if (bdev_io_cache_size >= 0) { 1137 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 1138 } 1139 1140 if (spdk_bdev_set_opts(&bdev_opts)) { 1141 bdev_init_complete(-1); 1142 return; 1143 } 1144 1145 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 1146 } 1147 1148 g_init_cb_fn = cb_fn; 1149 g_init_cb_arg = cb_arg; 1150 1151 spdk_notify_type_register("bdev_register"); 1152 spdk_notify_type_register("bdev_unregister"); 1153 1154 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 1155 1156 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 1157 g_bdev_opts.bdev_io_pool_size, 1158 sizeof(struct spdk_bdev_io) + 1159 bdev_module_get_max_ctx_size(), 1160 0, 1161 SPDK_ENV_SOCKET_ID_ANY); 1162 1163 if (g_bdev_mgr.bdev_io_pool == NULL) { 1164 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 1165 bdev_init_complete(-1); 1166 return; 1167 } 1168 1169 /** 1170 * Ensure no more than half of the total buffers end up local caches, by 1171 * using spdk_thread_get_count() to determine how many local caches we need 1172 * to account for. 1173 */ 1174 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 1175 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 1176 1177 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 1178 BUF_SMALL_POOL_SIZE, 1179 SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 1180 SPDK_BDEV_POOL_ALIGNMENT, 1181 cache_size, 1182 SPDK_ENV_SOCKET_ID_ANY); 1183 if (!g_bdev_mgr.buf_small_pool) { 1184 SPDK_ERRLOG("create rbuf small pool failed\n"); 1185 bdev_init_complete(-1); 1186 return; 1187 } 1188 1189 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 1190 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 1191 1192 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 1193 BUF_LARGE_POOL_SIZE, 1194 SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 1195 SPDK_BDEV_POOL_ALIGNMENT, 1196 cache_size, 1197 SPDK_ENV_SOCKET_ID_ANY); 1198 if (!g_bdev_mgr.buf_large_pool) { 1199 SPDK_ERRLOG("create rbuf large pool failed\n"); 1200 bdev_init_complete(-1); 1201 return; 1202 } 1203 1204 g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 1205 NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 1206 if (!g_bdev_mgr.zero_buffer) { 1207 SPDK_ERRLOG("create bdev zero buffer failed\n"); 1208 bdev_init_complete(-1); 1209 return; 1210 } 1211 1212 #ifdef SPDK_CONFIG_VTUNE 1213 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 1214 #endif 1215 1216 spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create, 1217 bdev_mgmt_channel_destroy, 1218 sizeof(struct spdk_bdev_mgmt_channel), 1219 "bdev_mgr"); 1220 1221 rc = bdev_modules_init(); 1222 g_bdev_mgr.module_init_complete = true; 1223 if (rc != 0) { 1224 SPDK_ERRLOG("bdev modules init failed\n"); 1225 return; 1226 } 1227 1228 bdev_module_action_complete(); 1229 } 1230 1231 static void 1232 bdev_mgr_unregister_cb(void *io_device) 1233 { 1234 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 1235 1236 if (g_bdev_mgr.bdev_io_pool) { 1237 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 1238 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 1239 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 1240 g_bdev_opts.bdev_io_pool_size); 1241 } 1242 1243 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1244 } 1245 1246 if (g_bdev_mgr.buf_small_pool) { 1247 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 1248 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 1249 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 1250 BUF_SMALL_POOL_SIZE); 1251 assert(false); 1252 } 1253 1254 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1255 } 1256 1257 if (g_bdev_mgr.buf_large_pool) { 1258 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 1259 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 1260 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 1261 BUF_LARGE_POOL_SIZE); 1262 assert(false); 1263 } 1264 1265 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1266 } 1267 1268 spdk_free(g_bdev_mgr.zero_buffer); 1269 1270 cb_fn(g_fini_cb_arg); 1271 g_fini_cb_fn = NULL; 1272 g_fini_cb_arg = NULL; 1273 g_bdev_mgr.init_complete = false; 1274 g_bdev_mgr.module_init_complete = false; 1275 pthread_mutex_destroy(&g_bdev_mgr.mutex); 1276 } 1277 1278 static void 1279 bdev_module_finish_iter(void *arg) 1280 { 1281 struct spdk_bdev_module *bdev_module; 1282 1283 /* FIXME: Handling initialization failures is broken now, 1284 * so we won't even try cleaning up after successfully 1285 * initialized modules. if module_init_complete is false, 1286 * just call spdk_bdev_mgr_unregister_cb 1287 */ 1288 if (!g_bdev_mgr.module_init_complete) { 1289 bdev_mgr_unregister_cb(NULL); 1290 return; 1291 } 1292 1293 /* Start iterating from the last touched module */ 1294 if (!g_resume_bdev_module) { 1295 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 1296 } else { 1297 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 1298 internal.tailq); 1299 } 1300 1301 while (bdev_module) { 1302 if (bdev_module->async_fini) { 1303 /* Save our place so we can resume later. We must 1304 * save the variable here, before calling module_fini() 1305 * below, because in some cases the module may immediately 1306 * call spdk_bdev_module_finish_done() and re-enter 1307 * this function to continue iterating. */ 1308 g_resume_bdev_module = bdev_module; 1309 } 1310 1311 if (bdev_module->module_fini) { 1312 bdev_module->module_fini(); 1313 } 1314 1315 if (bdev_module->async_fini) { 1316 return; 1317 } 1318 1319 bdev_module = TAILQ_PREV(bdev_module, bdev_module_list, 1320 internal.tailq); 1321 } 1322 1323 g_resume_bdev_module = NULL; 1324 spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb); 1325 } 1326 1327 void 1328 spdk_bdev_module_finish_done(void) 1329 { 1330 if (spdk_get_thread() != g_fini_thread) { 1331 spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL); 1332 } else { 1333 bdev_module_finish_iter(NULL); 1334 } 1335 } 1336 1337 static void 1338 bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno) 1339 { 1340 struct spdk_bdev *bdev = cb_arg; 1341 1342 if (bdeverrno && bdev) { 1343 SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n", 1344 bdev->name); 1345 1346 /* 1347 * Since the call to spdk_bdev_unregister() failed, we have no way to free this 1348 * bdev; try to continue by manually removing this bdev from the list and continue 1349 * with the next bdev in the list. 1350 */ 1351 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 1352 } 1353 1354 if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) { 1355 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n"); 1356 /* 1357 * Bdev module finish need to be deferred as we might be in the middle of some context 1358 * (like bdev part free) that will use this bdev (or private bdev driver ctx data) 1359 * after returning. 1360 */ 1361 spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL); 1362 return; 1363 } 1364 1365 /* 1366 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem 1367 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity 1368 * to detect clean shutdown as opposed to run-time hot removal of the underlying 1369 * base bdevs. 1370 * 1371 * Also, walk the list in the reverse order. 1372 */ 1373 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1374 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1375 if (bdev->internal.claim_module != NULL) { 1376 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n", 1377 bdev->name, bdev->internal.claim_module->name); 1378 continue; 1379 } 1380 1381 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name); 1382 spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev); 1383 return; 1384 } 1385 1386 /* 1387 * If any bdev fails to unclaim underlying bdev properly, we may face the 1388 * case of bdev list consisting of claimed bdevs only (if claims are managed 1389 * correctly, this would mean there's a loop in the claims graph which is 1390 * clearly impossible). Warn and unregister last bdev on the list then. 1391 */ 1392 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1393 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1394 SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1395 spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev); 1396 return; 1397 } 1398 } 1399 1400 void 1401 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1402 { 1403 struct spdk_bdev_module *m; 1404 1405 assert(cb_fn != NULL); 1406 1407 g_fini_thread = spdk_get_thread(); 1408 1409 g_fini_cb_fn = cb_fn; 1410 g_fini_cb_arg = cb_arg; 1411 1412 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1413 if (m->fini_start) { 1414 m->fini_start(); 1415 } 1416 } 1417 1418 bdev_finish_unregister_bdevs_iter(NULL, 0); 1419 } 1420 1421 struct spdk_bdev_io * 1422 bdev_channel_get_io(struct spdk_bdev_channel *channel) 1423 { 1424 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1425 struct spdk_bdev_io *bdev_io; 1426 1427 if (ch->per_thread_cache_count > 0) { 1428 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1429 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1430 ch->per_thread_cache_count--; 1431 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1432 /* 1433 * Don't try to look for bdev_ios in the global pool if there are 1434 * waiters on bdev_ios - we don't want this caller to jump the line. 1435 */ 1436 bdev_io = NULL; 1437 } else { 1438 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1439 } 1440 1441 return bdev_io; 1442 } 1443 1444 void 1445 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1446 { 1447 struct spdk_bdev_mgmt_channel *ch; 1448 1449 assert(bdev_io != NULL); 1450 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1451 1452 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1453 1454 if (bdev_io->internal.buf != NULL) { 1455 bdev_io_put_buf(bdev_io); 1456 } 1457 1458 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1459 ch->per_thread_cache_count++; 1460 STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link); 1461 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1462 struct spdk_bdev_io_wait_entry *entry; 1463 1464 entry = TAILQ_FIRST(&ch->io_wait_queue); 1465 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1466 entry->cb_fn(entry->cb_arg); 1467 } 1468 } else { 1469 /* We should never have a full cache with entries on the io wait queue. */ 1470 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1471 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1472 } 1473 } 1474 1475 static bool 1476 bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1477 { 1478 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1479 1480 switch (limit) { 1481 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1482 return true; 1483 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1484 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1485 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1486 return false; 1487 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1488 default: 1489 return false; 1490 } 1491 } 1492 1493 static bool 1494 bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1495 { 1496 switch (bdev_io->type) { 1497 case SPDK_BDEV_IO_TYPE_NVME_IO: 1498 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1499 case SPDK_BDEV_IO_TYPE_READ: 1500 case SPDK_BDEV_IO_TYPE_WRITE: 1501 return true; 1502 case SPDK_BDEV_IO_TYPE_ZCOPY: 1503 if (bdev_io->u.bdev.zcopy.start) { 1504 return true; 1505 } else { 1506 return false; 1507 } 1508 default: 1509 return false; 1510 } 1511 } 1512 1513 static bool 1514 bdev_is_read_io(struct spdk_bdev_io *bdev_io) 1515 { 1516 switch (bdev_io->type) { 1517 case SPDK_BDEV_IO_TYPE_NVME_IO: 1518 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1519 /* Bit 1 (0x2) set for read operation */ 1520 if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) { 1521 return true; 1522 } else { 1523 return false; 1524 } 1525 case SPDK_BDEV_IO_TYPE_READ: 1526 return true; 1527 case SPDK_BDEV_IO_TYPE_ZCOPY: 1528 /* Populate to read from disk */ 1529 if (bdev_io->u.bdev.zcopy.populate) { 1530 return true; 1531 } else { 1532 return false; 1533 } 1534 default: 1535 return false; 1536 } 1537 } 1538 1539 static uint64_t 1540 bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1541 { 1542 struct spdk_bdev *bdev = bdev_io->bdev; 1543 1544 switch (bdev_io->type) { 1545 case SPDK_BDEV_IO_TYPE_NVME_IO: 1546 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1547 return bdev_io->u.nvme_passthru.nbytes; 1548 case SPDK_BDEV_IO_TYPE_READ: 1549 case SPDK_BDEV_IO_TYPE_WRITE: 1550 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1551 case SPDK_BDEV_IO_TYPE_ZCOPY: 1552 /* Track the data in the start phase only */ 1553 if (bdev_io->u.bdev.zcopy.start) { 1554 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1555 } else { 1556 return 0; 1557 } 1558 default: 1559 return 0; 1560 } 1561 } 1562 1563 static bool 1564 bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1565 { 1566 if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) { 1567 return true; 1568 } else { 1569 return false; 1570 } 1571 } 1572 1573 static bool 1574 bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1575 { 1576 if (bdev_is_read_io(io) == false) { 1577 return false; 1578 } 1579 1580 return bdev_qos_rw_queue_io(limit, io); 1581 } 1582 1583 static bool 1584 bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1585 { 1586 if (bdev_is_read_io(io) == true) { 1587 return false; 1588 } 1589 1590 return bdev_qos_rw_queue_io(limit, io); 1591 } 1592 1593 static void 1594 bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1595 { 1596 limit->remaining_this_timeslice--; 1597 } 1598 1599 static void 1600 bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1601 { 1602 limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io); 1603 } 1604 1605 static void 1606 bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1607 { 1608 if (bdev_is_read_io(io) == false) { 1609 return; 1610 } 1611 1612 return bdev_qos_rw_bps_update_quota(limit, io); 1613 } 1614 1615 static void 1616 bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1617 { 1618 if (bdev_is_read_io(io) == true) { 1619 return; 1620 } 1621 1622 return bdev_qos_rw_bps_update_quota(limit, io); 1623 } 1624 1625 static void 1626 bdev_qos_set_ops(struct spdk_bdev_qos *qos) 1627 { 1628 int i; 1629 1630 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1631 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1632 qos->rate_limits[i].queue_io = NULL; 1633 qos->rate_limits[i].update_quota = NULL; 1634 continue; 1635 } 1636 1637 switch (i) { 1638 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1639 qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io; 1640 qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota; 1641 break; 1642 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1643 qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io; 1644 qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota; 1645 break; 1646 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1647 qos->rate_limits[i].queue_io = bdev_qos_r_queue_io; 1648 qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota; 1649 break; 1650 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1651 qos->rate_limits[i].queue_io = bdev_qos_w_queue_io; 1652 qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota; 1653 break; 1654 default: 1655 break; 1656 } 1657 } 1658 } 1659 1660 static void 1661 _bdev_io_complete_in_submit(struct spdk_bdev_channel *bdev_ch, 1662 struct spdk_bdev_io *bdev_io, 1663 enum spdk_bdev_io_status status) 1664 { 1665 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1666 1667 bdev_io->internal.in_submit_request = true; 1668 bdev_ch->io_outstanding++; 1669 shared_resource->io_outstanding++; 1670 spdk_bdev_io_complete(bdev_io, status); 1671 bdev_io->internal.in_submit_request = false; 1672 } 1673 1674 static inline void 1675 bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io) 1676 { 1677 struct spdk_bdev *bdev = bdev_io->bdev; 1678 struct spdk_io_channel *ch = bdev_ch->channel; 1679 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1680 1681 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1682 bdev_ch->io_outstanding++; 1683 shared_resource->io_outstanding++; 1684 bdev_io->internal.in_submit_request = true; 1685 bdev->fn_table->submit_request(ch, bdev_io); 1686 bdev_io->internal.in_submit_request = false; 1687 } else { 1688 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1689 } 1690 } 1691 1692 static int 1693 bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1694 { 1695 struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL; 1696 int i, submitted_ios = 0; 1697 1698 TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) { 1699 if (bdev_qos_io_to_limit(bdev_io) == true) { 1700 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1701 if (!qos->rate_limits[i].queue_io) { 1702 continue; 1703 } 1704 1705 if (qos->rate_limits[i].queue_io(&qos->rate_limits[i], 1706 bdev_io) == true) { 1707 return submitted_ios; 1708 } 1709 } 1710 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1711 if (!qos->rate_limits[i].update_quota) { 1712 continue; 1713 } 1714 1715 qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io); 1716 } 1717 } 1718 1719 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1720 bdev_io_do_submit(ch, bdev_io); 1721 submitted_ios++; 1722 } 1723 1724 return submitted_ios; 1725 } 1726 1727 static void 1728 bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1729 { 1730 int rc; 1731 1732 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1733 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1734 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1735 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1736 &bdev_io->internal.waitq_entry); 1737 if (rc != 0) { 1738 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1739 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1740 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1741 } 1742 } 1743 1744 static bool 1745 bdev_io_type_can_split(uint8_t type) 1746 { 1747 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1748 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1749 1750 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1751 * UNMAP could be split, but these types of I/O are typically much larger 1752 * in size (sometimes the size of the entire block device), and the bdev 1753 * module can more efficiently split these types of I/O. Plus those types 1754 * of I/O do not have a payload, which makes the splitting process simpler. 1755 */ 1756 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1757 return true; 1758 } else { 1759 return false; 1760 } 1761 } 1762 1763 static bool 1764 bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1765 { 1766 uint64_t start_stripe, end_stripe; 1767 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1768 1769 if (io_boundary == 0) { 1770 return false; 1771 } 1772 1773 if (!bdev_io_type_can_split(bdev_io->type)) { 1774 return false; 1775 } 1776 1777 start_stripe = bdev_io->u.bdev.offset_blocks; 1778 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1779 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1780 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1781 start_stripe >>= spdk_u32log2(io_boundary); 1782 end_stripe >>= spdk_u32log2(io_boundary); 1783 } else { 1784 start_stripe /= io_boundary; 1785 end_stripe /= io_boundary; 1786 } 1787 return (start_stripe != end_stripe); 1788 } 1789 1790 static uint32_t 1791 _to_next_boundary(uint64_t offset, uint32_t boundary) 1792 { 1793 return (boundary - (offset % boundary)); 1794 } 1795 1796 static void 1797 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1798 1799 static void 1800 _bdev_io_split(void *_bdev_io) 1801 { 1802 struct spdk_bdev_io *bdev_io = _bdev_io; 1803 uint64_t current_offset, remaining; 1804 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes; 1805 struct iovec *parent_iov, *iov; 1806 uint64_t parent_iov_offset, iov_len; 1807 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1808 void *md_buf = NULL; 1809 int rc; 1810 1811 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1812 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1813 blocklen = bdev_io->bdev->blocklen; 1814 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1815 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1816 1817 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1818 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1819 if (parent_iov_offset < parent_iov->iov_len) { 1820 break; 1821 } 1822 parent_iov_offset -= parent_iov->iov_len; 1823 } 1824 1825 child_iovcnt = 0; 1826 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1827 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1828 to_next_boundary = spdk_min(remaining, to_next_boundary); 1829 to_next_boundary_bytes = to_next_boundary * blocklen; 1830 iov = &bdev_io->child_iov[child_iovcnt]; 1831 iovcnt = 0; 1832 1833 if (bdev_io->u.bdev.md_buf) { 1834 assert((parent_iov_offset % blocklen) > 0); 1835 md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) * 1836 spdk_bdev_get_md_size(bdev_io->bdev); 1837 } 1838 1839 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1840 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1841 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1842 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1843 to_next_boundary_bytes -= iov_len; 1844 1845 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1846 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1847 1848 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1849 parent_iov_offset += iov_len; 1850 } else { 1851 parent_iovpos++; 1852 parent_iov_offset = 0; 1853 } 1854 child_iovcnt++; 1855 iovcnt++; 1856 } 1857 1858 if (to_next_boundary_bytes > 0) { 1859 /* We had to stop this child I/O early because we ran out of 1860 * child_iov space. Ensure the iovs to be aligned with block 1861 * size and then adjust to_next_boundary before starting the 1862 * child I/O. 1863 */ 1864 assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV); 1865 to_last_block_bytes = to_next_boundary_bytes % blocklen; 1866 if (to_last_block_bytes != 0) { 1867 uint32_t child_iovpos = child_iovcnt - 1; 1868 /* don't decrease child_iovcnt so the loop will naturally end */ 1869 1870 to_last_block_bytes = blocklen - to_last_block_bytes; 1871 to_next_boundary_bytes += to_last_block_bytes; 1872 while (to_last_block_bytes > 0 && iovcnt > 0) { 1873 iov_len = spdk_min(to_last_block_bytes, 1874 bdev_io->child_iov[child_iovpos].iov_len); 1875 bdev_io->child_iov[child_iovpos].iov_len -= iov_len; 1876 if (bdev_io->child_iov[child_iovpos].iov_len == 0) { 1877 child_iovpos--; 1878 if (--iovcnt == 0) { 1879 return; 1880 } 1881 } 1882 to_last_block_bytes -= iov_len; 1883 } 1884 1885 assert(to_last_block_bytes == 0); 1886 } 1887 to_next_boundary -= to_next_boundary_bytes / blocklen; 1888 } 1889 1890 bdev_io->u.bdev.split_outstanding++; 1891 1892 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1893 rc = bdev_readv_blocks_with_md(bdev_io->internal.desc, 1894 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1895 iov, iovcnt, md_buf, current_offset, 1896 to_next_boundary, 1897 bdev_io_split_done, bdev_io); 1898 } else { 1899 rc = bdev_writev_blocks_with_md(bdev_io->internal.desc, 1900 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1901 iov, iovcnt, md_buf, current_offset, 1902 to_next_boundary, 1903 bdev_io_split_done, bdev_io); 1904 } 1905 1906 if (rc == 0) { 1907 current_offset += to_next_boundary; 1908 remaining -= to_next_boundary; 1909 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1910 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1911 } else { 1912 bdev_io->u.bdev.split_outstanding--; 1913 if (rc == -ENOMEM) { 1914 if (bdev_io->u.bdev.split_outstanding == 0) { 1915 /* No I/O is outstanding. Hence we should wait here. */ 1916 bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split); 1917 } 1918 } else { 1919 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1920 if (bdev_io->u.bdev.split_outstanding == 0) { 1921 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 1922 (uintptr_t)bdev_io, 0); 1923 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 1924 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1925 } 1926 } 1927 1928 return; 1929 } 1930 } 1931 } 1932 1933 static void 1934 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1935 { 1936 struct spdk_bdev_io *parent_io = cb_arg; 1937 1938 spdk_bdev_free_io(bdev_io); 1939 1940 if (!success) { 1941 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1942 } 1943 parent_io->u.bdev.split_outstanding--; 1944 if (parent_io->u.bdev.split_outstanding != 0) { 1945 return; 1946 } 1947 1948 /* 1949 * Parent I/O finishes when all blocks are consumed. 1950 */ 1951 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 1952 assert(parent_io->internal.cb != bdev_io_split_done); 1953 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 1954 (uintptr_t)parent_io, 0); 1955 TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link); 1956 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1957 parent_io->internal.caller_ctx); 1958 return; 1959 } 1960 1961 /* 1962 * Continue with the splitting process. This function will complete the parent I/O if the 1963 * splitting is done. 1964 */ 1965 _bdev_io_split(parent_io); 1966 } 1967 1968 static void 1969 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success); 1970 1971 static void 1972 bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1973 { 1974 assert(bdev_io_type_can_split(bdev_io->type)); 1975 1976 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1977 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1978 bdev_io->u.bdev.split_outstanding = 0; 1979 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1980 1981 if (_is_buf_allocated(bdev_io->u.bdev.iovs)) { 1982 _bdev_io_split(bdev_io); 1983 } else { 1984 assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ); 1985 spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb, 1986 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1987 } 1988 } 1989 1990 static void 1991 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 1992 { 1993 if (!success) { 1994 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1995 return; 1996 } 1997 1998 bdev_io_split(ch, bdev_io); 1999 } 2000 2001 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't 2002 * be inlined, at least on some compilers. 2003 */ 2004 static inline void 2005 _bdev_io_submit(void *ctx) 2006 { 2007 struct spdk_bdev_io *bdev_io = ctx; 2008 struct spdk_bdev *bdev = bdev_io->bdev; 2009 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2010 uint64_t tsc; 2011 2012 tsc = spdk_get_ticks(); 2013 bdev_io->internal.submit_tsc = tsc; 2014 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 2015 2016 if (spdk_likely(bdev_ch->flags == 0)) { 2017 bdev_io_do_submit(bdev_ch, bdev_io); 2018 return; 2019 } 2020 2021 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 2022 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2023 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 2024 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) && 2025 bdev_abort_queued_io(&bdev->internal.qos->queued, bdev_io->u.abort.bio_to_abort)) { 2026 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 2027 } else { 2028 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 2029 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 2030 } 2031 } else { 2032 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 2033 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2034 } 2035 } 2036 2037 bool 2038 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2); 2039 2040 bool 2041 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2) 2042 { 2043 if (range1->length == 0 || range2->length == 0) { 2044 return false; 2045 } 2046 2047 if (range1->offset + range1->length <= range2->offset) { 2048 return false; 2049 } 2050 2051 if (range2->offset + range2->length <= range1->offset) { 2052 return false; 2053 } 2054 2055 return true; 2056 } 2057 2058 static bool 2059 bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range) 2060 { 2061 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2062 struct lba_range r; 2063 2064 switch (bdev_io->type) { 2065 case SPDK_BDEV_IO_TYPE_NVME_IO: 2066 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 2067 /* Don't try to decode the NVMe command - just assume worst-case and that 2068 * it overlaps a locked range. 2069 */ 2070 return true; 2071 case SPDK_BDEV_IO_TYPE_WRITE: 2072 case SPDK_BDEV_IO_TYPE_UNMAP: 2073 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2074 case SPDK_BDEV_IO_TYPE_ZCOPY: 2075 r.offset = bdev_io->u.bdev.offset_blocks; 2076 r.length = bdev_io->u.bdev.num_blocks; 2077 if (!bdev_lba_range_overlapped(range, &r)) { 2078 /* This I/O doesn't overlap the specified LBA range. */ 2079 return false; 2080 } else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) { 2081 /* This I/O overlaps, but the I/O is on the same channel that locked this 2082 * range, and the caller_ctx is the same as the locked_ctx. This means 2083 * that this I/O is associated with the lock, and is allowed to execute. 2084 */ 2085 return false; 2086 } else { 2087 return true; 2088 } 2089 default: 2090 return false; 2091 } 2092 } 2093 2094 void 2095 bdev_io_submit(struct spdk_bdev_io *bdev_io) 2096 { 2097 struct spdk_bdev *bdev = bdev_io->bdev; 2098 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 2099 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2100 2101 assert(thread != NULL); 2102 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2103 2104 if (!TAILQ_EMPTY(&ch->locked_ranges)) { 2105 struct lba_range *range; 2106 2107 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 2108 if (bdev_io_range_is_locked(bdev_io, range)) { 2109 TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link); 2110 return; 2111 } 2112 } 2113 } 2114 2115 /* Add the bdev_io to io_submitted only if it is the original 2116 * submission from the bdev user. When a bdev_io is split, 2117 * it comes back through this code path, so we need to make sure 2118 * we don't try to add it a second time. 2119 */ 2120 if (bdev_io->internal.cb != bdev_io_split_done) { 2121 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 2122 } 2123 2124 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bdev_io)) { 2125 bdev_io->internal.submit_tsc = spdk_get_ticks(); 2126 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 2127 (uintptr_t)bdev_io, bdev_io->type); 2128 bdev_io_split(NULL, bdev_io); 2129 return; 2130 } 2131 2132 if (ch->flags & BDEV_CH_QOS_ENABLED) { 2133 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 2134 _bdev_io_submit(bdev_io); 2135 } else { 2136 bdev_io->internal.io_submit_ch = ch; 2137 bdev_io->internal.ch = bdev->internal.qos->ch; 2138 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 2139 } 2140 } else { 2141 _bdev_io_submit(bdev_io); 2142 } 2143 } 2144 2145 static void 2146 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 2147 { 2148 struct spdk_bdev *bdev = bdev_io->bdev; 2149 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2150 struct spdk_io_channel *ch = bdev_ch->channel; 2151 2152 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2153 2154 bdev_io->internal.in_submit_request = true; 2155 bdev->fn_table->submit_request(ch, bdev_io); 2156 bdev_io->internal.in_submit_request = false; 2157 } 2158 2159 void 2160 bdev_io_init(struct spdk_bdev_io *bdev_io, 2161 struct spdk_bdev *bdev, void *cb_arg, 2162 spdk_bdev_io_completion_cb cb) 2163 { 2164 bdev_io->bdev = bdev; 2165 bdev_io->internal.caller_ctx = cb_arg; 2166 bdev_io->internal.cb = cb; 2167 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2168 bdev_io->internal.in_submit_request = false; 2169 bdev_io->internal.buf = NULL; 2170 bdev_io->internal.io_submit_ch = NULL; 2171 bdev_io->internal.orig_iovs = NULL; 2172 bdev_io->internal.orig_iovcnt = 0; 2173 bdev_io->internal.orig_md_buf = NULL; 2174 bdev_io->internal.error.nvme.cdw0 = 0; 2175 bdev_io->num_retries = 0; 2176 bdev_io->internal.get_buf_cb = NULL; 2177 bdev_io->internal.get_aux_buf_cb = NULL; 2178 } 2179 2180 static bool 2181 bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2182 { 2183 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 2184 } 2185 2186 bool 2187 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2188 { 2189 bool supported; 2190 2191 supported = bdev_io_type_supported(bdev, io_type); 2192 2193 if (!supported) { 2194 switch (io_type) { 2195 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2196 /* The bdev layer will emulate write zeroes as long as write is supported. */ 2197 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2198 break; 2199 case SPDK_BDEV_IO_TYPE_ZCOPY: 2200 /* Zero copy can be emulated with regular read and write */ 2201 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 2202 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2203 break; 2204 default: 2205 break; 2206 } 2207 } 2208 2209 return supported; 2210 } 2211 2212 int 2213 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 2214 { 2215 if (bdev->fn_table->dump_info_json) { 2216 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 2217 } 2218 2219 return 0; 2220 } 2221 2222 static void 2223 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2224 { 2225 uint32_t max_per_timeslice = 0; 2226 int i; 2227 2228 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2229 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2230 qos->rate_limits[i].max_per_timeslice = 0; 2231 continue; 2232 } 2233 2234 max_per_timeslice = qos->rate_limits[i].limit * 2235 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2236 2237 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2238 qos->rate_limits[i].min_per_timeslice); 2239 2240 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2241 } 2242 2243 bdev_qos_set_ops(qos); 2244 } 2245 2246 static int 2247 bdev_channel_poll_qos(void *arg) 2248 { 2249 struct spdk_bdev_qos *qos = arg; 2250 uint64_t now = spdk_get_ticks(); 2251 int i; 2252 2253 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2254 /* We received our callback earlier than expected - return 2255 * immediately and wait to do accounting until at least one 2256 * timeslice has actually expired. This should never happen 2257 * with a well-behaved timer implementation. 2258 */ 2259 return 0; 2260 } 2261 2262 /* Reset for next round of rate limiting */ 2263 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2264 /* We may have allowed the IOs or bytes to slightly overrun in the last 2265 * timeslice. remaining_this_timeslice is signed, so if it's negative 2266 * here, we'll account for the overrun so that the next timeslice will 2267 * be appropriately reduced. 2268 */ 2269 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2270 qos->rate_limits[i].remaining_this_timeslice = 0; 2271 } 2272 } 2273 2274 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2275 qos->last_timeslice += qos->timeslice_size; 2276 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2277 qos->rate_limits[i].remaining_this_timeslice += 2278 qos->rate_limits[i].max_per_timeslice; 2279 } 2280 } 2281 2282 return bdev_qos_io_submit(qos->ch, qos); 2283 } 2284 2285 static void 2286 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2287 { 2288 struct spdk_bdev_shared_resource *shared_resource; 2289 struct lba_range *range; 2290 2291 while (!TAILQ_EMPTY(&ch->locked_ranges)) { 2292 range = TAILQ_FIRST(&ch->locked_ranges); 2293 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 2294 free(range); 2295 } 2296 2297 spdk_put_io_channel(ch->channel); 2298 2299 shared_resource = ch->shared_resource; 2300 2301 assert(TAILQ_EMPTY(&ch->io_locked)); 2302 assert(TAILQ_EMPTY(&ch->io_submitted)); 2303 assert(ch->io_outstanding == 0); 2304 assert(shared_resource->ref > 0); 2305 shared_resource->ref--; 2306 if (shared_resource->ref == 0) { 2307 assert(shared_resource->io_outstanding == 0); 2308 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2309 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2310 free(shared_resource); 2311 } 2312 } 2313 2314 /* Caller must hold bdev->internal.mutex. */ 2315 static void 2316 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2317 { 2318 struct spdk_bdev_qos *qos = bdev->internal.qos; 2319 int i; 2320 2321 /* Rate limiting on this bdev enabled */ 2322 if (qos) { 2323 if (qos->ch == NULL) { 2324 struct spdk_io_channel *io_ch; 2325 2326 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2327 bdev->name, spdk_get_thread()); 2328 2329 /* No qos channel has been selected, so set one up */ 2330 2331 /* Take another reference to ch */ 2332 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2333 assert(io_ch != NULL); 2334 qos->ch = ch; 2335 2336 qos->thread = spdk_io_channel_get_thread(io_ch); 2337 2338 TAILQ_INIT(&qos->queued); 2339 2340 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2341 if (bdev_qos_is_iops_rate_limit(i) == true) { 2342 qos->rate_limits[i].min_per_timeslice = 2343 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2344 } else { 2345 qos->rate_limits[i].min_per_timeslice = 2346 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2347 } 2348 2349 if (qos->rate_limits[i].limit == 0) { 2350 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2351 } 2352 } 2353 bdev_qos_update_max_quota_per_timeslice(qos); 2354 qos->timeslice_size = 2355 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2356 qos->last_timeslice = spdk_get_ticks(); 2357 qos->poller = SPDK_POLLER_REGISTER(bdev_channel_poll_qos, 2358 qos, 2359 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2360 } 2361 2362 ch->flags |= BDEV_CH_QOS_ENABLED; 2363 } 2364 } 2365 2366 struct poll_timeout_ctx { 2367 struct spdk_bdev_desc *desc; 2368 uint64_t timeout_in_sec; 2369 spdk_bdev_io_timeout_cb cb_fn; 2370 void *cb_arg; 2371 }; 2372 2373 static void 2374 bdev_desc_free(struct spdk_bdev_desc *desc) 2375 { 2376 pthread_mutex_destroy(&desc->mutex); 2377 free(desc->media_events_buffer); 2378 free(desc); 2379 } 2380 2381 static void 2382 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2383 { 2384 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2385 struct spdk_bdev_desc *desc = ctx->desc; 2386 2387 free(ctx); 2388 2389 pthread_mutex_lock(&desc->mutex); 2390 desc->refs--; 2391 if (desc->closed == true && desc->refs == 0) { 2392 pthread_mutex_unlock(&desc->mutex); 2393 bdev_desc_free(desc); 2394 return; 2395 } 2396 pthread_mutex_unlock(&desc->mutex); 2397 } 2398 2399 static void 2400 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2401 { 2402 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2403 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2404 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2405 struct spdk_bdev_desc *desc = ctx->desc; 2406 struct spdk_bdev_io *bdev_io; 2407 uint64_t now; 2408 2409 pthread_mutex_lock(&desc->mutex); 2410 if (desc->closed == true) { 2411 pthread_mutex_unlock(&desc->mutex); 2412 spdk_for_each_channel_continue(i, -1); 2413 return; 2414 } 2415 pthread_mutex_unlock(&desc->mutex); 2416 2417 now = spdk_get_ticks(); 2418 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2419 /* I/O are added to this TAILQ as they are submitted. 2420 * So once we find an I/O that has not timed out, we can immediately exit the loop. */ 2421 if (now < (bdev_io->internal.submit_tsc + 2422 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2423 goto end; 2424 } 2425 2426 if (bdev_io->internal.desc == desc) { 2427 ctx->cb_fn(ctx->cb_arg, bdev_io); 2428 } 2429 } 2430 2431 end: 2432 spdk_for_each_channel_continue(i, 0); 2433 } 2434 2435 static int 2436 bdev_poll_timeout_io(void *arg) 2437 { 2438 struct spdk_bdev_desc *desc = arg; 2439 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2440 struct poll_timeout_ctx *ctx; 2441 2442 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2443 if (!ctx) { 2444 SPDK_ERRLOG("failed to allocate memory\n"); 2445 return 1; 2446 } 2447 ctx->desc = desc; 2448 ctx->cb_arg = desc->cb_arg; 2449 ctx->cb_fn = desc->cb_fn; 2450 ctx->timeout_in_sec = desc->timeout_in_sec; 2451 2452 /* Take a ref on the descriptor in case it gets closed while we are checking 2453 * all of the channels. 2454 */ 2455 pthread_mutex_lock(&desc->mutex); 2456 desc->refs++; 2457 pthread_mutex_unlock(&desc->mutex); 2458 2459 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2460 bdev_channel_poll_timeout_io, 2461 ctx, 2462 bdev_channel_poll_timeout_io_done); 2463 2464 return 1; 2465 } 2466 2467 int 2468 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2469 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2470 { 2471 assert(desc->thread == spdk_get_thread()); 2472 2473 spdk_poller_unregister(&desc->io_timeout_poller); 2474 2475 if (timeout_in_sec) { 2476 assert(cb_fn != NULL); 2477 desc->io_timeout_poller = SPDK_POLLER_REGISTER(bdev_poll_timeout_io, 2478 desc, 2479 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2480 1000); 2481 if (desc->io_timeout_poller == NULL) { 2482 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2483 return -1; 2484 } 2485 } 2486 2487 desc->cb_fn = cb_fn; 2488 desc->cb_arg = cb_arg; 2489 desc->timeout_in_sec = timeout_in_sec; 2490 2491 return 0; 2492 } 2493 2494 static int 2495 bdev_channel_create(void *io_device, void *ctx_buf) 2496 { 2497 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2498 struct spdk_bdev_channel *ch = ctx_buf; 2499 struct spdk_io_channel *mgmt_io_ch; 2500 struct spdk_bdev_mgmt_channel *mgmt_ch; 2501 struct spdk_bdev_shared_resource *shared_resource; 2502 struct lba_range *range; 2503 2504 ch->bdev = bdev; 2505 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2506 if (!ch->channel) { 2507 return -1; 2508 } 2509 2510 assert(ch->histogram == NULL); 2511 if (bdev->internal.histogram_enabled) { 2512 ch->histogram = spdk_histogram_data_alloc(); 2513 if (ch->histogram == NULL) { 2514 SPDK_ERRLOG("Could not allocate histogram\n"); 2515 } 2516 } 2517 2518 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2519 if (!mgmt_io_ch) { 2520 spdk_put_io_channel(ch->channel); 2521 return -1; 2522 } 2523 2524 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2525 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2526 if (shared_resource->shared_ch == ch->channel) { 2527 spdk_put_io_channel(mgmt_io_ch); 2528 shared_resource->ref++; 2529 break; 2530 } 2531 } 2532 2533 if (shared_resource == NULL) { 2534 shared_resource = calloc(1, sizeof(*shared_resource)); 2535 if (shared_resource == NULL) { 2536 spdk_put_io_channel(ch->channel); 2537 spdk_put_io_channel(mgmt_io_ch); 2538 return -1; 2539 } 2540 2541 shared_resource->mgmt_ch = mgmt_ch; 2542 shared_resource->io_outstanding = 0; 2543 TAILQ_INIT(&shared_resource->nomem_io); 2544 shared_resource->nomem_threshold = 0; 2545 shared_resource->shared_ch = ch->channel; 2546 shared_resource->ref = 1; 2547 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2548 } 2549 2550 memset(&ch->stat, 0, sizeof(ch->stat)); 2551 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2552 ch->io_outstanding = 0; 2553 TAILQ_INIT(&ch->queued_resets); 2554 TAILQ_INIT(&ch->locked_ranges); 2555 ch->flags = 0; 2556 ch->shared_resource = shared_resource; 2557 2558 TAILQ_INIT(&ch->io_submitted); 2559 TAILQ_INIT(&ch->io_locked); 2560 2561 #ifdef SPDK_CONFIG_VTUNE 2562 { 2563 char *name; 2564 __itt_init_ittlib(NULL, 0); 2565 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2566 if (!name) { 2567 bdev_channel_destroy_resource(ch); 2568 return -1; 2569 } 2570 ch->handle = __itt_string_handle_create(name); 2571 free(name); 2572 ch->start_tsc = spdk_get_ticks(); 2573 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2574 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 2575 } 2576 #endif 2577 2578 pthread_mutex_lock(&bdev->internal.mutex); 2579 bdev_enable_qos(bdev, ch); 2580 2581 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 2582 struct lba_range *new_range; 2583 2584 new_range = calloc(1, sizeof(*new_range)); 2585 if (new_range == NULL) { 2586 pthread_mutex_unlock(&bdev->internal.mutex); 2587 bdev_channel_destroy_resource(ch); 2588 return -1; 2589 } 2590 new_range->length = range->length; 2591 new_range->offset = range->offset; 2592 new_range->locked_ctx = range->locked_ctx; 2593 TAILQ_INSERT_TAIL(&ch->locked_ranges, new_range, tailq); 2594 } 2595 2596 pthread_mutex_unlock(&bdev->internal.mutex); 2597 2598 return 0; 2599 } 2600 2601 /* 2602 * Abort I/O that are waiting on a data buffer. These types of I/O are 2603 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2604 */ 2605 static void 2606 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2607 { 2608 bdev_io_stailq_t tmp; 2609 struct spdk_bdev_io *bdev_io; 2610 2611 STAILQ_INIT(&tmp); 2612 2613 while (!STAILQ_EMPTY(queue)) { 2614 bdev_io = STAILQ_FIRST(queue); 2615 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2616 if (bdev_io->internal.ch == ch) { 2617 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2618 } else { 2619 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2620 } 2621 } 2622 2623 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2624 } 2625 2626 /* 2627 * Abort I/O that are queued waiting for submission. These types of I/O are 2628 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2629 */ 2630 static void 2631 bdev_abort_all_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2632 { 2633 struct spdk_bdev_io *bdev_io, *tmp; 2634 2635 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2636 if (bdev_io->internal.ch == ch) { 2637 TAILQ_REMOVE(queue, bdev_io, internal.link); 2638 /* 2639 * spdk_bdev_io_complete() assumes that the completed I/O had 2640 * been submitted to the bdev module. Since in this case it 2641 * hadn't, bump io_outstanding to account for the decrement 2642 * that spdk_bdev_io_complete() will do. 2643 */ 2644 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2645 ch->io_outstanding++; 2646 ch->shared_resource->io_outstanding++; 2647 } 2648 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2649 } 2650 } 2651 } 2652 2653 static bool 2654 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2655 { 2656 struct spdk_bdev_io *bdev_io; 2657 2658 TAILQ_FOREACH(bdev_io, queue, internal.link) { 2659 if (bdev_io == bio_to_abort) { 2660 TAILQ_REMOVE(queue, bio_to_abort, internal.link); 2661 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2662 return true; 2663 } 2664 } 2665 2666 return false; 2667 } 2668 2669 static void 2670 bdev_qos_channel_destroy(void *cb_arg) 2671 { 2672 struct spdk_bdev_qos *qos = cb_arg; 2673 2674 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2675 spdk_poller_unregister(&qos->poller); 2676 2677 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2678 2679 free(qos); 2680 } 2681 2682 static int 2683 bdev_qos_destroy(struct spdk_bdev *bdev) 2684 { 2685 int i; 2686 2687 /* 2688 * Cleanly shutting down the QoS poller is tricky, because 2689 * during the asynchronous operation the user could open 2690 * a new descriptor and create a new channel, spawning 2691 * a new QoS poller. 2692 * 2693 * The strategy is to create a new QoS structure here and swap it 2694 * in. The shutdown path then continues to refer to the old one 2695 * until it completes and then releases it. 2696 */ 2697 struct spdk_bdev_qos *new_qos, *old_qos; 2698 2699 old_qos = bdev->internal.qos; 2700 2701 new_qos = calloc(1, sizeof(*new_qos)); 2702 if (!new_qos) { 2703 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2704 return -ENOMEM; 2705 } 2706 2707 /* Copy the old QoS data into the newly allocated structure */ 2708 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2709 2710 /* Zero out the key parts of the QoS structure */ 2711 new_qos->ch = NULL; 2712 new_qos->thread = NULL; 2713 new_qos->poller = NULL; 2714 TAILQ_INIT(&new_qos->queued); 2715 /* 2716 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2717 * It will be used later for the new QoS structure. 2718 */ 2719 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2720 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2721 new_qos->rate_limits[i].min_per_timeslice = 0; 2722 new_qos->rate_limits[i].max_per_timeslice = 0; 2723 } 2724 2725 bdev->internal.qos = new_qos; 2726 2727 if (old_qos->thread == NULL) { 2728 free(old_qos); 2729 } else { 2730 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 2731 } 2732 2733 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2734 * been destroyed yet. The destruction path will end up waiting for the final 2735 * channel to be put before it releases resources. */ 2736 2737 return 0; 2738 } 2739 2740 static void 2741 bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2742 { 2743 total->bytes_read += add->bytes_read; 2744 total->num_read_ops += add->num_read_ops; 2745 total->bytes_written += add->bytes_written; 2746 total->num_write_ops += add->num_write_ops; 2747 total->bytes_unmapped += add->bytes_unmapped; 2748 total->num_unmap_ops += add->num_unmap_ops; 2749 total->read_latency_ticks += add->read_latency_ticks; 2750 total->write_latency_ticks += add->write_latency_ticks; 2751 total->unmap_latency_ticks += add->unmap_latency_ticks; 2752 } 2753 2754 static void 2755 bdev_channel_destroy(void *io_device, void *ctx_buf) 2756 { 2757 struct spdk_bdev_channel *ch = ctx_buf; 2758 struct spdk_bdev_mgmt_channel *mgmt_ch; 2759 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2760 2761 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2762 spdk_get_thread()); 2763 2764 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */ 2765 pthread_mutex_lock(&ch->bdev->internal.mutex); 2766 bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2767 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2768 2769 mgmt_ch = shared_resource->mgmt_ch; 2770 2771 bdev_abort_all_queued_io(&ch->queued_resets, ch); 2772 bdev_abort_all_queued_io(&shared_resource->nomem_io, ch); 2773 bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2774 bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2775 2776 if (ch->histogram) { 2777 spdk_histogram_data_free(ch->histogram); 2778 } 2779 2780 bdev_channel_destroy_resource(ch); 2781 } 2782 2783 int 2784 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2785 { 2786 struct spdk_bdev_alias *tmp; 2787 2788 if (alias == NULL) { 2789 SPDK_ERRLOG("Empty alias passed\n"); 2790 return -EINVAL; 2791 } 2792 2793 if (spdk_bdev_get_by_name(alias)) { 2794 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2795 return -EEXIST; 2796 } 2797 2798 tmp = calloc(1, sizeof(*tmp)); 2799 if (tmp == NULL) { 2800 SPDK_ERRLOG("Unable to allocate alias\n"); 2801 return -ENOMEM; 2802 } 2803 2804 tmp->alias = strdup(alias); 2805 if (tmp->alias == NULL) { 2806 free(tmp); 2807 SPDK_ERRLOG("Unable to allocate alias\n"); 2808 return -ENOMEM; 2809 } 2810 2811 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2812 2813 return 0; 2814 } 2815 2816 int 2817 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2818 { 2819 struct spdk_bdev_alias *tmp; 2820 2821 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2822 if (strcmp(alias, tmp->alias) == 0) { 2823 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2824 free(tmp->alias); 2825 free(tmp); 2826 return 0; 2827 } 2828 } 2829 2830 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2831 2832 return -ENOENT; 2833 } 2834 2835 void 2836 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2837 { 2838 struct spdk_bdev_alias *p, *tmp; 2839 2840 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2841 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2842 free(p->alias); 2843 free(p); 2844 } 2845 } 2846 2847 struct spdk_io_channel * 2848 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2849 { 2850 return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc))); 2851 } 2852 2853 const char * 2854 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2855 { 2856 return bdev->name; 2857 } 2858 2859 const char * 2860 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2861 { 2862 return bdev->product_name; 2863 } 2864 2865 const struct spdk_bdev_aliases_list * 2866 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2867 { 2868 return &bdev->aliases; 2869 } 2870 2871 uint32_t 2872 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2873 { 2874 return bdev->blocklen; 2875 } 2876 2877 uint32_t 2878 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 2879 { 2880 return bdev->write_unit_size; 2881 } 2882 2883 uint64_t 2884 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2885 { 2886 return bdev->blockcnt; 2887 } 2888 2889 const char * 2890 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2891 { 2892 return qos_rpc_type[type]; 2893 } 2894 2895 void 2896 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2897 { 2898 int i; 2899 2900 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2901 2902 pthread_mutex_lock(&bdev->internal.mutex); 2903 if (bdev->internal.qos) { 2904 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2905 if (bdev->internal.qos->rate_limits[i].limit != 2906 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2907 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2908 if (bdev_qos_is_iops_rate_limit(i) == false) { 2909 /* Change from Byte to Megabyte which is user visible. */ 2910 limits[i] = limits[i] / 1024 / 1024; 2911 } 2912 } 2913 } 2914 } 2915 pthread_mutex_unlock(&bdev->internal.mutex); 2916 } 2917 2918 size_t 2919 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2920 { 2921 return 1 << bdev->required_alignment; 2922 } 2923 2924 uint32_t 2925 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2926 { 2927 return bdev->optimal_io_boundary; 2928 } 2929 2930 bool 2931 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2932 { 2933 return bdev->write_cache; 2934 } 2935 2936 const struct spdk_uuid * 2937 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2938 { 2939 return &bdev->uuid; 2940 } 2941 2942 uint16_t 2943 spdk_bdev_get_acwu(const struct spdk_bdev *bdev) 2944 { 2945 return bdev->acwu; 2946 } 2947 2948 uint32_t 2949 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 2950 { 2951 return bdev->md_len; 2952 } 2953 2954 bool 2955 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 2956 { 2957 return (bdev->md_len != 0) && bdev->md_interleave; 2958 } 2959 2960 bool 2961 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 2962 { 2963 return (bdev->md_len != 0) && !bdev->md_interleave; 2964 } 2965 2966 bool 2967 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 2968 { 2969 return bdev->zoned; 2970 } 2971 2972 uint32_t 2973 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 2974 { 2975 if (spdk_bdev_is_md_interleaved(bdev)) { 2976 return bdev->blocklen - bdev->md_len; 2977 } else { 2978 return bdev->blocklen; 2979 } 2980 } 2981 2982 static uint32_t 2983 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 2984 { 2985 if (!spdk_bdev_is_md_interleaved(bdev)) { 2986 return bdev->blocklen + bdev->md_len; 2987 } else { 2988 return bdev->blocklen; 2989 } 2990 } 2991 2992 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 2993 { 2994 if (bdev->md_len != 0) { 2995 return bdev->dif_type; 2996 } else { 2997 return SPDK_DIF_DISABLE; 2998 } 2999 } 3000 3001 bool 3002 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 3003 { 3004 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 3005 return bdev->dif_is_head_of_md; 3006 } else { 3007 return false; 3008 } 3009 } 3010 3011 bool 3012 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 3013 enum spdk_dif_check_type check_type) 3014 { 3015 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 3016 return false; 3017 } 3018 3019 switch (check_type) { 3020 case SPDK_DIF_CHECK_TYPE_REFTAG: 3021 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 3022 case SPDK_DIF_CHECK_TYPE_APPTAG: 3023 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 3024 case SPDK_DIF_CHECK_TYPE_GUARD: 3025 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 3026 default: 3027 return false; 3028 } 3029 } 3030 3031 uint64_t 3032 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 3033 { 3034 return bdev->internal.measured_queue_depth; 3035 } 3036 3037 uint64_t 3038 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 3039 { 3040 return bdev->internal.period; 3041 } 3042 3043 uint64_t 3044 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 3045 { 3046 return bdev->internal.weighted_io_time; 3047 } 3048 3049 uint64_t 3050 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 3051 { 3052 return bdev->internal.io_time; 3053 } 3054 3055 static void 3056 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 3057 { 3058 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3059 3060 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 3061 3062 if (bdev->internal.measured_queue_depth) { 3063 bdev->internal.io_time += bdev->internal.period; 3064 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 3065 } 3066 } 3067 3068 static void 3069 _calculate_measured_qd(struct spdk_io_channel_iter *i) 3070 { 3071 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3072 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 3073 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 3074 3075 bdev->internal.temporary_queue_depth += ch->io_outstanding; 3076 spdk_for_each_channel_continue(i, 0); 3077 } 3078 3079 static int 3080 bdev_calculate_measured_queue_depth(void *ctx) 3081 { 3082 struct spdk_bdev *bdev = ctx; 3083 bdev->internal.temporary_queue_depth = 0; 3084 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 3085 _calculate_measured_qd_cpl); 3086 return 0; 3087 } 3088 3089 void 3090 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 3091 { 3092 bdev->internal.period = period; 3093 3094 if (bdev->internal.qd_poller != NULL) { 3095 spdk_poller_unregister(&bdev->internal.qd_poller); 3096 bdev->internal.measured_queue_depth = UINT64_MAX; 3097 } 3098 3099 if (period != 0) { 3100 bdev->internal.qd_poller = SPDK_POLLER_REGISTER(bdev_calculate_measured_queue_depth, bdev, 3101 period); 3102 } 3103 } 3104 3105 static void 3106 _resize_notify(void *arg) 3107 { 3108 struct spdk_bdev_desc *desc = arg; 3109 3110 pthread_mutex_lock(&desc->mutex); 3111 desc->refs--; 3112 if (!desc->closed) { 3113 pthread_mutex_unlock(&desc->mutex); 3114 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 3115 desc->bdev, 3116 desc->callback.ctx); 3117 return; 3118 } else if (0 == desc->refs) { 3119 /* This descriptor was closed after this resize_notify message was sent. 3120 * spdk_bdev_close() could not free the descriptor since this message was 3121 * in flight, so we free it now using bdev_desc_free(). 3122 */ 3123 pthread_mutex_unlock(&desc->mutex); 3124 bdev_desc_free(desc); 3125 return; 3126 } 3127 pthread_mutex_unlock(&desc->mutex); 3128 } 3129 3130 int 3131 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 3132 { 3133 struct spdk_bdev_desc *desc; 3134 int ret; 3135 3136 pthread_mutex_lock(&bdev->internal.mutex); 3137 3138 /* bdev has open descriptors */ 3139 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 3140 bdev->blockcnt > size) { 3141 ret = -EBUSY; 3142 } else { 3143 bdev->blockcnt = size; 3144 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 3145 pthread_mutex_lock(&desc->mutex); 3146 if (desc->callback.open_with_ext && !desc->closed) { 3147 desc->refs++; 3148 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 3149 } 3150 pthread_mutex_unlock(&desc->mutex); 3151 } 3152 ret = 0; 3153 } 3154 3155 pthread_mutex_unlock(&bdev->internal.mutex); 3156 3157 return ret; 3158 } 3159 3160 /* 3161 * Convert I/O offset and length from bytes to blocks. 3162 * 3163 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 3164 */ 3165 static uint64_t 3166 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 3167 uint64_t num_bytes, uint64_t *num_blocks) 3168 { 3169 uint32_t block_size = bdev->blocklen; 3170 uint8_t shift_cnt; 3171 3172 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 3173 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 3174 shift_cnt = spdk_u32log2(block_size); 3175 *offset_blocks = offset_bytes >> shift_cnt; 3176 *num_blocks = num_bytes >> shift_cnt; 3177 return (offset_bytes - (*offset_blocks << shift_cnt)) | 3178 (num_bytes - (*num_blocks << shift_cnt)); 3179 } else { 3180 *offset_blocks = offset_bytes / block_size; 3181 *num_blocks = num_bytes / block_size; 3182 return (offset_bytes % block_size) | (num_bytes % block_size); 3183 } 3184 } 3185 3186 static bool 3187 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 3188 { 3189 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 3190 * has been an overflow and hence the offset has been wrapped around */ 3191 if (offset_blocks + num_blocks < offset_blocks) { 3192 return false; 3193 } 3194 3195 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 3196 if (offset_blocks + num_blocks > bdev->blockcnt) { 3197 return false; 3198 } 3199 3200 return true; 3201 } 3202 3203 static bool 3204 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 3205 { 3206 return _is_buf_allocated(iovs) == (md_buf != NULL); 3207 } 3208 3209 static int 3210 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 3211 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3212 spdk_bdev_io_completion_cb cb, void *cb_arg) 3213 { 3214 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3215 struct spdk_bdev_io *bdev_io; 3216 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3217 3218 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3219 return -EINVAL; 3220 } 3221 3222 bdev_io = bdev_channel_get_io(channel); 3223 if (!bdev_io) { 3224 return -ENOMEM; 3225 } 3226 3227 bdev_io->internal.ch = channel; 3228 bdev_io->internal.desc = desc; 3229 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3230 bdev_io->u.bdev.iovs = &bdev_io->iov; 3231 bdev_io->u.bdev.iovs[0].iov_base = buf; 3232 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3233 bdev_io->u.bdev.iovcnt = 1; 3234 bdev_io->u.bdev.md_buf = md_buf; 3235 bdev_io->u.bdev.num_blocks = num_blocks; 3236 bdev_io->u.bdev.offset_blocks = offset_blocks; 3237 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3238 3239 bdev_io_submit(bdev_io); 3240 return 0; 3241 } 3242 3243 int 3244 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3245 void *buf, uint64_t offset, uint64_t nbytes, 3246 spdk_bdev_io_completion_cb cb, void *cb_arg) 3247 { 3248 uint64_t offset_blocks, num_blocks; 3249 3250 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3251 nbytes, &num_blocks) != 0) { 3252 return -EINVAL; 3253 } 3254 3255 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3256 } 3257 3258 int 3259 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3260 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3261 spdk_bdev_io_completion_cb cb, void *cb_arg) 3262 { 3263 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 3264 } 3265 3266 int 3267 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3268 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3269 spdk_bdev_io_completion_cb cb, void *cb_arg) 3270 { 3271 struct iovec iov = { 3272 .iov_base = buf, 3273 }; 3274 3275 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3276 return -EINVAL; 3277 } 3278 3279 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3280 return -EINVAL; 3281 } 3282 3283 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3284 cb, cb_arg); 3285 } 3286 3287 int 3288 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3289 struct iovec *iov, int iovcnt, 3290 uint64_t offset, uint64_t nbytes, 3291 spdk_bdev_io_completion_cb cb, void *cb_arg) 3292 { 3293 uint64_t offset_blocks, num_blocks; 3294 3295 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3296 nbytes, &num_blocks) != 0) { 3297 return -EINVAL; 3298 } 3299 3300 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3301 } 3302 3303 static int 3304 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3305 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3306 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3307 { 3308 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3309 struct spdk_bdev_io *bdev_io; 3310 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3311 3312 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3313 return -EINVAL; 3314 } 3315 3316 bdev_io = bdev_channel_get_io(channel); 3317 if (!bdev_io) { 3318 return -ENOMEM; 3319 } 3320 3321 bdev_io->internal.ch = channel; 3322 bdev_io->internal.desc = desc; 3323 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3324 bdev_io->u.bdev.iovs = iov; 3325 bdev_io->u.bdev.iovcnt = iovcnt; 3326 bdev_io->u.bdev.md_buf = md_buf; 3327 bdev_io->u.bdev.num_blocks = num_blocks; 3328 bdev_io->u.bdev.offset_blocks = offset_blocks; 3329 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3330 3331 bdev_io_submit(bdev_io); 3332 return 0; 3333 } 3334 3335 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3336 struct iovec *iov, int iovcnt, 3337 uint64_t offset_blocks, uint64_t num_blocks, 3338 spdk_bdev_io_completion_cb cb, void *cb_arg) 3339 { 3340 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3341 num_blocks, cb, cb_arg); 3342 } 3343 3344 int 3345 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3346 struct iovec *iov, int iovcnt, void *md_buf, 3347 uint64_t offset_blocks, uint64_t num_blocks, 3348 spdk_bdev_io_completion_cb cb, void *cb_arg) 3349 { 3350 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3351 return -EINVAL; 3352 } 3353 3354 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3355 return -EINVAL; 3356 } 3357 3358 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3359 num_blocks, cb, cb_arg); 3360 } 3361 3362 static int 3363 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3364 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3365 spdk_bdev_io_completion_cb cb, void *cb_arg) 3366 { 3367 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3368 struct spdk_bdev_io *bdev_io; 3369 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3370 3371 if (!desc->write) { 3372 return -EBADF; 3373 } 3374 3375 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3376 return -EINVAL; 3377 } 3378 3379 bdev_io = bdev_channel_get_io(channel); 3380 if (!bdev_io) { 3381 return -ENOMEM; 3382 } 3383 3384 bdev_io->internal.ch = channel; 3385 bdev_io->internal.desc = desc; 3386 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3387 bdev_io->u.bdev.iovs = &bdev_io->iov; 3388 bdev_io->u.bdev.iovs[0].iov_base = buf; 3389 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3390 bdev_io->u.bdev.iovcnt = 1; 3391 bdev_io->u.bdev.md_buf = md_buf; 3392 bdev_io->u.bdev.num_blocks = num_blocks; 3393 bdev_io->u.bdev.offset_blocks = offset_blocks; 3394 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3395 3396 bdev_io_submit(bdev_io); 3397 return 0; 3398 } 3399 3400 int 3401 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3402 void *buf, uint64_t offset, uint64_t nbytes, 3403 spdk_bdev_io_completion_cb cb, void *cb_arg) 3404 { 3405 uint64_t offset_blocks, num_blocks; 3406 3407 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3408 nbytes, &num_blocks) != 0) { 3409 return -EINVAL; 3410 } 3411 3412 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3413 } 3414 3415 int 3416 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3417 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3418 spdk_bdev_io_completion_cb cb, void *cb_arg) 3419 { 3420 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3421 cb, cb_arg); 3422 } 3423 3424 int 3425 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3426 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3427 spdk_bdev_io_completion_cb cb, void *cb_arg) 3428 { 3429 struct iovec iov = { 3430 .iov_base = buf, 3431 }; 3432 3433 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3434 return -EINVAL; 3435 } 3436 3437 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3438 return -EINVAL; 3439 } 3440 3441 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3442 cb, cb_arg); 3443 } 3444 3445 static int 3446 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3447 struct iovec *iov, int iovcnt, void *md_buf, 3448 uint64_t offset_blocks, uint64_t num_blocks, 3449 spdk_bdev_io_completion_cb cb, void *cb_arg) 3450 { 3451 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3452 struct spdk_bdev_io *bdev_io; 3453 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3454 3455 if (!desc->write) { 3456 return -EBADF; 3457 } 3458 3459 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3460 return -EINVAL; 3461 } 3462 3463 bdev_io = bdev_channel_get_io(channel); 3464 if (!bdev_io) { 3465 return -ENOMEM; 3466 } 3467 3468 bdev_io->internal.ch = channel; 3469 bdev_io->internal.desc = desc; 3470 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3471 bdev_io->u.bdev.iovs = iov; 3472 bdev_io->u.bdev.iovcnt = iovcnt; 3473 bdev_io->u.bdev.md_buf = md_buf; 3474 bdev_io->u.bdev.num_blocks = num_blocks; 3475 bdev_io->u.bdev.offset_blocks = offset_blocks; 3476 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3477 3478 bdev_io_submit(bdev_io); 3479 return 0; 3480 } 3481 3482 int 3483 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3484 struct iovec *iov, int iovcnt, 3485 uint64_t offset, uint64_t len, 3486 spdk_bdev_io_completion_cb cb, void *cb_arg) 3487 { 3488 uint64_t offset_blocks, num_blocks; 3489 3490 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3491 len, &num_blocks) != 0) { 3492 return -EINVAL; 3493 } 3494 3495 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3496 } 3497 3498 int 3499 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3500 struct iovec *iov, int iovcnt, 3501 uint64_t offset_blocks, uint64_t num_blocks, 3502 spdk_bdev_io_completion_cb cb, void *cb_arg) 3503 { 3504 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3505 num_blocks, cb, cb_arg); 3506 } 3507 3508 int 3509 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3510 struct iovec *iov, int iovcnt, void *md_buf, 3511 uint64_t offset_blocks, uint64_t num_blocks, 3512 spdk_bdev_io_completion_cb cb, void *cb_arg) 3513 { 3514 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3515 return -EINVAL; 3516 } 3517 3518 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3519 return -EINVAL; 3520 } 3521 3522 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3523 num_blocks, cb, cb_arg); 3524 } 3525 3526 static void 3527 bdev_compare_do_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3528 { 3529 struct spdk_bdev_io *parent_io = cb_arg; 3530 uint8_t *read_buf = bdev_io->u.bdev.iovs[0].iov_base; 3531 int i, rc = 0; 3532 3533 if (!success) { 3534 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3535 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3536 spdk_bdev_free_io(bdev_io); 3537 return; 3538 } 3539 3540 for (i = 0; i < parent_io->u.bdev.iovcnt; i++) { 3541 rc = memcmp(read_buf, 3542 parent_io->u.bdev.iovs[i].iov_base, 3543 parent_io->u.bdev.iovs[i].iov_len); 3544 if (rc) { 3545 break; 3546 } 3547 read_buf += parent_io->u.bdev.iovs[i].iov_len; 3548 } 3549 3550 spdk_bdev_free_io(bdev_io); 3551 3552 if (rc == 0) { 3553 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3554 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3555 } else { 3556 parent_io->internal.status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 3557 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3558 } 3559 } 3560 3561 static void 3562 bdev_compare_do_read(void *_bdev_io) 3563 { 3564 struct spdk_bdev_io *bdev_io = _bdev_io; 3565 int rc; 3566 3567 rc = spdk_bdev_read_blocks(bdev_io->internal.desc, 3568 spdk_io_channel_from_ctx(bdev_io->internal.ch), NULL, 3569 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3570 bdev_compare_do_read_done, bdev_io); 3571 3572 if (rc == -ENOMEM) { 3573 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_do_read); 3574 } else if (rc != 0) { 3575 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3576 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3577 } 3578 } 3579 3580 static int 3581 bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3582 struct iovec *iov, int iovcnt, void *md_buf, 3583 uint64_t offset_blocks, uint64_t num_blocks, 3584 spdk_bdev_io_completion_cb cb, void *cb_arg) 3585 { 3586 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3587 struct spdk_bdev_io *bdev_io; 3588 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3589 3590 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3591 return -EINVAL; 3592 } 3593 3594 bdev_io = bdev_channel_get_io(channel); 3595 if (!bdev_io) { 3596 return -ENOMEM; 3597 } 3598 3599 bdev_io->internal.ch = channel; 3600 bdev_io->internal.desc = desc; 3601 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3602 bdev_io->u.bdev.iovs = iov; 3603 bdev_io->u.bdev.iovcnt = iovcnt; 3604 bdev_io->u.bdev.md_buf = md_buf; 3605 bdev_io->u.bdev.num_blocks = num_blocks; 3606 bdev_io->u.bdev.offset_blocks = offset_blocks; 3607 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3608 3609 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3610 bdev_io_submit(bdev_io); 3611 return 0; 3612 } 3613 3614 bdev_compare_do_read(bdev_io); 3615 3616 return 0; 3617 } 3618 3619 int 3620 spdk_bdev_comparev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3621 struct iovec *iov, int iovcnt, 3622 uint64_t offset_blocks, uint64_t num_blocks, 3623 spdk_bdev_io_completion_cb cb, void *cb_arg) 3624 { 3625 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3626 num_blocks, cb, cb_arg); 3627 } 3628 3629 int 3630 spdk_bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3631 struct iovec *iov, int iovcnt, void *md_buf, 3632 uint64_t offset_blocks, uint64_t num_blocks, 3633 spdk_bdev_io_completion_cb cb, void *cb_arg) 3634 { 3635 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3636 return -EINVAL; 3637 } 3638 3639 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3640 return -EINVAL; 3641 } 3642 3643 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3644 num_blocks, cb, cb_arg); 3645 } 3646 3647 static int 3648 bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3649 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3650 spdk_bdev_io_completion_cb cb, void *cb_arg) 3651 { 3652 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3653 struct spdk_bdev_io *bdev_io; 3654 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3655 3656 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3657 return -EINVAL; 3658 } 3659 3660 bdev_io = bdev_channel_get_io(channel); 3661 if (!bdev_io) { 3662 return -ENOMEM; 3663 } 3664 3665 bdev_io->internal.ch = channel; 3666 bdev_io->internal.desc = desc; 3667 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3668 bdev_io->u.bdev.iovs = &bdev_io->iov; 3669 bdev_io->u.bdev.iovs[0].iov_base = buf; 3670 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3671 bdev_io->u.bdev.iovcnt = 1; 3672 bdev_io->u.bdev.md_buf = md_buf; 3673 bdev_io->u.bdev.num_blocks = num_blocks; 3674 bdev_io->u.bdev.offset_blocks = offset_blocks; 3675 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3676 3677 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3678 bdev_io_submit(bdev_io); 3679 return 0; 3680 } 3681 3682 bdev_compare_do_read(bdev_io); 3683 3684 return 0; 3685 } 3686 3687 int 3688 spdk_bdev_compare_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3689 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3690 spdk_bdev_io_completion_cb cb, void *cb_arg) 3691 { 3692 return bdev_compare_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3693 cb, cb_arg); 3694 } 3695 3696 int 3697 spdk_bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3698 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3699 spdk_bdev_io_completion_cb cb, void *cb_arg) 3700 { 3701 struct iovec iov = { 3702 .iov_base = buf, 3703 }; 3704 3705 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3706 return -EINVAL; 3707 } 3708 3709 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3710 return -EINVAL; 3711 } 3712 3713 return bdev_compare_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3714 cb, cb_arg); 3715 } 3716 3717 static void 3718 bdev_comparev_and_writev_blocks_unlocked(void *ctx, int unlock_status) 3719 { 3720 struct spdk_bdev_io *bdev_io = ctx; 3721 3722 if (unlock_status) { 3723 SPDK_ERRLOG("LBA range unlock failed\n"); 3724 } 3725 3726 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS ? true : 3727 false, bdev_io->internal.caller_ctx); 3728 } 3729 3730 static void 3731 bdev_comparev_and_writev_blocks_unlock(struct spdk_bdev_io *bdev_io, int status) 3732 { 3733 bdev_io->internal.status = status; 3734 3735 bdev_unlock_lba_range(bdev_io->internal.desc, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3736 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3737 bdev_comparev_and_writev_blocks_unlocked, bdev_io); 3738 } 3739 3740 static void 3741 bdev_compare_and_write_do_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3742 { 3743 struct spdk_bdev_io *parent_io = cb_arg; 3744 3745 if (!success) { 3746 SPDK_ERRLOG("Compare and write operation failed\n"); 3747 } 3748 3749 spdk_bdev_free_io(bdev_io); 3750 3751 bdev_comparev_and_writev_blocks_unlock(parent_io, 3752 success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED); 3753 } 3754 3755 static void 3756 bdev_compare_and_write_do_write(void *_bdev_io) 3757 { 3758 struct spdk_bdev_io *bdev_io = _bdev_io; 3759 int rc; 3760 3761 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 3762 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3763 bdev_io->u.bdev.fused_iovs, bdev_io->u.bdev.fused_iovcnt, 3764 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3765 bdev_compare_and_write_do_write_done, bdev_io); 3766 3767 3768 if (rc == -ENOMEM) { 3769 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_write); 3770 } else if (rc != 0) { 3771 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 3772 } 3773 } 3774 3775 static void 3776 bdev_compare_and_write_do_compare_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3777 { 3778 struct spdk_bdev_io *parent_io = cb_arg; 3779 3780 spdk_bdev_free_io(bdev_io); 3781 3782 if (!success) { 3783 bdev_comparev_and_writev_blocks_unlock(parent_io, SPDK_BDEV_IO_STATUS_MISCOMPARE); 3784 return; 3785 } 3786 3787 bdev_compare_and_write_do_write(parent_io); 3788 } 3789 3790 static void 3791 bdev_compare_and_write_do_compare(void *_bdev_io) 3792 { 3793 struct spdk_bdev_io *bdev_io = _bdev_io; 3794 int rc; 3795 3796 rc = spdk_bdev_comparev_blocks(bdev_io->internal.desc, 3797 spdk_io_channel_from_ctx(bdev_io->internal.ch), bdev_io->u.bdev.iovs, 3798 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3799 bdev_compare_and_write_do_compare_done, bdev_io); 3800 3801 if (rc == -ENOMEM) { 3802 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_compare); 3803 } else if (rc != 0) { 3804 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED); 3805 } 3806 } 3807 3808 static void 3809 bdev_comparev_and_writev_blocks_locked(void *ctx, int status) 3810 { 3811 struct spdk_bdev_io *bdev_io = ctx; 3812 3813 if (status) { 3814 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED; 3815 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3816 } 3817 3818 bdev_compare_and_write_do_compare(bdev_io); 3819 } 3820 3821 int 3822 spdk_bdev_comparev_and_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3823 struct iovec *compare_iov, int compare_iovcnt, 3824 struct iovec *write_iov, int write_iovcnt, 3825 uint64_t offset_blocks, uint64_t num_blocks, 3826 spdk_bdev_io_completion_cb cb, void *cb_arg) 3827 { 3828 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3829 struct spdk_bdev_io *bdev_io; 3830 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3831 3832 if (!desc->write) { 3833 return -EBADF; 3834 } 3835 3836 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3837 return -EINVAL; 3838 } 3839 3840 if (num_blocks > bdev->acwu) { 3841 return -EINVAL; 3842 } 3843 3844 bdev_io = bdev_channel_get_io(channel); 3845 if (!bdev_io) { 3846 return -ENOMEM; 3847 } 3848 3849 bdev_io->internal.ch = channel; 3850 bdev_io->internal.desc = desc; 3851 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE; 3852 bdev_io->u.bdev.iovs = compare_iov; 3853 bdev_io->u.bdev.iovcnt = compare_iovcnt; 3854 bdev_io->u.bdev.fused_iovs = write_iov; 3855 bdev_io->u.bdev.fused_iovcnt = write_iovcnt; 3856 bdev_io->u.bdev.md_buf = NULL; 3857 bdev_io->u.bdev.num_blocks = num_blocks; 3858 bdev_io->u.bdev.offset_blocks = offset_blocks; 3859 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3860 3861 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE)) { 3862 bdev_io_submit(bdev_io); 3863 return 0; 3864 } 3865 3866 return bdev_lock_lba_range(desc, ch, offset_blocks, num_blocks, 3867 bdev_comparev_and_writev_blocks_locked, bdev_io); 3868 } 3869 3870 static void 3871 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 3872 { 3873 if (!success) { 3874 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3875 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 3876 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3877 return; 3878 } 3879 3880 if (bdev_io->u.bdev.zcopy.populate) { 3881 /* Read the real data into the buffer */ 3882 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3883 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3884 bdev_io_submit(bdev_io); 3885 return; 3886 } 3887 3888 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3889 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3890 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3891 } 3892 3893 int 3894 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3895 uint64_t offset_blocks, uint64_t num_blocks, 3896 bool populate, 3897 spdk_bdev_io_completion_cb cb, void *cb_arg) 3898 { 3899 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3900 struct spdk_bdev_io *bdev_io; 3901 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3902 3903 if (!desc->write) { 3904 return -EBADF; 3905 } 3906 3907 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3908 return -EINVAL; 3909 } 3910 3911 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3912 return -ENOTSUP; 3913 } 3914 3915 bdev_io = bdev_channel_get_io(channel); 3916 if (!bdev_io) { 3917 return -ENOMEM; 3918 } 3919 3920 bdev_io->internal.ch = channel; 3921 bdev_io->internal.desc = desc; 3922 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3923 bdev_io->u.bdev.num_blocks = num_blocks; 3924 bdev_io->u.bdev.offset_blocks = offset_blocks; 3925 bdev_io->u.bdev.iovs = NULL; 3926 bdev_io->u.bdev.iovcnt = 0; 3927 bdev_io->u.bdev.md_buf = NULL; 3928 bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0; 3929 bdev_io->u.bdev.zcopy.commit = 0; 3930 bdev_io->u.bdev.zcopy.start = 1; 3931 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3932 3933 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3934 bdev_io_submit(bdev_io); 3935 } else { 3936 /* Emulate zcopy by allocating a buffer */ 3937 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 3938 bdev_io->u.bdev.num_blocks * bdev->blocklen); 3939 } 3940 3941 return 0; 3942 } 3943 3944 int 3945 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 3946 spdk_bdev_io_completion_cb cb, void *cb_arg) 3947 { 3948 struct spdk_bdev *bdev = bdev_io->bdev; 3949 3950 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 3951 /* This can happen if the zcopy was emulated in start */ 3952 if (bdev_io->u.bdev.zcopy.start != 1) { 3953 return -EINVAL; 3954 } 3955 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3956 } 3957 3958 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 3959 return -EINVAL; 3960 } 3961 3962 bdev_io->u.bdev.zcopy.commit = commit ? 1 : 0; 3963 bdev_io->u.bdev.zcopy.start = 0; 3964 bdev_io->internal.caller_ctx = cb_arg; 3965 bdev_io->internal.cb = cb; 3966 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3967 3968 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3969 bdev_io_submit(bdev_io); 3970 return 0; 3971 } 3972 3973 if (!bdev_io->u.bdev.zcopy.commit) { 3974 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3975 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3976 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 3977 return 0; 3978 } 3979 3980 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3981 bdev_io_submit(bdev_io); 3982 3983 return 0; 3984 } 3985 3986 int 3987 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3988 uint64_t offset, uint64_t len, 3989 spdk_bdev_io_completion_cb cb, void *cb_arg) 3990 { 3991 uint64_t offset_blocks, num_blocks; 3992 3993 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3994 len, &num_blocks) != 0) { 3995 return -EINVAL; 3996 } 3997 3998 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3999 } 4000 4001 int 4002 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4003 uint64_t offset_blocks, uint64_t num_blocks, 4004 spdk_bdev_io_completion_cb cb, void *cb_arg) 4005 { 4006 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4007 struct spdk_bdev_io *bdev_io; 4008 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4009 4010 if (!desc->write) { 4011 return -EBADF; 4012 } 4013 4014 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4015 return -EINVAL; 4016 } 4017 4018 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 4019 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 4020 return -ENOTSUP; 4021 } 4022 4023 bdev_io = bdev_channel_get_io(channel); 4024 4025 if (!bdev_io) { 4026 return -ENOMEM; 4027 } 4028 4029 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 4030 bdev_io->internal.ch = channel; 4031 bdev_io->internal.desc = desc; 4032 bdev_io->u.bdev.offset_blocks = offset_blocks; 4033 bdev_io->u.bdev.num_blocks = num_blocks; 4034 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4035 4036 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 4037 bdev_io_submit(bdev_io); 4038 return 0; 4039 } 4040 4041 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 4042 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 4043 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 4044 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 4045 bdev_write_zero_buffer_next(bdev_io); 4046 4047 return 0; 4048 } 4049 4050 int 4051 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4052 uint64_t offset, uint64_t nbytes, 4053 spdk_bdev_io_completion_cb cb, void *cb_arg) 4054 { 4055 uint64_t offset_blocks, num_blocks; 4056 4057 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4058 nbytes, &num_blocks) != 0) { 4059 return -EINVAL; 4060 } 4061 4062 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4063 } 4064 4065 int 4066 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4067 uint64_t offset_blocks, uint64_t num_blocks, 4068 spdk_bdev_io_completion_cb cb, void *cb_arg) 4069 { 4070 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4071 struct spdk_bdev_io *bdev_io; 4072 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4073 4074 if (!desc->write) { 4075 return -EBADF; 4076 } 4077 4078 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4079 return -EINVAL; 4080 } 4081 4082 if (num_blocks == 0) { 4083 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 4084 return -EINVAL; 4085 } 4086 4087 bdev_io = bdev_channel_get_io(channel); 4088 if (!bdev_io) { 4089 return -ENOMEM; 4090 } 4091 4092 bdev_io->internal.ch = channel; 4093 bdev_io->internal.desc = desc; 4094 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 4095 4096 bdev_io->u.bdev.iovs = &bdev_io->iov; 4097 bdev_io->u.bdev.iovs[0].iov_base = NULL; 4098 bdev_io->u.bdev.iovs[0].iov_len = 0; 4099 bdev_io->u.bdev.iovcnt = 1; 4100 4101 bdev_io->u.bdev.offset_blocks = offset_blocks; 4102 bdev_io->u.bdev.num_blocks = num_blocks; 4103 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4104 4105 bdev_io_submit(bdev_io); 4106 return 0; 4107 } 4108 4109 int 4110 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4111 uint64_t offset, uint64_t length, 4112 spdk_bdev_io_completion_cb cb, void *cb_arg) 4113 { 4114 uint64_t offset_blocks, num_blocks; 4115 4116 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4117 length, &num_blocks) != 0) { 4118 return -EINVAL; 4119 } 4120 4121 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4122 } 4123 4124 int 4125 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4126 uint64_t offset_blocks, uint64_t num_blocks, 4127 spdk_bdev_io_completion_cb cb, void *cb_arg) 4128 { 4129 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4130 struct spdk_bdev_io *bdev_io; 4131 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4132 4133 if (!desc->write) { 4134 return -EBADF; 4135 } 4136 4137 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4138 return -EINVAL; 4139 } 4140 4141 bdev_io = bdev_channel_get_io(channel); 4142 if (!bdev_io) { 4143 return -ENOMEM; 4144 } 4145 4146 bdev_io->internal.ch = channel; 4147 bdev_io->internal.desc = desc; 4148 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 4149 bdev_io->u.bdev.iovs = NULL; 4150 bdev_io->u.bdev.iovcnt = 0; 4151 bdev_io->u.bdev.offset_blocks = offset_blocks; 4152 bdev_io->u.bdev.num_blocks = num_blocks; 4153 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4154 4155 bdev_io_submit(bdev_io); 4156 return 0; 4157 } 4158 4159 static void 4160 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 4161 { 4162 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 4163 struct spdk_bdev_io *bdev_io; 4164 4165 bdev_io = TAILQ_FIRST(&ch->queued_resets); 4166 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 4167 bdev_io_submit_reset(bdev_io); 4168 } 4169 4170 static void 4171 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 4172 { 4173 struct spdk_io_channel *ch; 4174 struct spdk_bdev_channel *channel; 4175 struct spdk_bdev_mgmt_channel *mgmt_channel; 4176 struct spdk_bdev_shared_resource *shared_resource; 4177 bdev_io_tailq_t tmp_queued; 4178 4179 TAILQ_INIT(&tmp_queued); 4180 4181 ch = spdk_io_channel_iter_get_channel(i); 4182 channel = spdk_io_channel_get_ctx(ch); 4183 shared_resource = channel->shared_resource; 4184 mgmt_channel = shared_resource->mgmt_ch; 4185 4186 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 4187 4188 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 4189 /* The QoS object is always valid and readable while 4190 * the channel flag is set, so the lock here should not 4191 * be necessary. We're not in the fast path though, so 4192 * just take it anyway. */ 4193 pthread_mutex_lock(&channel->bdev->internal.mutex); 4194 if (channel->bdev->internal.qos->ch == channel) { 4195 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 4196 } 4197 pthread_mutex_unlock(&channel->bdev->internal.mutex); 4198 } 4199 4200 bdev_abort_all_queued_io(&shared_resource->nomem_io, channel); 4201 bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 4202 bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 4203 bdev_abort_all_queued_io(&tmp_queued, channel); 4204 4205 spdk_for_each_channel_continue(i, 0); 4206 } 4207 4208 static void 4209 bdev_start_reset(void *ctx) 4210 { 4211 struct spdk_bdev_channel *ch = ctx; 4212 4213 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 4214 ch, bdev_reset_dev); 4215 } 4216 4217 static void 4218 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 4219 { 4220 struct spdk_bdev *bdev = ch->bdev; 4221 4222 assert(!TAILQ_EMPTY(&ch->queued_resets)); 4223 4224 pthread_mutex_lock(&bdev->internal.mutex); 4225 if (bdev->internal.reset_in_progress == NULL) { 4226 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 4227 /* 4228 * Take a channel reference for the target bdev for the life of this 4229 * reset. This guards against the channel getting destroyed while 4230 * spdk_for_each_channel() calls related to this reset IO are in 4231 * progress. We will release the reference when this reset is 4232 * completed. 4233 */ 4234 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 4235 bdev_start_reset(ch); 4236 } 4237 pthread_mutex_unlock(&bdev->internal.mutex); 4238 } 4239 4240 int 4241 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4242 spdk_bdev_io_completion_cb cb, void *cb_arg) 4243 { 4244 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4245 struct spdk_bdev_io *bdev_io; 4246 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4247 4248 bdev_io = bdev_channel_get_io(channel); 4249 if (!bdev_io) { 4250 return -ENOMEM; 4251 } 4252 4253 bdev_io->internal.ch = channel; 4254 bdev_io->internal.desc = desc; 4255 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4256 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 4257 bdev_io->u.reset.ch_ref = NULL; 4258 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4259 4260 pthread_mutex_lock(&bdev->internal.mutex); 4261 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 4262 pthread_mutex_unlock(&bdev->internal.mutex); 4263 4264 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 4265 internal.ch_link); 4266 4267 bdev_channel_start_reset(channel); 4268 4269 return 0; 4270 } 4271 4272 void 4273 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4274 struct spdk_bdev_io_stat *stat) 4275 { 4276 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4277 4278 *stat = channel->stat; 4279 } 4280 4281 static void 4282 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 4283 { 4284 void *io_device = spdk_io_channel_iter_get_io_device(i); 4285 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4286 4287 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 4288 bdev_iostat_ctx->cb_arg, 0); 4289 free(bdev_iostat_ctx); 4290 } 4291 4292 static void 4293 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 4294 { 4295 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4296 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4297 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4298 4299 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 4300 spdk_for_each_channel_continue(i, 0); 4301 } 4302 4303 void 4304 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 4305 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 4306 { 4307 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 4308 4309 assert(bdev != NULL); 4310 assert(stat != NULL); 4311 assert(cb != NULL); 4312 4313 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 4314 if (bdev_iostat_ctx == NULL) { 4315 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 4316 cb(bdev, stat, cb_arg, -ENOMEM); 4317 return; 4318 } 4319 4320 bdev_iostat_ctx->stat = stat; 4321 bdev_iostat_ctx->cb = cb; 4322 bdev_iostat_ctx->cb_arg = cb_arg; 4323 4324 /* Start with the statistics from previously deleted channels. */ 4325 pthread_mutex_lock(&bdev->internal.mutex); 4326 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 4327 pthread_mutex_unlock(&bdev->internal.mutex); 4328 4329 /* Then iterate and add the statistics from each existing channel. */ 4330 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4331 bdev_get_each_channel_stat, 4332 bdev_iostat_ctx, 4333 bdev_get_device_stat_done); 4334 } 4335 4336 int 4337 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4338 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4339 spdk_bdev_io_completion_cb cb, void *cb_arg) 4340 { 4341 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4342 struct spdk_bdev_io *bdev_io; 4343 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4344 4345 if (!desc->write) { 4346 return -EBADF; 4347 } 4348 4349 bdev_io = bdev_channel_get_io(channel); 4350 if (!bdev_io) { 4351 return -ENOMEM; 4352 } 4353 4354 bdev_io->internal.ch = channel; 4355 bdev_io->internal.desc = desc; 4356 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 4357 bdev_io->u.nvme_passthru.cmd = *cmd; 4358 bdev_io->u.nvme_passthru.buf = buf; 4359 bdev_io->u.nvme_passthru.nbytes = nbytes; 4360 bdev_io->u.nvme_passthru.md_buf = NULL; 4361 bdev_io->u.nvme_passthru.md_len = 0; 4362 4363 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4364 4365 bdev_io_submit(bdev_io); 4366 return 0; 4367 } 4368 4369 int 4370 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4371 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4372 spdk_bdev_io_completion_cb cb, void *cb_arg) 4373 { 4374 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4375 struct spdk_bdev_io *bdev_io; 4376 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4377 4378 if (!desc->write) { 4379 /* 4380 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4381 * to easily determine if the command is a read or write, but for now just 4382 * do not allow io_passthru with a read-only descriptor. 4383 */ 4384 return -EBADF; 4385 } 4386 4387 bdev_io = bdev_channel_get_io(channel); 4388 if (!bdev_io) { 4389 return -ENOMEM; 4390 } 4391 4392 bdev_io->internal.ch = channel; 4393 bdev_io->internal.desc = desc; 4394 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 4395 bdev_io->u.nvme_passthru.cmd = *cmd; 4396 bdev_io->u.nvme_passthru.buf = buf; 4397 bdev_io->u.nvme_passthru.nbytes = nbytes; 4398 bdev_io->u.nvme_passthru.md_buf = NULL; 4399 bdev_io->u.nvme_passthru.md_len = 0; 4400 4401 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4402 4403 bdev_io_submit(bdev_io); 4404 return 0; 4405 } 4406 4407 int 4408 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4409 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 4410 spdk_bdev_io_completion_cb cb, void *cb_arg) 4411 { 4412 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4413 struct spdk_bdev_io *bdev_io; 4414 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4415 4416 if (!desc->write) { 4417 /* 4418 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4419 * to easily determine if the command is a read or write, but for now just 4420 * do not allow io_passthru with a read-only descriptor. 4421 */ 4422 return -EBADF; 4423 } 4424 4425 bdev_io = bdev_channel_get_io(channel); 4426 if (!bdev_io) { 4427 return -ENOMEM; 4428 } 4429 4430 bdev_io->internal.ch = channel; 4431 bdev_io->internal.desc = desc; 4432 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 4433 bdev_io->u.nvme_passthru.cmd = *cmd; 4434 bdev_io->u.nvme_passthru.buf = buf; 4435 bdev_io->u.nvme_passthru.nbytes = nbytes; 4436 bdev_io->u.nvme_passthru.md_buf = md_buf; 4437 bdev_io->u.nvme_passthru.md_len = md_len; 4438 4439 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4440 4441 bdev_io_submit(bdev_io); 4442 return 0; 4443 } 4444 4445 static void bdev_abort_retry(void *ctx); 4446 4447 static void 4448 bdev_abort_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4449 { 4450 struct spdk_bdev_channel *channel = bdev_io->internal.ch; 4451 struct spdk_bdev_io *parent_io = cb_arg; 4452 struct spdk_bdev_io *bio_to_abort, *tmp_io; 4453 4454 bio_to_abort = bdev_io->u.abort.bio_to_abort; 4455 4456 spdk_bdev_free_io(bdev_io); 4457 4458 if (!success) { 4459 /* Check if the target I/O completed in the meantime. */ 4460 TAILQ_FOREACH(tmp_io, &channel->io_submitted, internal.ch_link) { 4461 if (tmp_io == bio_to_abort) { 4462 break; 4463 } 4464 } 4465 4466 /* If the target I/O still exists, set the parent to failed. */ 4467 if (tmp_io != NULL) { 4468 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4469 } 4470 } 4471 4472 parent_io->u.bdev.split_outstanding--; 4473 if (parent_io->u.bdev.split_outstanding == 0) { 4474 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4475 bdev_abort_retry(parent_io); 4476 } else { 4477 bdev_io_complete(parent_io); 4478 } 4479 } 4480 } 4481 4482 static int 4483 bdev_abort_io(struct spdk_bdev_desc *desc, struct spdk_bdev_channel *channel, 4484 struct spdk_bdev_io *bio_to_abort, 4485 spdk_bdev_io_completion_cb cb, void *cb_arg) 4486 { 4487 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4488 struct spdk_bdev_io *bdev_io; 4489 4490 if (bio_to_abort->type == SPDK_BDEV_IO_TYPE_ABORT || 4491 bio_to_abort->type == SPDK_BDEV_IO_TYPE_RESET) { 4492 /* TODO: Abort reset or abort request. */ 4493 return -ENOTSUP; 4494 } 4495 4496 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bio_to_abort)) { 4497 return -ENOTSUP; 4498 } 4499 4500 bdev_io = bdev_channel_get_io(channel); 4501 if (bdev_io == NULL) { 4502 return -ENOMEM; 4503 } 4504 4505 bdev_io->internal.ch = channel; 4506 bdev_io->internal.desc = desc; 4507 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4508 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4509 4510 bdev_io->u.abort.bio_to_abort = bio_to_abort; 4511 4512 /* Submit the abort request to the underlying bdev module. */ 4513 bdev_io_submit(bdev_io); 4514 4515 return 0; 4516 } 4517 4518 static uint32_t 4519 _bdev_abort(struct spdk_bdev_io *parent_io) 4520 { 4521 struct spdk_bdev_desc *desc = parent_io->internal.desc; 4522 struct spdk_bdev_channel *channel = parent_io->internal.ch; 4523 void *bio_cb_arg; 4524 struct spdk_bdev_io *bio_to_abort; 4525 uint32_t matched_ios; 4526 int rc; 4527 4528 bio_cb_arg = parent_io->u.bdev.abort.bio_cb_arg; 4529 4530 /* matched_ios is returned and will be kept by the caller. 4531 * 4532 * This funcion will be used for two cases, 1) the same cb_arg is used for 4533 * multiple I/Os, 2) a single large I/O is split into smaller ones. 4534 * Incrementing split_outstanding directly here may confuse readers especially 4535 * for the 1st case. 4536 * 4537 * Completion of I/O abort is processed after stack unwinding. Hence this trick 4538 * works as expected. 4539 */ 4540 matched_ios = 0; 4541 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4542 4543 TAILQ_FOREACH(bio_to_abort, &channel->io_submitted, internal.ch_link) { 4544 if (bio_to_abort->internal.caller_ctx != bio_cb_arg) { 4545 continue; 4546 } 4547 4548 if (bio_to_abort->internal.submit_tsc > parent_io->internal.submit_tsc) { 4549 /* Any I/O which was submitted after this abort command should be excluded. */ 4550 continue; 4551 } 4552 4553 rc = bdev_abort_io(desc, channel, bio_to_abort, bdev_abort_io_done, parent_io); 4554 if (rc != 0) { 4555 if (rc == -ENOMEM) { 4556 parent_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4557 } else { 4558 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4559 } 4560 break; 4561 } 4562 matched_ios++; 4563 } 4564 4565 return matched_ios; 4566 } 4567 4568 static void 4569 bdev_abort_retry(void *ctx) 4570 { 4571 struct spdk_bdev_io *parent_io = ctx; 4572 uint32_t matched_ios; 4573 4574 matched_ios = _bdev_abort(parent_io); 4575 4576 if (matched_ios == 0) { 4577 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4578 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4579 } else { 4580 /* For retry, the case that no target I/O was found is success 4581 * because it means target I/Os completed in the meantime. 4582 */ 4583 bdev_io_complete(parent_io); 4584 } 4585 return; 4586 } 4587 4588 /* Use split_outstanding to manage the progress of aborting I/Os. */ 4589 parent_io->u.bdev.split_outstanding = matched_ios; 4590 } 4591 4592 static void 4593 bdev_abort(struct spdk_bdev_io *parent_io) 4594 { 4595 uint32_t matched_ios; 4596 4597 matched_ios = _bdev_abort(parent_io); 4598 4599 if (matched_ios == 0) { 4600 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4601 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4602 } else { 4603 /* The case the no target I/O was found is failure. */ 4604 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4605 bdev_io_complete(parent_io); 4606 } 4607 return; 4608 } 4609 4610 /* Use split_outstanding to manage the progress of aborting I/Os. */ 4611 parent_io->u.bdev.split_outstanding = matched_ios; 4612 } 4613 4614 int 4615 spdk_bdev_abort(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4616 void *bio_cb_arg, 4617 spdk_bdev_io_completion_cb cb, void *cb_arg) 4618 { 4619 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4620 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4621 struct spdk_bdev_io *bdev_io; 4622 4623 if (bio_cb_arg == NULL) { 4624 return -EINVAL; 4625 } 4626 4627 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ABORT)) { 4628 return -ENOTSUP; 4629 } 4630 4631 bdev_io = bdev_channel_get_io(channel); 4632 if (bdev_io == NULL) { 4633 return -ENOMEM; 4634 } 4635 4636 bdev_io->internal.ch = channel; 4637 bdev_io->internal.desc = desc; 4638 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4639 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4640 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4641 4642 bdev_io->u.bdev.abort.bio_cb_arg = bio_cb_arg; 4643 4644 /* Parent abort request is not submitted directly, but to manage its execution, 4645 * add it to the submitted list here. 4646 */ 4647 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4648 4649 bdev_abort(bdev_io); 4650 4651 return 0; 4652 } 4653 4654 int 4655 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4656 struct spdk_bdev_io_wait_entry *entry) 4657 { 4658 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4659 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 4660 4661 if (bdev != entry->bdev) { 4662 SPDK_ERRLOG("bdevs do not match\n"); 4663 return -EINVAL; 4664 } 4665 4666 if (mgmt_ch->per_thread_cache_count > 0) { 4667 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 4668 return -EINVAL; 4669 } 4670 4671 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 4672 return 0; 4673 } 4674 4675 static void 4676 bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 4677 { 4678 struct spdk_bdev *bdev = bdev_ch->bdev; 4679 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4680 struct spdk_bdev_io *bdev_io; 4681 4682 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 4683 /* 4684 * Allow some more I/O to complete before retrying the nomem_io queue. 4685 * Some drivers (such as nvme) cannot immediately take a new I/O in 4686 * the context of a completion, because the resources for the I/O are 4687 * not released until control returns to the bdev poller. Also, we 4688 * may require several small I/O to complete before a larger I/O 4689 * (that requires splitting) can be submitted. 4690 */ 4691 return; 4692 } 4693 4694 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 4695 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 4696 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 4697 bdev_io->internal.ch->io_outstanding++; 4698 shared_resource->io_outstanding++; 4699 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4700 bdev_io->internal.error.nvme.cdw0 = 0; 4701 bdev_io->num_retries++; 4702 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 4703 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4704 break; 4705 } 4706 } 4707 } 4708 4709 static inline void 4710 bdev_io_complete(void *ctx) 4711 { 4712 struct spdk_bdev_io *bdev_io = ctx; 4713 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4714 uint64_t tsc, tsc_diff; 4715 4716 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 4717 /* 4718 * Send the completion to the thread that originally submitted the I/O, 4719 * which may not be the current thread in the case of QoS. 4720 */ 4721 if (bdev_io->internal.io_submit_ch) { 4722 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4723 bdev_io->internal.io_submit_ch = NULL; 4724 } 4725 4726 /* 4727 * Defer completion to avoid potential infinite recursion if the 4728 * user's completion callback issues a new I/O. 4729 */ 4730 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4731 bdev_io_complete, bdev_io); 4732 return; 4733 } 4734 4735 tsc = spdk_get_ticks(); 4736 tsc_diff = tsc - bdev_io->internal.submit_tsc; 4737 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 4738 /* When a bdev_io is split, the children bdev_io are not added 4739 * to the io_submitted list. So don't try to remove them in that 4740 * case. 4741 */ 4742 if (bdev_io->internal.cb != bdev_io_split_done) { 4743 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link); 4744 } 4745 4746 if (bdev_io->internal.ch->histogram) { 4747 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 4748 } 4749 4750 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4751 switch (bdev_io->type) { 4752 case SPDK_BDEV_IO_TYPE_READ: 4753 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4754 bdev_io->internal.ch->stat.num_read_ops++; 4755 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4756 break; 4757 case SPDK_BDEV_IO_TYPE_WRITE: 4758 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4759 bdev_io->internal.ch->stat.num_write_ops++; 4760 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4761 break; 4762 case SPDK_BDEV_IO_TYPE_UNMAP: 4763 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4764 bdev_io->internal.ch->stat.num_unmap_ops++; 4765 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 4766 break; 4767 case SPDK_BDEV_IO_TYPE_ZCOPY: 4768 /* Track the data in the start phase only */ 4769 if (bdev_io->u.bdev.zcopy.start) { 4770 if (bdev_io->u.bdev.zcopy.populate) { 4771 bdev_io->internal.ch->stat.bytes_read += 4772 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4773 bdev_io->internal.ch->stat.num_read_ops++; 4774 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4775 } else { 4776 bdev_io->internal.ch->stat.bytes_written += 4777 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4778 bdev_io->internal.ch->stat.num_write_ops++; 4779 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4780 } 4781 } 4782 break; 4783 default: 4784 break; 4785 } 4786 } 4787 4788 #ifdef SPDK_CONFIG_VTUNE 4789 uint64_t now_tsc = spdk_get_ticks(); 4790 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 4791 uint64_t data[5]; 4792 4793 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 4794 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 4795 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 4796 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 4797 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 4798 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 4799 4800 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 4801 __itt_metadata_u64, 5, data); 4802 4803 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 4804 bdev_io->internal.ch->start_tsc = now_tsc; 4805 } 4806 #endif 4807 4808 assert(bdev_io->internal.cb != NULL); 4809 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 4810 4811 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 4812 bdev_io->internal.caller_ctx); 4813 } 4814 4815 static void 4816 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 4817 { 4818 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4819 4820 if (bdev_io->u.reset.ch_ref != NULL) { 4821 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 4822 bdev_io->u.reset.ch_ref = NULL; 4823 } 4824 4825 bdev_io_complete(bdev_io); 4826 } 4827 4828 static void 4829 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 4830 { 4831 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4832 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4833 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4834 struct spdk_bdev_io *queued_reset; 4835 4836 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 4837 while (!TAILQ_EMPTY(&ch->queued_resets)) { 4838 queued_reset = TAILQ_FIRST(&ch->queued_resets); 4839 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 4840 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 4841 } 4842 4843 spdk_for_each_channel_continue(i, 0); 4844 } 4845 4846 void 4847 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 4848 { 4849 struct spdk_bdev *bdev = bdev_io->bdev; 4850 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4851 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4852 4853 bdev_io->internal.status = status; 4854 4855 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 4856 bool unlock_channels = false; 4857 4858 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 4859 SPDK_ERRLOG("NOMEM returned for reset\n"); 4860 } 4861 pthread_mutex_lock(&bdev->internal.mutex); 4862 if (bdev_io == bdev->internal.reset_in_progress) { 4863 bdev->internal.reset_in_progress = NULL; 4864 unlock_channels = true; 4865 } 4866 pthread_mutex_unlock(&bdev->internal.mutex); 4867 4868 if (unlock_channels) { 4869 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 4870 bdev_io, bdev_reset_complete); 4871 return; 4872 } 4873 } else { 4874 _bdev_io_unset_bounce_buf(bdev_io); 4875 4876 assert(bdev_ch->io_outstanding > 0); 4877 assert(shared_resource->io_outstanding > 0); 4878 bdev_ch->io_outstanding--; 4879 shared_resource->io_outstanding--; 4880 4881 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 4882 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 4883 /* 4884 * Wait for some of the outstanding I/O to complete before we 4885 * retry any of the nomem_io. Normally we will wait for 4886 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 4887 * depth channels we will instead wait for half to complete. 4888 */ 4889 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 4890 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 4891 return; 4892 } 4893 4894 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 4895 bdev_ch_retry_io(bdev_ch); 4896 } 4897 } 4898 4899 bdev_io_complete(bdev_io); 4900 } 4901 4902 void 4903 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 4904 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 4905 { 4906 if (sc == SPDK_SCSI_STATUS_GOOD) { 4907 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4908 } else { 4909 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 4910 bdev_io->internal.error.scsi.sc = sc; 4911 bdev_io->internal.error.scsi.sk = sk; 4912 bdev_io->internal.error.scsi.asc = asc; 4913 bdev_io->internal.error.scsi.ascq = ascq; 4914 } 4915 4916 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 4917 } 4918 4919 void 4920 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 4921 int *sc, int *sk, int *asc, int *ascq) 4922 { 4923 assert(sc != NULL); 4924 assert(sk != NULL); 4925 assert(asc != NULL); 4926 assert(ascq != NULL); 4927 4928 switch (bdev_io->internal.status) { 4929 case SPDK_BDEV_IO_STATUS_SUCCESS: 4930 *sc = SPDK_SCSI_STATUS_GOOD; 4931 *sk = SPDK_SCSI_SENSE_NO_SENSE; 4932 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 4933 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 4934 break; 4935 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 4936 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 4937 break; 4938 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 4939 *sc = bdev_io->internal.error.scsi.sc; 4940 *sk = bdev_io->internal.error.scsi.sk; 4941 *asc = bdev_io->internal.error.scsi.asc; 4942 *ascq = bdev_io->internal.error.scsi.ascq; 4943 break; 4944 default: 4945 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 4946 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 4947 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 4948 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 4949 break; 4950 } 4951 } 4952 4953 void 4954 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 4955 { 4956 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 4957 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4958 } else { 4959 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 4960 } 4961 4962 bdev_io->internal.error.nvme.cdw0 = cdw0; 4963 bdev_io->internal.error.nvme.sct = sct; 4964 bdev_io->internal.error.nvme.sc = sc; 4965 4966 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 4967 } 4968 4969 void 4970 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 4971 { 4972 assert(sct != NULL); 4973 assert(sc != NULL); 4974 assert(cdw0 != NULL); 4975 4976 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 4977 *sct = bdev_io->internal.error.nvme.sct; 4978 *sc = bdev_io->internal.error.nvme.sc; 4979 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4980 *sct = SPDK_NVME_SCT_GENERIC; 4981 *sc = SPDK_NVME_SC_SUCCESS; 4982 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_ABORTED) { 4983 *sct = SPDK_NVME_SCT_GENERIC; 4984 *sc = SPDK_NVME_SC_ABORTED_BY_REQUEST; 4985 } else { 4986 *sct = SPDK_NVME_SCT_GENERIC; 4987 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 4988 } 4989 4990 *cdw0 = bdev_io->internal.error.nvme.cdw0; 4991 } 4992 4993 void 4994 spdk_bdev_io_get_nvme_fused_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, 4995 int *first_sct, int *first_sc, int *second_sct, int *second_sc) 4996 { 4997 assert(first_sct != NULL); 4998 assert(first_sc != NULL); 4999 assert(second_sct != NULL); 5000 assert(second_sc != NULL); 5001 assert(cdw0 != NULL); 5002 5003 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5004 if (bdev_io->internal.error.nvme.sct == SPDK_NVME_SCT_MEDIA_ERROR && 5005 bdev_io->internal.error.nvme.sc == SPDK_NVME_SC_COMPARE_FAILURE) { 5006 *first_sct = bdev_io->internal.error.nvme.sct; 5007 *first_sc = bdev_io->internal.error.nvme.sc; 5008 *second_sct = SPDK_NVME_SCT_GENERIC; 5009 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5010 } else { 5011 *first_sct = SPDK_NVME_SCT_GENERIC; 5012 *first_sc = SPDK_NVME_SC_SUCCESS; 5013 *second_sct = bdev_io->internal.error.nvme.sct; 5014 *second_sc = bdev_io->internal.error.nvme.sc; 5015 } 5016 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5017 *first_sct = SPDK_NVME_SCT_GENERIC; 5018 *first_sc = SPDK_NVME_SC_SUCCESS; 5019 *second_sct = SPDK_NVME_SCT_GENERIC; 5020 *second_sc = SPDK_NVME_SC_SUCCESS; 5021 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED) { 5022 *first_sct = SPDK_NVME_SCT_GENERIC; 5023 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5024 *second_sct = SPDK_NVME_SCT_GENERIC; 5025 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5026 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_MISCOMPARE) { 5027 *first_sct = SPDK_NVME_SCT_MEDIA_ERROR; 5028 *first_sc = SPDK_NVME_SC_COMPARE_FAILURE; 5029 *second_sct = SPDK_NVME_SCT_GENERIC; 5030 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5031 } else { 5032 *first_sct = SPDK_NVME_SCT_GENERIC; 5033 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5034 *second_sct = SPDK_NVME_SCT_GENERIC; 5035 *second_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5036 } 5037 5038 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5039 } 5040 5041 struct spdk_thread * 5042 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 5043 { 5044 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 5045 } 5046 5047 struct spdk_io_channel * 5048 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 5049 { 5050 return bdev_io->internal.ch->channel; 5051 } 5052 5053 static void 5054 bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 5055 { 5056 uint64_t min_qos_set; 5057 int i; 5058 5059 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5060 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5061 break; 5062 } 5063 } 5064 5065 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 5066 SPDK_ERRLOG("Invalid rate limits set.\n"); 5067 return; 5068 } 5069 5070 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5071 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5072 continue; 5073 } 5074 5075 if (bdev_qos_is_iops_rate_limit(i) == true) { 5076 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 5077 } else { 5078 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 5079 } 5080 5081 if (limits[i] == 0 || limits[i] % min_qos_set) { 5082 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 5083 limits[i], bdev->name, min_qos_set); 5084 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 5085 return; 5086 } 5087 } 5088 5089 if (!bdev->internal.qos) { 5090 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 5091 if (!bdev->internal.qos) { 5092 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 5093 return; 5094 } 5095 } 5096 5097 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5098 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5099 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 5100 bdev->name, i, limits[i]); 5101 } 5102 5103 return; 5104 } 5105 5106 static void 5107 bdev_qos_config(struct spdk_bdev *bdev) 5108 { 5109 struct spdk_conf_section *sp = NULL; 5110 const char *val = NULL; 5111 int i = 0, j = 0; 5112 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 5113 bool config_qos = false; 5114 5115 sp = spdk_conf_find_section(NULL, "QoS"); 5116 if (!sp) { 5117 return; 5118 } 5119 5120 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 5121 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 5122 5123 i = 0; 5124 while (true) { 5125 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 5126 if (!val) { 5127 break; 5128 } 5129 5130 if (strcmp(bdev->name, val) != 0) { 5131 i++; 5132 continue; 5133 } 5134 5135 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 5136 if (val) { 5137 if (bdev_qos_is_iops_rate_limit(j) == true) { 5138 limits[j] = strtoull(val, NULL, 10); 5139 } else { 5140 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 5141 } 5142 config_qos = true; 5143 } 5144 5145 break; 5146 } 5147 5148 j++; 5149 } 5150 5151 if (config_qos == true) { 5152 bdev_qos_config_limit(bdev, limits); 5153 } 5154 5155 return; 5156 } 5157 5158 static int 5159 bdev_init(struct spdk_bdev *bdev) 5160 { 5161 char *bdev_name; 5162 5163 assert(bdev->module != NULL); 5164 5165 if (!bdev->name) { 5166 SPDK_ERRLOG("Bdev name is NULL\n"); 5167 return -EINVAL; 5168 } 5169 5170 if (!strlen(bdev->name)) { 5171 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 5172 return -EINVAL; 5173 } 5174 5175 if (spdk_bdev_get_by_name(bdev->name)) { 5176 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 5177 return -EEXIST; 5178 } 5179 5180 /* Users often register their own I/O devices using the bdev name. In 5181 * order to avoid conflicts, prepend bdev_. */ 5182 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 5183 if (!bdev_name) { 5184 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 5185 return -ENOMEM; 5186 } 5187 5188 bdev->internal.status = SPDK_BDEV_STATUS_READY; 5189 bdev->internal.measured_queue_depth = UINT64_MAX; 5190 bdev->internal.claim_module = NULL; 5191 bdev->internal.qd_poller = NULL; 5192 bdev->internal.qos = NULL; 5193 5194 /* If the user didn't specify a uuid, generate one. */ 5195 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 5196 spdk_uuid_generate(&bdev->uuid); 5197 } 5198 5199 if (spdk_bdev_get_buf_align(bdev) > 1) { 5200 if (bdev->split_on_optimal_io_boundary) { 5201 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 5202 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 5203 } else { 5204 bdev->split_on_optimal_io_boundary = true; 5205 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 5206 } 5207 } 5208 5209 /* If the user didn't specify a write unit size, set it to one. */ 5210 if (bdev->write_unit_size == 0) { 5211 bdev->write_unit_size = 1; 5212 } 5213 5214 /* Set ACWU value to 1 if bdev module did not set it (does not support it natively) */ 5215 if (bdev->acwu == 0) { 5216 bdev->acwu = 1; 5217 } 5218 5219 TAILQ_INIT(&bdev->internal.open_descs); 5220 TAILQ_INIT(&bdev->internal.locked_ranges); 5221 TAILQ_INIT(&bdev->internal.pending_locked_ranges); 5222 5223 TAILQ_INIT(&bdev->aliases); 5224 5225 bdev->internal.reset_in_progress = NULL; 5226 5227 bdev_qos_config(bdev); 5228 5229 spdk_io_device_register(__bdev_to_io_dev(bdev), 5230 bdev_channel_create, bdev_channel_destroy, 5231 sizeof(struct spdk_bdev_channel), 5232 bdev_name); 5233 5234 free(bdev_name); 5235 5236 pthread_mutex_init(&bdev->internal.mutex, NULL); 5237 return 0; 5238 } 5239 5240 static void 5241 bdev_destroy_cb(void *io_device) 5242 { 5243 int rc; 5244 struct spdk_bdev *bdev; 5245 spdk_bdev_unregister_cb cb_fn; 5246 void *cb_arg; 5247 5248 bdev = __bdev_from_io_dev(io_device); 5249 cb_fn = bdev->internal.unregister_cb; 5250 cb_arg = bdev->internal.unregister_ctx; 5251 5252 rc = bdev->fn_table->destruct(bdev->ctxt); 5253 if (rc < 0) { 5254 SPDK_ERRLOG("destruct failed\n"); 5255 } 5256 if (rc <= 0 && cb_fn != NULL) { 5257 cb_fn(cb_arg, rc); 5258 } 5259 } 5260 5261 5262 static void 5263 bdev_fini(struct spdk_bdev *bdev) 5264 { 5265 pthread_mutex_destroy(&bdev->internal.mutex); 5266 5267 free(bdev->internal.qos); 5268 5269 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 5270 } 5271 5272 static void 5273 bdev_start(struct spdk_bdev *bdev) 5274 { 5275 struct spdk_bdev_module *module; 5276 uint32_t action; 5277 5278 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 5279 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 5280 5281 /* Examine configuration before initializing I/O */ 5282 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5283 if (module->examine_config && bdev_ok_to_examine(bdev)) { 5284 action = module->internal.action_in_progress; 5285 module->internal.action_in_progress++; 5286 module->examine_config(bdev); 5287 if (action != module->internal.action_in_progress) { 5288 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 5289 module->name); 5290 } 5291 } 5292 } 5293 5294 if (bdev->internal.claim_module && bdev_ok_to_examine(bdev)) { 5295 if (bdev->internal.claim_module->examine_disk) { 5296 bdev->internal.claim_module->internal.action_in_progress++; 5297 bdev->internal.claim_module->examine_disk(bdev); 5298 } 5299 return; 5300 } 5301 5302 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5303 if (module->examine_disk && bdev_ok_to_examine(bdev)) { 5304 module->internal.action_in_progress++; 5305 module->examine_disk(bdev); 5306 } 5307 } 5308 } 5309 5310 int 5311 spdk_bdev_register(struct spdk_bdev *bdev) 5312 { 5313 int rc = bdev_init(bdev); 5314 5315 if (rc == 0) { 5316 bdev_start(bdev); 5317 } 5318 5319 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 5320 return rc; 5321 } 5322 5323 int 5324 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 5325 { 5326 SPDK_ERRLOG("This function is deprecated. Use spdk_bdev_register() instead.\n"); 5327 return spdk_bdev_register(vbdev); 5328 } 5329 5330 void 5331 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 5332 { 5333 if (bdev->internal.unregister_cb != NULL) { 5334 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 5335 } 5336 } 5337 5338 static void 5339 _remove_notify(void *arg) 5340 { 5341 struct spdk_bdev_desc *desc = arg; 5342 5343 pthread_mutex_lock(&desc->mutex); 5344 desc->refs--; 5345 5346 if (!desc->closed) { 5347 pthread_mutex_unlock(&desc->mutex); 5348 if (desc->callback.open_with_ext) { 5349 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 5350 } else { 5351 desc->callback.remove_fn(desc->callback.ctx); 5352 } 5353 return; 5354 } else if (0 == desc->refs) { 5355 /* This descriptor was closed after this remove_notify message was sent. 5356 * spdk_bdev_close() could not free the descriptor since this message was 5357 * in flight, so we free it now using bdev_desc_free(). 5358 */ 5359 pthread_mutex_unlock(&desc->mutex); 5360 bdev_desc_free(desc); 5361 return; 5362 } 5363 pthread_mutex_unlock(&desc->mutex); 5364 } 5365 5366 /* Must be called while holding bdev->internal.mutex. 5367 * returns: 0 - bdev removed and ready to be destructed. 5368 * -EBUSY - bdev can't be destructed yet. */ 5369 static int 5370 bdev_unregister_unsafe(struct spdk_bdev *bdev) 5371 { 5372 struct spdk_bdev_desc *desc, *tmp; 5373 int rc = 0; 5374 5375 /* Notify each descriptor about hotremoval */ 5376 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 5377 rc = -EBUSY; 5378 pthread_mutex_lock(&desc->mutex); 5379 /* 5380 * Defer invocation of the event_cb to a separate message that will 5381 * run later on its thread. This ensures this context unwinds and 5382 * we don't recursively unregister this bdev again if the event_cb 5383 * immediately closes its descriptor. 5384 */ 5385 desc->refs++; 5386 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 5387 pthread_mutex_unlock(&desc->mutex); 5388 } 5389 5390 /* If there are no descriptors, proceed removing the bdev */ 5391 if (rc == 0) { 5392 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 5393 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 5394 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 5395 } 5396 5397 return rc; 5398 } 5399 5400 void 5401 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 5402 { 5403 struct spdk_thread *thread; 5404 int rc; 5405 5406 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 5407 5408 thread = spdk_get_thread(); 5409 if (!thread) { 5410 /* The user called this from a non-SPDK thread. */ 5411 if (cb_fn != NULL) { 5412 cb_fn(cb_arg, -ENOTSUP); 5413 } 5414 return; 5415 } 5416 5417 pthread_mutex_lock(&g_bdev_mgr.mutex); 5418 pthread_mutex_lock(&bdev->internal.mutex); 5419 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5420 pthread_mutex_unlock(&bdev->internal.mutex); 5421 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5422 if (cb_fn) { 5423 cb_fn(cb_arg, -EBUSY); 5424 } 5425 return; 5426 } 5427 5428 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 5429 bdev->internal.unregister_cb = cb_fn; 5430 bdev->internal.unregister_ctx = cb_arg; 5431 5432 /* Call under lock. */ 5433 rc = bdev_unregister_unsafe(bdev); 5434 pthread_mutex_unlock(&bdev->internal.mutex); 5435 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5436 5437 if (rc == 0) { 5438 bdev_fini(bdev); 5439 } 5440 } 5441 5442 static void 5443 bdev_dummy_event_cb(void *remove_ctx) 5444 { 5445 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev remove event received with no remove callback specified"); 5446 } 5447 5448 static int 5449 bdev_start_qos(struct spdk_bdev *bdev) 5450 { 5451 struct set_qos_limit_ctx *ctx; 5452 5453 /* Enable QoS */ 5454 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 5455 ctx = calloc(1, sizeof(*ctx)); 5456 if (ctx == NULL) { 5457 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 5458 return -ENOMEM; 5459 } 5460 ctx->bdev = bdev; 5461 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5462 bdev_enable_qos_msg, ctx, 5463 bdev_enable_qos_done); 5464 } 5465 5466 return 0; 5467 } 5468 5469 static int 5470 bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 5471 { 5472 struct spdk_thread *thread; 5473 int rc = 0; 5474 5475 thread = spdk_get_thread(); 5476 if (!thread) { 5477 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 5478 return -ENOTSUP; 5479 } 5480 5481 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5482 spdk_get_thread()); 5483 5484 desc->bdev = bdev; 5485 desc->thread = thread; 5486 desc->write = write; 5487 5488 pthread_mutex_lock(&bdev->internal.mutex); 5489 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5490 pthread_mutex_unlock(&bdev->internal.mutex); 5491 return -ENODEV; 5492 } 5493 5494 if (write && bdev->internal.claim_module) { 5495 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 5496 bdev->name, bdev->internal.claim_module->name); 5497 pthread_mutex_unlock(&bdev->internal.mutex); 5498 return -EPERM; 5499 } 5500 5501 rc = bdev_start_qos(bdev); 5502 if (rc != 0) { 5503 SPDK_ERRLOG("Failed to start QoS on bdev %s\n", bdev->name); 5504 pthread_mutex_unlock(&bdev->internal.mutex); 5505 return rc; 5506 } 5507 5508 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 5509 5510 pthread_mutex_unlock(&bdev->internal.mutex); 5511 5512 return 0; 5513 } 5514 5515 int 5516 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 5517 void *remove_ctx, struct spdk_bdev_desc **_desc) 5518 { 5519 struct spdk_bdev_desc *desc; 5520 int rc; 5521 5522 desc = calloc(1, sizeof(*desc)); 5523 if (desc == NULL) { 5524 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5525 return -ENOMEM; 5526 } 5527 5528 if (remove_cb == NULL) { 5529 remove_cb = bdev_dummy_event_cb; 5530 } 5531 5532 TAILQ_INIT(&desc->pending_media_events); 5533 TAILQ_INIT(&desc->free_media_events); 5534 5535 desc->callback.open_with_ext = false; 5536 desc->callback.remove_fn = remove_cb; 5537 desc->callback.ctx = remove_ctx; 5538 pthread_mutex_init(&desc->mutex, NULL); 5539 5540 pthread_mutex_lock(&g_bdev_mgr.mutex); 5541 5542 rc = bdev_open(bdev, write, desc); 5543 if (rc != 0) { 5544 bdev_desc_free(desc); 5545 desc = NULL; 5546 } 5547 5548 *_desc = desc; 5549 5550 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5551 5552 return rc; 5553 } 5554 5555 int 5556 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 5557 void *event_ctx, struct spdk_bdev_desc **_desc) 5558 { 5559 struct spdk_bdev_desc *desc; 5560 struct spdk_bdev *bdev; 5561 unsigned int event_id; 5562 int rc; 5563 5564 if (event_cb == NULL) { 5565 SPDK_ERRLOG("Missing event callback function\n"); 5566 return -EINVAL; 5567 } 5568 5569 pthread_mutex_lock(&g_bdev_mgr.mutex); 5570 5571 bdev = spdk_bdev_get_by_name(bdev_name); 5572 5573 if (bdev == NULL) { 5574 SPDK_ERRLOG("Failed to find bdev with name: %s\n", bdev_name); 5575 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5576 return -EINVAL; 5577 } 5578 5579 desc = calloc(1, sizeof(*desc)); 5580 if (desc == NULL) { 5581 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5582 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5583 return -ENOMEM; 5584 } 5585 5586 TAILQ_INIT(&desc->pending_media_events); 5587 TAILQ_INIT(&desc->free_media_events); 5588 5589 desc->callback.open_with_ext = true; 5590 desc->callback.event_fn = event_cb; 5591 desc->callback.ctx = event_ctx; 5592 pthread_mutex_init(&desc->mutex, NULL); 5593 5594 if (bdev->media_events) { 5595 desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE, 5596 sizeof(*desc->media_events_buffer)); 5597 if (desc->media_events_buffer == NULL) { 5598 SPDK_ERRLOG("Failed to initialize media event pool\n"); 5599 bdev_desc_free(desc); 5600 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5601 return -ENOMEM; 5602 } 5603 5604 for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) { 5605 TAILQ_INSERT_TAIL(&desc->free_media_events, 5606 &desc->media_events_buffer[event_id], tailq); 5607 } 5608 } 5609 5610 rc = bdev_open(bdev, write, desc); 5611 if (rc != 0) { 5612 bdev_desc_free(desc); 5613 desc = NULL; 5614 } 5615 5616 *_desc = desc; 5617 5618 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5619 5620 return rc; 5621 } 5622 5623 void 5624 spdk_bdev_close(struct spdk_bdev_desc *desc) 5625 { 5626 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 5627 int rc; 5628 5629 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5630 spdk_get_thread()); 5631 5632 assert(desc->thread == spdk_get_thread()); 5633 5634 spdk_poller_unregister(&desc->io_timeout_poller); 5635 5636 pthread_mutex_lock(&bdev->internal.mutex); 5637 pthread_mutex_lock(&desc->mutex); 5638 5639 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 5640 5641 desc->closed = true; 5642 5643 if (0 == desc->refs) { 5644 pthread_mutex_unlock(&desc->mutex); 5645 bdev_desc_free(desc); 5646 } else { 5647 pthread_mutex_unlock(&desc->mutex); 5648 } 5649 5650 /* If no more descriptors, kill QoS channel */ 5651 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5652 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 5653 bdev->name, spdk_get_thread()); 5654 5655 if (bdev_qos_destroy(bdev)) { 5656 /* There isn't anything we can do to recover here. Just let the 5657 * old QoS poller keep running. The QoS handling won't change 5658 * cores when the user allocates a new channel, but it won't break. */ 5659 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 5660 } 5661 } 5662 5663 spdk_bdev_set_qd_sampling_period(bdev, 0); 5664 5665 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5666 rc = bdev_unregister_unsafe(bdev); 5667 pthread_mutex_unlock(&bdev->internal.mutex); 5668 5669 if (rc == 0) { 5670 bdev_fini(bdev); 5671 } 5672 } else { 5673 pthread_mutex_unlock(&bdev->internal.mutex); 5674 } 5675 } 5676 5677 int 5678 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 5679 struct spdk_bdev_module *module) 5680 { 5681 if (bdev->internal.claim_module != NULL) { 5682 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 5683 bdev->internal.claim_module->name); 5684 return -EPERM; 5685 } 5686 5687 if (desc && !desc->write) { 5688 desc->write = true; 5689 } 5690 5691 bdev->internal.claim_module = module; 5692 return 0; 5693 } 5694 5695 void 5696 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 5697 { 5698 assert(bdev->internal.claim_module != NULL); 5699 bdev->internal.claim_module = NULL; 5700 } 5701 5702 struct spdk_bdev * 5703 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 5704 { 5705 assert(desc != NULL); 5706 return desc->bdev; 5707 } 5708 5709 void 5710 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 5711 { 5712 struct iovec *iovs; 5713 int iovcnt; 5714 5715 if (bdev_io == NULL) { 5716 return; 5717 } 5718 5719 switch (bdev_io->type) { 5720 case SPDK_BDEV_IO_TYPE_READ: 5721 case SPDK_BDEV_IO_TYPE_WRITE: 5722 case SPDK_BDEV_IO_TYPE_ZCOPY: 5723 iovs = bdev_io->u.bdev.iovs; 5724 iovcnt = bdev_io->u.bdev.iovcnt; 5725 break; 5726 default: 5727 iovs = NULL; 5728 iovcnt = 0; 5729 break; 5730 } 5731 5732 if (iovp) { 5733 *iovp = iovs; 5734 } 5735 if (iovcntp) { 5736 *iovcntp = iovcnt; 5737 } 5738 } 5739 5740 void * 5741 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 5742 { 5743 if (bdev_io == NULL) { 5744 return NULL; 5745 } 5746 5747 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 5748 return NULL; 5749 } 5750 5751 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 5752 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 5753 return bdev_io->u.bdev.md_buf; 5754 } 5755 5756 return NULL; 5757 } 5758 5759 void 5760 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 5761 { 5762 5763 if (spdk_bdev_module_list_find(bdev_module->name)) { 5764 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 5765 assert(false); 5766 } 5767 5768 /* 5769 * Modules with examine callbacks must be initialized first, so they are 5770 * ready to handle examine callbacks from later modules that will 5771 * register physical bdevs. 5772 */ 5773 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 5774 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5775 } else { 5776 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5777 } 5778 } 5779 5780 struct spdk_bdev_module * 5781 spdk_bdev_module_list_find(const char *name) 5782 { 5783 struct spdk_bdev_module *bdev_module; 5784 5785 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5786 if (strcmp(name, bdev_module->name) == 0) { 5787 break; 5788 } 5789 } 5790 5791 return bdev_module; 5792 } 5793 5794 static void 5795 bdev_write_zero_buffer_next(void *_bdev_io) 5796 { 5797 struct spdk_bdev_io *bdev_io = _bdev_io; 5798 uint64_t num_bytes, num_blocks; 5799 void *md_buf = NULL; 5800 int rc; 5801 5802 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 5803 bdev_io->u.bdev.split_remaining_num_blocks, 5804 ZERO_BUFFER_SIZE); 5805 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 5806 5807 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 5808 md_buf = (char *)g_bdev_mgr.zero_buffer + 5809 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 5810 } 5811 5812 rc = bdev_write_blocks_with_md(bdev_io->internal.desc, 5813 spdk_io_channel_from_ctx(bdev_io->internal.ch), 5814 g_bdev_mgr.zero_buffer, md_buf, 5815 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 5816 bdev_write_zero_buffer_done, bdev_io); 5817 if (rc == 0) { 5818 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 5819 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 5820 } else if (rc == -ENOMEM) { 5821 bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next); 5822 } else { 5823 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5824 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 5825 } 5826 } 5827 5828 static void 5829 bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 5830 { 5831 struct spdk_bdev_io *parent_io = cb_arg; 5832 5833 spdk_bdev_free_io(bdev_io); 5834 5835 if (!success) { 5836 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5837 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 5838 return; 5839 } 5840 5841 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 5842 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5843 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 5844 return; 5845 } 5846 5847 bdev_write_zero_buffer_next(parent_io); 5848 } 5849 5850 static void 5851 bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 5852 { 5853 pthread_mutex_lock(&ctx->bdev->internal.mutex); 5854 ctx->bdev->internal.qos_mod_in_progress = false; 5855 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 5856 5857 if (ctx->cb_fn) { 5858 ctx->cb_fn(ctx->cb_arg, status); 5859 } 5860 free(ctx); 5861 } 5862 5863 static void 5864 bdev_disable_qos_done(void *cb_arg) 5865 { 5866 struct set_qos_limit_ctx *ctx = cb_arg; 5867 struct spdk_bdev *bdev = ctx->bdev; 5868 struct spdk_bdev_io *bdev_io; 5869 struct spdk_bdev_qos *qos; 5870 5871 pthread_mutex_lock(&bdev->internal.mutex); 5872 qos = bdev->internal.qos; 5873 bdev->internal.qos = NULL; 5874 pthread_mutex_unlock(&bdev->internal.mutex); 5875 5876 while (!TAILQ_EMPTY(&qos->queued)) { 5877 /* Send queued I/O back to their original thread for resubmission. */ 5878 bdev_io = TAILQ_FIRST(&qos->queued); 5879 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 5880 5881 if (bdev_io->internal.io_submit_ch) { 5882 /* 5883 * Channel was changed when sending it to the QoS thread - change it back 5884 * before sending it back to the original thread. 5885 */ 5886 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 5887 bdev_io->internal.io_submit_ch = NULL; 5888 } 5889 5890 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 5891 _bdev_io_submit, bdev_io); 5892 } 5893 5894 if (qos->thread != NULL) { 5895 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 5896 spdk_poller_unregister(&qos->poller); 5897 } 5898 5899 free(qos); 5900 5901 bdev_set_qos_limit_done(ctx, 0); 5902 } 5903 5904 static void 5905 bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 5906 { 5907 void *io_device = spdk_io_channel_iter_get_io_device(i); 5908 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5909 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5910 struct spdk_thread *thread; 5911 5912 pthread_mutex_lock(&bdev->internal.mutex); 5913 thread = bdev->internal.qos->thread; 5914 pthread_mutex_unlock(&bdev->internal.mutex); 5915 5916 if (thread != NULL) { 5917 spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx); 5918 } else { 5919 bdev_disable_qos_done(ctx); 5920 } 5921 } 5922 5923 static void 5924 bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 5925 { 5926 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5927 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5928 5929 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 5930 5931 spdk_for_each_channel_continue(i, 0); 5932 } 5933 5934 static void 5935 bdev_update_qos_rate_limit_msg(void *cb_arg) 5936 { 5937 struct set_qos_limit_ctx *ctx = cb_arg; 5938 struct spdk_bdev *bdev = ctx->bdev; 5939 5940 pthread_mutex_lock(&bdev->internal.mutex); 5941 bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 5942 pthread_mutex_unlock(&bdev->internal.mutex); 5943 5944 bdev_set_qos_limit_done(ctx, 0); 5945 } 5946 5947 static void 5948 bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 5949 { 5950 void *io_device = spdk_io_channel_iter_get_io_device(i); 5951 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5952 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5953 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5954 5955 pthread_mutex_lock(&bdev->internal.mutex); 5956 bdev_enable_qos(bdev, bdev_ch); 5957 pthread_mutex_unlock(&bdev->internal.mutex); 5958 spdk_for_each_channel_continue(i, 0); 5959 } 5960 5961 static void 5962 bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 5963 { 5964 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5965 5966 bdev_set_qos_limit_done(ctx, status); 5967 } 5968 5969 static void 5970 bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 5971 { 5972 int i; 5973 5974 assert(bdev->internal.qos != NULL); 5975 5976 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5977 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5978 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5979 5980 if (limits[i] == 0) { 5981 bdev->internal.qos->rate_limits[i].limit = 5982 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 5983 } 5984 } 5985 } 5986 } 5987 5988 void 5989 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 5990 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 5991 { 5992 struct set_qos_limit_ctx *ctx; 5993 uint32_t limit_set_complement; 5994 uint64_t min_limit_per_sec; 5995 int i; 5996 bool disable_rate_limit = true; 5997 5998 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5999 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6000 continue; 6001 } 6002 6003 if (limits[i] > 0) { 6004 disable_rate_limit = false; 6005 } 6006 6007 if (bdev_qos_is_iops_rate_limit(i) == true) { 6008 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 6009 } else { 6010 /* Change from megabyte to byte rate limit */ 6011 limits[i] = limits[i] * 1024 * 1024; 6012 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 6013 } 6014 6015 limit_set_complement = limits[i] % min_limit_per_sec; 6016 if (limit_set_complement) { 6017 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 6018 limits[i], min_limit_per_sec); 6019 limits[i] += min_limit_per_sec - limit_set_complement; 6020 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 6021 } 6022 } 6023 6024 ctx = calloc(1, sizeof(*ctx)); 6025 if (ctx == NULL) { 6026 cb_fn(cb_arg, -ENOMEM); 6027 return; 6028 } 6029 6030 ctx->cb_fn = cb_fn; 6031 ctx->cb_arg = cb_arg; 6032 ctx->bdev = bdev; 6033 6034 pthread_mutex_lock(&bdev->internal.mutex); 6035 if (bdev->internal.qos_mod_in_progress) { 6036 pthread_mutex_unlock(&bdev->internal.mutex); 6037 free(ctx); 6038 cb_fn(cb_arg, -EAGAIN); 6039 return; 6040 } 6041 bdev->internal.qos_mod_in_progress = true; 6042 6043 if (disable_rate_limit == true && bdev->internal.qos) { 6044 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6045 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 6046 (bdev->internal.qos->rate_limits[i].limit > 0 && 6047 bdev->internal.qos->rate_limits[i].limit != 6048 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 6049 disable_rate_limit = false; 6050 break; 6051 } 6052 } 6053 } 6054 6055 if (disable_rate_limit == false) { 6056 if (bdev->internal.qos == NULL) { 6057 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 6058 if (!bdev->internal.qos) { 6059 pthread_mutex_unlock(&bdev->internal.mutex); 6060 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 6061 bdev_set_qos_limit_done(ctx, -ENOMEM); 6062 return; 6063 } 6064 } 6065 6066 if (bdev->internal.qos->thread == NULL) { 6067 /* Enabling */ 6068 bdev_set_qos_rate_limits(bdev, limits); 6069 6070 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6071 bdev_enable_qos_msg, ctx, 6072 bdev_enable_qos_done); 6073 } else { 6074 /* Updating */ 6075 bdev_set_qos_rate_limits(bdev, limits); 6076 6077 spdk_thread_send_msg(bdev->internal.qos->thread, 6078 bdev_update_qos_rate_limit_msg, ctx); 6079 } 6080 } else { 6081 if (bdev->internal.qos != NULL) { 6082 bdev_set_qos_rate_limits(bdev, limits); 6083 6084 /* Disabling */ 6085 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6086 bdev_disable_qos_msg, ctx, 6087 bdev_disable_qos_msg_done); 6088 } else { 6089 pthread_mutex_unlock(&bdev->internal.mutex); 6090 bdev_set_qos_limit_done(ctx, 0); 6091 return; 6092 } 6093 } 6094 6095 pthread_mutex_unlock(&bdev->internal.mutex); 6096 } 6097 6098 struct spdk_bdev_histogram_ctx { 6099 spdk_bdev_histogram_status_cb cb_fn; 6100 void *cb_arg; 6101 struct spdk_bdev *bdev; 6102 int status; 6103 }; 6104 6105 static void 6106 bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 6107 { 6108 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6109 6110 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6111 ctx->bdev->internal.histogram_in_progress = false; 6112 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6113 ctx->cb_fn(ctx->cb_arg, ctx->status); 6114 free(ctx); 6115 } 6116 6117 static void 6118 bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 6119 { 6120 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6121 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6122 6123 if (ch->histogram != NULL) { 6124 spdk_histogram_data_free(ch->histogram); 6125 ch->histogram = NULL; 6126 } 6127 spdk_for_each_channel_continue(i, 0); 6128 } 6129 6130 static void 6131 bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 6132 { 6133 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6134 6135 if (status != 0) { 6136 ctx->status = status; 6137 ctx->bdev->internal.histogram_enabled = false; 6138 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx, 6139 bdev_histogram_disable_channel_cb); 6140 } else { 6141 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6142 ctx->bdev->internal.histogram_in_progress = false; 6143 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6144 ctx->cb_fn(ctx->cb_arg, ctx->status); 6145 free(ctx); 6146 } 6147 } 6148 6149 static void 6150 bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 6151 { 6152 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6153 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6154 int status = 0; 6155 6156 if (ch->histogram == NULL) { 6157 ch->histogram = spdk_histogram_data_alloc(); 6158 if (ch->histogram == NULL) { 6159 status = -ENOMEM; 6160 } 6161 } 6162 6163 spdk_for_each_channel_continue(i, status); 6164 } 6165 6166 void 6167 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 6168 void *cb_arg, bool enable) 6169 { 6170 struct spdk_bdev_histogram_ctx *ctx; 6171 6172 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 6173 if (ctx == NULL) { 6174 cb_fn(cb_arg, -ENOMEM); 6175 return; 6176 } 6177 6178 ctx->bdev = bdev; 6179 ctx->status = 0; 6180 ctx->cb_fn = cb_fn; 6181 ctx->cb_arg = cb_arg; 6182 6183 pthread_mutex_lock(&bdev->internal.mutex); 6184 if (bdev->internal.histogram_in_progress) { 6185 pthread_mutex_unlock(&bdev->internal.mutex); 6186 free(ctx); 6187 cb_fn(cb_arg, -EAGAIN); 6188 return; 6189 } 6190 6191 bdev->internal.histogram_in_progress = true; 6192 pthread_mutex_unlock(&bdev->internal.mutex); 6193 6194 bdev->internal.histogram_enabled = enable; 6195 6196 if (enable) { 6197 /* Allocate histogram for each channel */ 6198 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx, 6199 bdev_histogram_enable_channel_cb); 6200 } else { 6201 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx, 6202 bdev_histogram_disable_channel_cb); 6203 } 6204 } 6205 6206 struct spdk_bdev_histogram_data_ctx { 6207 spdk_bdev_histogram_data_cb cb_fn; 6208 void *cb_arg; 6209 struct spdk_bdev *bdev; 6210 /** merged histogram data from all channels */ 6211 struct spdk_histogram_data *histogram; 6212 }; 6213 6214 static void 6215 bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 6216 { 6217 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6218 6219 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 6220 free(ctx); 6221 } 6222 6223 static void 6224 bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 6225 { 6226 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6227 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6228 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6229 int status = 0; 6230 6231 if (ch->histogram == NULL) { 6232 status = -EFAULT; 6233 } else { 6234 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 6235 } 6236 6237 spdk_for_each_channel_continue(i, status); 6238 } 6239 6240 void 6241 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 6242 spdk_bdev_histogram_data_cb cb_fn, 6243 void *cb_arg) 6244 { 6245 struct spdk_bdev_histogram_data_ctx *ctx; 6246 6247 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 6248 if (ctx == NULL) { 6249 cb_fn(cb_arg, -ENOMEM, NULL); 6250 return; 6251 } 6252 6253 ctx->bdev = bdev; 6254 ctx->cb_fn = cb_fn; 6255 ctx->cb_arg = cb_arg; 6256 6257 ctx->histogram = histogram; 6258 6259 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx, 6260 bdev_histogram_get_channel_cb); 6261 } 6262 6263 size_t 6264 spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events, 6265 size_t max_events) 6266 { 6267 struct media_event_entry *entry; 6268 size_t num_events = 0; 6269 6270 for (; num_events < max_events; ++num_events) { 6271 entry = TAILQ_FIRST(&desc->pending_media_events); 6272 if (entry == NULL) { 6273 break; 6274 } 6275 6276 events[num_events] = entry->event; 6277 TAILQ_REMOVE(&desc->pending_media_events, entry, tailq); 6278 TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq); 6279 } 6280 6281 return num_events; 6282 } 6283 6284 int 6285 spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events, 6286 size_t num_events) 6287 { 6288 struct spdk_bdev_desc *desc; 6289 struct media_event_entry *entry; 6290 size_t event_id; 6291 int rc = 0; 6292 6293 assert(bdev->media_events); 6294 6295 pthread_mutex_lock(&bdev->internal.mutex); 6296 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6297 if (desc->write) { 6298 break; 6299 } 6300 } 6301 6302 if (desc == NULL || desc->media_events_buffer == NULL) { 6303 rc = -ENODEV; 6304 goto out; 6305 } 6306 6307 for (event_id = 0; event_id < num_events; ++event_id) { 6308 entry = TAILQ_FIRST(&desc->free_media_events); 6309 if (entry == NULL) { 6310 break; 6311 } 6312 6313 TAILQ_REMOVE(&desc->free_media_events, entry, tailq); 6314 TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq); 6315 entry->event = events[event_id]; 6316 } 6317 6318 rc = event_id; 6319 out: 6320 pthread_mutex_unlock(&bdev->internal.mutex); 6321 return rc; 6322 } 6323 6324 void 6325 spdk_bdev_notify_media_management(struct spdk_bdev *bdev) 6326 { 6327 struct spdk_bdev_desc *desc; 6328 6329 pthread_mutex_lock(&bdev->internal.mutex); 6330 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6331 if (!TAILQ_EMPTY(&desc->pending_media_events)) { 6332 desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev, 6333 desc->callback.ctx); 6334 } 6335 } 6336 pthread_mutex_unlock(&bdev->internal.mutex); 6337 } 6338 6339 struct locked_lba_range_ctx { 6340 struct lba_range range; 6341 struct spdk_bdev *bdev; 6342 struct lba_range *current_range; 6343 struct lba_range *owner_range; 6344 struct spdk_poller *poller; 6345 lock_range_cb cb_fn; 6346 void *cb_arg; 6347 }; 6348 6349 static void 6350 bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status) 6351 { 6352 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6353 6354 ctx->cb_fn(ctx->cb_arg, -ENOMEM); 6355 free(ctx); 6356 } 6357 6358 static void 6359 bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i); 6360 6361 static void 6362 bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6363 { 6364 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6365 struct spdk_bdev *bdev = ctx->bdev; 6366 6367 if (status == -ENOMEM) { 6368 /* One of the channels could not allocate a range object. 6369 * So we have to go back and clean up any ranges that were 6370 * allocated successfully before we return error status to 6371 * the caller. We can reuse the unlock function to do that 6372 * clean up. 6373 */ 6374 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6375 bdev_unlock_lba_range_get_channel, ctx, 6376 bdev_lock_error_cleanup_cb); 6377 return; 6378 } 6379 6380 /* All channels have locked this range and no I/O overlapping the range 6381 * are outstanding! Set the owner_ch for the range object for the 6382 * locking channel, so that this channel will know that it is allowed 6383 * to write to this range. 6384 */ 6385 ctx->owner_range->owner_ch = ctx->range.owner_ch; 6386 ctx->cb_fn(ctx->cb_arg, status); 6387 6388 /* Don't free the ctx here. Its range is in the bdev's global list of 6389 * locked ranges still, and will be removed and freed when this range 6390 * is later unlocked. 6391 */ 6392 } 6393 6394 static int 6395 bdev_lock_lba_range_check_io(void *_i) 6396 { 6397 struct spdk_io_channel_iter *i = _i; 6398 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6399 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6400 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6401 struct lba_range *range = ctx->current_range; 6402 struct spdk_bdev_io *bdev_io; 6403 6404 spdk_poller_unregister(&ctx->poller); 6405 6406 /* The range is now in the locked_ranges, so no new IO can be submitted to this 6407 * range. But we need to wait until any outstanding IO overlapping with this range 6408 * are completed. 6409 */ 6410 TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) { 6411 if (bdev_io_range_is_locked(bdev_io, range)) { 6412 ctx->poller = SPDK_POLLER_REGISTER(bdev_lock_lba_range_check_io, i, 100); 6413 return 1; 6414 } 6415 } 6416 6417 spdk_for_each_channel_continue(i, 0); 6418 return 1; 6419 } 6420 6421 static void 6422 bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6423 { 6424 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6425 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6426 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6427 struct lba_range *range; 6428 6429 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6430 if (range->length == ctx->range.length && 6431 range->offset == ctx->range.offset && 6432 range->locked_ctx == ctx->range.locked_ctx) { 6433 /* This range already exists on this channel, so don't add 6434 * it again. This can happen when a new channel is created 6435 * while the for_each_channel operation is in progress. 6436 * Do not check for outstanding I/O in that case, since the 6437 * range was locked before any I/O could be submitted to the 6438 * new channel. 6439 */ 6440 spdk_for_each_channel_continue(i, 0); 6441 return; 6442 } 6443 } 6444 6445 range = calloc(1, sizeof(*range)); 6446 if (range == NULL) { 6447 spdk_for_each_channel_continue(i, -ENOMEM); 6448 return; 6449 } 6450 6451 range->length = ctx->range.length; 6452 range->offset = ctx->range.offset; 6453 range->locked_ctx = ctx->range.locked_ctx; 6454 ctx->current_range = range; 6455 if (ctx->range.owner_ch == ch) { 6456 /* This is the range object for the channel that will hold 6457 * the lock. Store it in the ctx object so that we can easily 6458 * set its owner_ch after the lock is finally acquired. 6459 */ 6460 ctx->owner_range = range; 6461 } 6462 TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq); 6463 bdev_lock_lba_range_check_io(i); 6464 } 6465 6466 static void 6467 bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx) 6468 { 6469 assert(spdk_get_thread() == ctx->range.owner_ch->channel->thread); 6470 6471 /* We will add a copy of this range to each channel now. */ 6472 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx, 6473 bdev_lock_lba_range_cb); 6474 } 6475 6476 static bool 6477 bdev_lba_range_overlaps_tailq(struct lba_range *range, lba_range_tailq_t *tailq) 6478 { 6479 struct lba_range *r; 6480 6481 TAILQ_FOREACH(r, tailq, tailq) { 6482 if (bdev_lba_range_overlapped(range, r)) { 6483 return true; 6484 } 6485 } 6486 return false; 6487 } 6488 6489 static int 6490 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6491 uint64_t offset, uint64_t length, 6492 lock_range_cb cb_fn, void *cb_arg) 6493 { 6494 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6495 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6496 struct locked_lba_range_ctx *ctx; 6497 6498 if (cb_arg == NULL) { 6499 SPDK_ERRLOG("cb_arg must not be NULL\n"); 6500 return -EINVAL; 6501 } 6502 6503 ctx = calloc(1, sizeof(*ctx)); 6504 if (ctx == NULL) { 6505 return -ENOMEM; 6506 } 6507 6508 ctx->range.offset = offset; 6509 ctx->range.length = length; 6510 ctx->range.owner_ch = ch; 6511 ctx->range.locked_ctx = cb_arg; 6512 ctx->bdev = bdev; 6513 ctx->cb_fn = cb_fn; 6514 ctx->cb_arg = cb_arg; 6515 6516 pthread_mutex_lock(&bdev->internal.mutex); 6517 if (bdev_lba_range_overlaps_tailq(&ctx->range, &bdev->internal.locked_ranges)) { 6518 /* There is an active lock overlapping with this range. 6519 * Put it on the pending list until this range no 6520 * longer overlaps with another. 6521 */ 6522 TAILQ_INSERT_TAIL(&bdev->internal.pending_locked_ranges, &ctx->range, tailq); 6523 } else { 6524 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, &ctx->range, tailq); 6525 bdev_lock_lba_range_ctx(bdev, ctx); 6526 } 6527 pthread_mutex_unlock(&bdev->internal.mutex); 6528 return 0; 6529 } 6530 6531 static void 6532 bdev_lock_lba_range_ctx_msg(void *_ctx) 6533 { 6534 struct locked_lba_range_ctx *ctx = _ctx; 6535 6536 bdev_lock_lba_range_ctx(ctx->bdev, ctx); 6537 } 6538 6539 static void 6540 bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6541 { 6542 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6543 struct locked_lba_range_ctx *pending_ctx; 6544 struct spdk_bdev_channel *ch = ctx->range.owner_ch; 6545 struct spdk_bdev *bdev = ch->bdev; 6546 struct lba_range *range, *tmp; 6547 6548 pthread_mutex_lock(&bdev->internal.mutex); 6549 /* Check if there are any pending locked ranges that overlap with this range 6550 * that was just unlocked. If there are, check that it doesn't overlap with any 6551 * other locked ranges before calling bdev_lock_lba_range_ctx which will start 6552 * the lock process. 6553 */ 6554 TAILQ_FOREACH_SAFE(range, &bdev->internal.pending_locked_ranges, tailq, tmp) { 6555 if (bdev_lba_range_overlapped(range, &ctx->range) && 6556 !bdev_lba_range_overlaps_tailq(range, &bdev->internal.locked_ranges)) { 6557 TAILQ_REMOVE(&bdev->internal.pending_locked_ranges, range, tailq); 6558 pending_ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6559 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, range, tailq); 6560 spdk_thread_send_msg(pending_ctx->range.owner_ch->channel->thread, 6561 bdev_lock_lba_range_ctx_msg, pending_ctx); 6562 } 6563 } 6564 pthread_mutex_unlock(&bdev->internal.mutex); 6565 6566 ctx->cb_fn(ctx->cb_arg, status); 6567 free(ctx); 6568 } 6569 6570 static void 6571 bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6572 { 6573 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6574 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6575 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6576 TAILQ_HEAD(, spdk_bdev_io) io_locked; 6577 struct spdk_bdev_io *bdev_io; 6578 struct lba_range *range; 6579 6580 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6581 if (ctx->range.offset == range->offset && 6582 ctx->range.length == range->length && 6583 ctx->range.locked_ctx == range->locked_ctx) { 6584 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 6585 free(range); 6586 break; 6587 } 6588 } 6589 6590 /* Note: we should almost always be able to assert that the range specified 6591 * was found. But there are some very rare corner cases where a new channel 6592 * gets created simultaneously with a range unlock, where this function 6593 * would execute on that new channel and wouldn't have the range. 6594 * We also use this to clean up range allocations when a later allocation 6595 * fails in the locking path. 6596 * So we can't actually assert() here. 6597 */ 6598 6599 /* Swap the locked IO into a temporary list, and then try to submit them again. 6600 * We could hyper-optimize this to only resubmit locked I/O that overlap 6601 * with the range that was just unlocked, but this isn't a performance path so 6602 * we go for simplicity here. 6603 */ 6604 TAILQ_INIT(&io_locked); 6605 TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link); 6606 while (!TAILQ_EMPTY(&io_locked)) { 6607 bdev_io = TAILQ_FIRST(&io_locked); 6608 TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link); 6609 bdev_io_submit(bdev_io); 6610 } 6611 6612 spdk_for_each_channel_continue(i, 0); 6613 } 6614 6615 static int 6616 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6617 uint64_t offset, uint64_t length, 6618 lock_range_cb cb_fn, void *cb_arg) 6619 { 6620 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6621 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6622 struct locked_lba_range_ctx *ctx; 6623 struct lba_range *range; 6624 bool range_found = false; 6625 6626 /* Let's make sure the specified channel actually has a lock on 6627 * the specified range. Note that the range must match exactly. 6628 */ 6629 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6630 if (range->offset == offset && range->length == length && 6631 range->owner_ch == ch && range->locked_ctx == cb_arg) { 6632 range_found = true; 6633 break; 6634 } 6635 } 6636 6637 if (!range_found) { 6638 return -EINVAL; 6639 } 6640 6641 pthread_mutex_lock(&bdev->internal.mutex); 6642 /* We confirmed that this channel has locked the specified range. To 6643 * start the unlock the process, we find the range in the bdev's locked_ranges 6644 * and remove it. This ensures new channels don't inherit the locked range. 6645 * Then we will send a message to each channel (including the one specified 6646 * here) to remove the range from its per-channel list. 6647 */ 6648 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 6649 if (range->offset == offset && range->length == length && 6650 range->locked_ctx == cb_arg) { 6651 break; 6652 } 6653 } 6654 if (range == NULL) { 6655 assert(false); 6656 pthread_mutex_unlock(&bdev->internal.mutex); 6657 return -EINVAL; 6658 } 6659 TAILQ_REMOVE(&bdev->internal.locked_ranges, range, tailq); 6660 ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6661 pthread_mutex_unlock(&bdev->internal.mutex); 6662 6663 ctx->cb_fn = cb_fn; 6664 ctx->cb_arg = cb_arg; 6665 6666 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx, 6667 bdev_unlock_lba_range_cb); 6668 return 0; 6669 } 6670 6671 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 6672 6673 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 6674 { 6675 spdk_trace_register_owner(OWNER_BDEV, 'b'); 6676 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 6677 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 6678 OBJECT_BDEV_IO, 1, 0, "type: "); 6679 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 6680 OBJECT_BDEV_IO, 0, 0, ""); 6681 } 6682