/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/notify.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#include "bdev_internal.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024 - 1)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define SPDK_BDEV_AUTO_EXAMINE			true
#define BUF_SMALL_POOL_SIZE			8191
#define BUF_LARGE_POOL_SIZE			1023
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		1000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX
#define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC	1000

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_conf_type[] = {"Limit_IOPS",
				      "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS"
				     };
static const char *qos_rpc_type[] = {"rw_ios_per_sec",
				     "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
				    };

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

	pthread_mutex_t mutex;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain	*domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
	.mutex = PTHREAD_MUTEX_INITIALIZER,
};

typedef void (*lock_range_cb)(void *ctx, int status);

struct lba_range {
	uint64_t			offset;
	uint64_t			length;
	void				*locked_ctx;
	struct spdk_bdev_channel	*owner_ch;
	TAILQ_ENTRY(lba_range)		tailq;
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
	.bdev_auto_examine = SPDK_BDEV_AUTO_EXAMINE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;

	/** Function to check whether to queue the IO. */
	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);

	/** Function to update for the submitted IO. */
	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
};

struct spdk_bdev_qos {
	/** Types of structure of rate limits. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};
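
/*
 * Illustrative note (not part of the original code): entries in rate_limits[]
 * are indexed by enum spdk_bdev_qos_rate_limit_type, in the same order as the
 * qos_rpc_type[] strings defined above. For example,
 * rate_limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] holds the limit configured via
 * the "rw_ios_per_sec" RPC parameter.
 */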
struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * will queue their I/O awaiting retry here, which makes it possible to retry
 * sending I/O to one bdev after I/O from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t		nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel	*shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t		ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to the underlying dev module through this channel
	 * and waiting for completion.
	 */
	uint64_t		io_outstanding;

	/*
	 * List of all submitted I/Os including I/O that are generated via splitting.
	 */
	bdev_io_tailq_t		io_submitted;
	/*
	 * List of spdk_bdev_io that are currently queued because they write to a locked
	 * LBA range.
	 */
	bdev_io_tailq_t		io_locked;

	uint32_t		flags;

	struct spdk_histogram_data *histogram;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

	bdev_io_tailq_t		queued_resets;

	lba_range_tailq_t	locked_ranges;
};

struct media_event_entry {
	struct spdk_bdev_media_event	event;
	TAILQ_ENTRY(media_event_entry)	tailq;
};

#define MEDIA_EVENT_POOL_SIZE 64

struct spdk_bdev_desc {
	struct spdk_bdev		*bdev;
	struct spdk_thread		*thread;
	struct {
		bool open_with_ext;
		union {
			spdk_bdev_remove_cb_t remove_fn;
			spdk_bdev_event_cb_t event_fn;
		};
		void *ctx;
	}				callback;
	bool				closed;
	bool				write;
	pthread_mutex_t			mutex;
	uint32_t			refs;
	TAILQ_HEAD(, media_event_entry)	pending_media_events;
	TAILQ_HEAD(, media_event_entry)	free_media_events;
	struct media_event_entry	*media_events_buffer;
	TAILQ_ENTRY(spdk_bdev_desc)	link;

	uint64_t		timeout_in_sec;
	spdk_bdev_io_timeout_cb	cb_fn;
	void			*cb_arg;
	struct spdk_poller	*io_timeout_poller;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
static void bdev_write_zero_buffer_next(void *_bdev_io);

static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
static void bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);

static int
bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			  struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
			  uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg);
static int
bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   struct iovec *iov, int iovcnt, void *md_buf,
			   uint64_t offset_blocks, uint64_t num_blocks,
			   spdk_bdev_io_completion_cb cb, void *cb_arg);

static int
bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		    uint64_t offset, uint64_t length,
		    lock_range_cb cb_fn, void *cb_arg);

static int
bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		      uint64_t offset, uint64_t length,
		      lock_range_cb cb_fn, void *cb_arg);

static inline void bdev_io_complete(void *ctx);

static bool bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort);
static bool bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}
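
/*
 * Illustrative sizing note (not part of the original code): with the default
 * bdev_io_cache_size of 256 and, say, 4 active threads, spdk_bdev_set_opts()
 * below requires bdev_io_pool_size >= 256 * (4 + 1) = 1280, since one extra
 * mgmt channel can exist transiently during subsystem init. The thread count
 * here is only an example; the real value comes from spdk_thread_get_count().
 */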
int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization.  A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

struct spdk_bdev_examine_item {
	char *name;
	TAILQ_ENTRY(spdk_bdev_examine_item) link;
};

TAILQ_HEAD(spdk_bdev_examine_allowlist, spdk_bdev_examine_item);

struct spdk_bdev_examine_allowlist g_bdev_examine_allowlist = TAILQ_HEAD_INITIALIZER(
			g_bdev_examine_allowlist);

static inline bool
bdev_examine_allowlist_check(const char *name)
{
	struct spdk_bdev_examine_item *item;
	TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) {
		if (strcmp(name, item->name) == 0) {
			return true;
		}
	}
	return false;
}

static inline bool
bdev_in_examine_allowlist(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *tmp;
	if (bdev_examine_allowlist_check(bdev->name)) {
		return true;
	}
	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (bdev_examine_allowlist_check(tmp->alias)) {
			return true;
		}
	}
	return false;
}

static inline bool
bdev_ok_to_examine(struct spdk_bdev *bdev)
{
	if (g_bdev_opts.bdev_auto_examine) {
		return true;
	} else {
		return bdev_in_examine_allowlist(bdev);
	}
}

static void
bdev_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module *module;
	uint32_t action;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine_config && bdev_ok_to_examine(bdev)) {
			action = module->internal.action_in_progress;
			module->internal.action_in_progress++;
			module->examine_config(bdev);
			if (action != module->internal.action_in_progress) {
				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
					    module->name);
			}
		}
	}

	if (bdev->internal.claim_module && bdev_ok_to_examine(bdev)) {
		if (bdev->internal.claim_module->examine_disk) {
			bdev->internal.claim_module->internal.action_in_progress++;
			bdev->internal.claim_module->examine_disk(bdev);
		}
		return;
	}

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine_disk && bdev_ok_to_examine(bdev)) {
			module->internal.action_in_progress++;
			module->examine_disk(bdev);
		}
	}
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}
struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return iovs[0].iov_base != NULL;
}

static bool
_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
{
	int i;
	uintptr_t iov_base;

	if (spdk_likely(alignment == 1)) {
		return true;
	}

	for (i = 0; i < iovcnt; i++) {
		iov_base = (uintptr_t)iovs[i].iov_base;
		if ((iov_base & (alignment - 1)) != 0) {
			return false;
		}
	}

	return true;
}

static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}
static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is write path, copy data from original buffer to bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
_bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	/* save original md_buf */
	bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf;
	/* set bounce md_buf */
	bdev_io->u.bdev.md_buf = md_buf;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		memcpy(md_buf, bdev_io->internal.orig_md_buf, len);
	}
}

static void
bdev_io_get_buf_complete(struct spdk_bdev_io *bdev_io, void *buf, bool status)
{
	struct spdk_io_channel *ch = spdk_bdev_io_get_io_channel(bdev_io);

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io->internal.get_aux_buf_cb(ch, bdev_io, buf);
		bdev_io->internal.get_aux_buf_cb = NULL;
	} else {
		assert(bdev_io->internal.get_buf_cb != NULL);
		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(ch, bdev_io, status);
		bdev_io->internal.get_buf_cb = NULL;
	}
}

static void
_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	bool buf_allocated;
	uint64_t md_len, alignment;
	void *aligned_buf;

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io_get_buf_complete(bdev_io, buf, true);
		return;
	}

	alignment = spdk_bdev_get_buf_align(bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
	aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

	if (buf_allocated) {
		_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
	} else {
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
	}

	if (spdk_bdev_is_md_separate(bdev)) {
		aligned_buf = (char *)aligned_buf + len;
		md_len = bdev_io->u.bdev.num_blocks * bdev->md_len;

		assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);

		if (bdev_io->u.bdev.md_buf != NULL) {
			_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
		} else {
			spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
		}
	}
	bdev_io_get_buf_complete(bdev_io, buf, true);
}
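
/*
 * Illustrative note (not part of the original code): the round-up in
 * _bdev_io_set_buf() above is the usual power-of-two alignment trick. For
 * example, with alignment == 512 and buf == 0x1001,
 * ((0x1001 + 511) & ~511) == 0x1200, the next 512-byte boundary. The values
 * are hypothetical; the real alignment comes from spdk_bdev_get_buf_align().
 */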
static void
_bdev_io_put_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t buf_len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t md_len, alignment;

	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
	alignment = spdk_bdev_get_buf_align(bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		_bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
	}
}

static void
bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	assert(bdev_io->internal.buf != NULL);
	_bdev_io_put_buf(bdev_io, bdev_io->internal.buf, bdev_io->internal.buf_len);
	bdev_io->internal.buf = NULL;
}

void
spdk_bdev_io_put_aux_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(buf != NULL);
	_bdev_io_put_buf(bdev_io, buf, len);
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
		assert(bdev_io->internal.orig_md_buf == NULL);
		return;
	}

	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs,
				  bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base,
				  bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bouncing buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;

	/* do the same for metadata buffer */
	if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
		assert(spdk_bdev_is_md_separate(bdev_io->bdev));

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
		    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
			       bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
		}

		bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
		bdev_io->internal.orig_md_buf = NULL;
	}

	/* We want to free the bounce buffer here since we know we're done with it (as opposed
	 * to waiting for the conditional free of internal.buf in spdk_bdev_free_io()).
	 */
	bdev_io_put_buf(bdev_io);
}
static void
bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment, md_len;
	void *buf;

	alignment = spdk_bdev_get_buf_align(bdev);
	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;

	if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
			    len + alignment);
		bdev_io_get_buf_complete(bdev_io, NULL, false);
		return;
	}

	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;

	if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);
	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		_bdev_io_set_buf(bdev_io, buf, len);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	uint64_t alignment;

	assert(cb != NULL);
	bdev_io->internal.get_buf_cb = cb;

	alignment = spdk_bdev_get_buf_align(bdev);

	if (_is_buf_allocated(bdev_io->u.bdev.iovs) &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
		return;
	}

	bdev_io_get_buf(bdev_io, len);
}

void
spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(cb != NULL);
	assert(bdev_io->internal.get_aux_buf_cb == NULL);
	bdev_io->internal.get_aux_buf_cb = cb;
	bdev_io_get_buf(bdev_io, len);
}

static int
bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

static void
bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_qos_limit");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
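
/*
 * Illustrative example (not part of the original code): for a bdev named
 * "Malloc0" with only an IOPS limit configured, bdev_qos_config_json() above
 * would emit roughly:
 *
 *   { "method": "bdev_set_qos_limit",
 *     "params": { "name": "Malloc0", "rw_ios_per_sec": 20000 } }
 *
 * The bdev name and limit value are hypothetical; the field names follow the
 * qos_rpc_type[] table defined near the top of this file.
 */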
void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_options");
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_named_bool(w, "bdev_auto_examine", g_bdev_opts.bdev_auto_examine);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	pthread_mutex_lock(&g_bdev_mgr.mutex);

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		bdev_qos_config_json(bdev, w);
	}

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	spdk_json_write_array_end(w);
}

static int
bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}
static void
bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	bdev_init_complete(0);
}

static void
bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	bdev_init_complete(-1);
}

static int
bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to prevent other modules from completing modules_init.
			 * Send a message to defer application shutdown until resources are cleaned up. */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}
void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_env_get_core_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create,
				bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		return;
	}

	bdev_module_action_complete();
}
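
/*
 * Illustrative sizing note (not part of the original code): with
 * BUF_SMALL_POOL_SIZE of 8191 and, for example, 8 cores, spdk_bdev_initialize()
 * above sets cache_size = 8191 / (2 * 8) = 511, so the per-core mempool caches
 * can hold at most roughly half of the buffers in total. The core count is
 * only an example value; the real one comes from spdk_env_get_core_count().
 */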
static void
bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (g_bdev_mgr.bdev_io_pool) {
		if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
			SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
				    g_bdev_opts.bdev_io_pool_size);
		}

		spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	}

	if (g_bdev_mgr.buf_small_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
			SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
				    BUF_SMALL_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	}

	if (g_bdev_mgr.buf_large_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
			SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
				    BUF_LARGE_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	}

	spdk_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
	pthread_mutex_destroy(&g_bdev_mgr.mutex);
}

static void
bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* FIXME: Handling initialization failures is broken now,
	 * so we won't even try cleaning up after successfully
	 * initialized modules. If module_init_complete is false,
	 * just call bdev_mgr_unregister_cb.
	 */
	if (!g_bdev_mgr.module_init_complete) {
		bdev_mgr_unregister_cb(NULL);
		return;
	}

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL);
	} else {
		bdev_module_finish_iter(NULL);
	}
}
static void
bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in the reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim underlying bdev properly, we may face the
	 * case of bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister last bdev on the list then.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	bdev_finish_unregister_bdevs_iter(NULL, 0);
}
struct spdk_bdev_io *
bdev_channel_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf != NULL) {
		bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static bool
bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		if (bdev_io->u.bdev.zcopy.start) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static bool
bdev_is_read_io(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* Bit 1 (0x2) set for read operation */
		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
			return true;
		} else {
			return false;
		}
	case SPDK_BDEV_IO_TYPE_READ:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Populate to read from disk */
		if (bdev_io->u.bdev.zcopy.populate) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static uint64_t
bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Track the data in the start phase only */
		if (bdev_io->u.bdev.zcopy.start) {
			return bdev_io->u.bdev.num_blocks * bdev->blocklen;
		} else {
			return 0;
		}
	default:
		return 0;
	}
}
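
/*
 * Illustrative note (not part of the original code): the queue_io/update_quota
 * callbacks below implement per-timeslice token accounting. For example, with
 * rw_ios_per_sec = 1000 and the 1000 usec timeslice defined near the top of
 * this file, roughly one I/O is allowed per timeslice; remaining_this_timeslice
 * is decremented for each submitted I/O and further I/O is queued once it
 * reaches zero. The byte-based limits work the same way but may run the
 * counter negative for oversized I/O, as described in struct
 * spdk_bdev_qos_limit. The numbers above are example values only.
 */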
static bool
bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
		return true;
	} else {
		return false;
	}
}

static bool
bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static bool
bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static void
bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice--;
}

static void
bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io);
}

static void
bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_set_ops(struct spdk_bdev_qos *qos)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].queue_io = NULL;
			qos->rate_limits[i].update_quota = NULL;
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_r_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_w_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota;
			break;
		default:
			break;
		}
	}
}

static void
_bdev_io_complete_in_submit(struct spdk_bdev_channel *bdev_ch,
			    struct spdk_bdev_io *bdev_io,
			    enum spdk_bdev_io_status status)
{
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	bdev_io->internal.in_submit_request = true;
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	spdk_bdev_io_complete(bdev_io, status);
	bdev_io->internal.in_submit_request = false;
}
static inline void
bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT)) {
		struct spdk_bdev_mgmt_channel *mgmt_channel = shared_resource->mgmt_ch;
		struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort;

		if (bdev_abort_queued_io(&shared_resource->nomem_io, bio_to_abort) ||
		    bdev_abort_buf_io(&mgmt_channel->need_buf_small, bio_to_abort) ||
		    bdev_abort_buf_io(&mgmt_channel->need_buf_large, bio_to_abort)) {
			_bdev_io_complete_in_submit(bdev_ch, bdev_io,
						    SPDK_BDEV_IO_STATUS_SUCCESS);
			return;
		}
	}

	if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
		bdev_ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev_io->internal.in_submit_request = true;
		bdev->fn_table->submit_request(ch, bdev_io);
		bdev_io->internal.in_submit_request = false;
	} else {
		TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
	}
}

static int
bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL;
	int i, submitted_ios = 0;

	TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
		if (bdev_qos_io_to_limit(bdev_io) == true) {
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].queue_io) {
					continue;
				}

				if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
								 bdev_io) == true) {
					return submitted_ios;
				}
			}
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].update_quota) {
					continue;
				}

				qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
			}
		}

		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		bdev_io_do_submit(ch, bdev_io);
		submitted_ios++;
	}

	return submitted_ios;
}

static void
bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}
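
/*
 * Illustrative example (not part of the original code): bdev_io_should_split()
 * below compares the first and last stripe touched by an I/O. With a
 * hypothetical optimal_io_boundary of 128 blocks, a read at
 * offset_blocks = 100 for num_blocks = 64 touches blocks 100..163, i.e.
 * stripes 100/128 = 0 and 163/128 = 1, so it is split at block 128. The same
 * read at offset 0 stays within stripe 0 and is not split.
 */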
static bool
bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O.  Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}

static bool
bdev_io_should_split(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}

static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}

static void
bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_bdev_io_split(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes;
	struct iovec *parent_iov, *iov;
	uint64_t parent_iov_offset, iov_len;
	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
	void *md_buf = NULL;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
	parent_iovcnt = bdev_io->u.bdev.iovcnt;

	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
		if (parent_iov_offset < parent_iov->iov_len) {
			break;
		}
		parent_iov_offset -= parent_iov->iov_len;
	}

	child_iovcnt = 0;
	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
		to_next_boundary = spdk_min(remaining, to_next_boundary);
		to_next_boundary_bytes = to_next_boundary * blocklen;
		iov = &bdev_io->child_iov[child_iovcnt];
		iovcnt = 0;

		if (bdev_io->u.bdev.md_buf) {
			assert((parent_iov_offset % blocklen) > 0);
			md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) *
				 spdk_bdev_get_md_size(bdev_io->bdev);
		}

		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
			to_next_boundary_bytes -= iov_len;

			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;

			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
				parent_iov_offset += iov_len;
			} else {
				parent_iovpos++;
				parent_iov_offset = 0;
			}
			child_iovcnt++;
			iovcnt++;
		}
(to_next_boundary_bytes > 0) { 1934 /* We had to stop this child I/O early because we ran out of 1935 * child_iov space. Ensure the iovs to be aligned with block 1936 * size and then adjust to_next_boundary before starting the 1937 * child I/O. 1938 */ 1939 assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV); 1940 to_last_block_bytes = to_next_boundary_bytes % blocklen; 1941 if (to_last_block_bytes != 0) { 1942 uint32_t child_iovpos = child_iovcnt - 1; 1943 /* don't decrease child_iovcnt so the loop will naturally end */ 1944 1945 to_last_block_bytes = blocklen - to_last_block_bytes; 1946 to_next_boundary_bytes += to_last_block_bytes; 1947 while (to_last_block_bytes > 0 && iovcnt > 0) { 1948 iov_len = spdk_min(to_last_block_bytes, 1949 bdev_io->child_iov[child_iovpos].iov_len); 1950 bdev_io->child_iov[child_iovpos].iov_len -= iov_len; 1951 if (bdev_io->child_iov[child_iovpos].iov_len == 0) { 1952 child_iovpos--; 1953 if (--iovcnt == 0) { 1954 return; 1955 } 1956 } 1957 to_last_block_bytes -= iov_len; 1958 } 1959 1960 assert(to_last_block_bytes == 0); 1961 } 1962 to_next_boundary -= to_next_boundary_bytes / blocklen; 1963 } 1964 1965 bdev_io->u.bdev.split_outstanding++; 1966 1967 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1968 rc = bdev_readv_blocks_with_md(bdev_io->internal.desc, 1969 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1970 iov, iovcnt, md_buf, current_offset, 1971 to_next_boundary, 1972 bdev_io_split_done, bdev_io); 1973 } else { 1974 rc = bdev_writev_blocks_with_md(bdev_io->internal.desc, 1975 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1976 iov, iovcnt, md_buf, current_offset, 1977 to_next_boundary, 1978 bdev_io_split_done, bdev_io); 1979 } 1980 1981 if (rc == 0) { 1982 current_offset += to_next_boundary; 1983 remaining -= to_next_boundary; 1984 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1985 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1986 } else { 1987 bdev_io->u.bdev.split_outstanding--; 1988 if (rc == -ENOMEM) { 1989 if (bdev_io->u.bdev.split_outstanding == 0) { 1990 /* No I/O is outstanding. Hence we should wait here. */ 1991 bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split); 1992 } 1993 } else { 1994 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1995 if (bdev_io->u.bdev.split_outstanding == 0) { 1996 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 1997 (uintptr_t)bdev_io, 0); 1998 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 1999 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 2000 } 2001 } 2002 2003 return; 2004 } 2005 } 2006 } 2007 2008 static void 2009 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 2010 { 2011 struct spdk_bdev_io *parent_io = cb_arg; 2012 2013 spdk_bdev_free_io(bdev_io); 2014 2015 if (!success) { 2016 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2017 /* If any child I/O failed, stop further splitting process. */ 2018 parent_io->u.bdev.split_current_offset_blocks += parent_io->u.bdev.split_remaining_num_blocks; 2019 parent_io->u.bdev.split_remaining_num_blocks = 0; 2020 } 2021 parent_io->u.bdev.split_outstanding--; 2022 if (parent_io->u.bdev.split_outstanding != 0) { 2023 return; 2024 } 2025 2026 /* 2027 * Parent I/O finishes when all blocks are consumed. 
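	 *
	 * Note that split_remaining_num_blocks is decremented at submission time
	 * in _bdev_io_split() and is forced to zero on the first child failure, so
	 * this check also becomes true for a failed parent once its outstanding
	 * children drain; the status recorded above then decides the success flag
	 * passed to the completion callback.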
2028 */ 2029 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 2030 assert(parent_io->internal.cb != bdev_io_split_done); 2031 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 2032 (uintptr_t)parent_io, 0); 2033 TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link); 2034 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2035 parent_io->internal.caller_ctx); 2036 return; 2037 } 2038 2039 /* 2040 * Continue with the splitting process. This function will complete the parent I/O if the 2041 * splitting is done. 2042 */ 2043 _bdev_io_split(parent_io); 2044 } 2045 2046 static void 2047 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success); 2048 2049 static void 2050 bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 2051 { 2052 assert(bdev_io_type_can_split(bdev_io->type)); 2053 2054 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 2055 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 2056 bdev_io->u.bdev.split_outstanding = 0; 2057 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2058 2059 if (_is_buf_allocated(bdev_io->u.bdev.iovs)) { 2060 _bdev_io_split(bdev_io); 2061 } else { 2062 assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ); 2063 spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb, 2064 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 2065 } 2066 } 2067 2068 static void 2069 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 2070 { 2071 if (!success) { 2072 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2073 return; 2074 } 2075 2076 bdev_io_split(ch, bdev_io); 2077 } 2078 2079 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't 2080 * be inlined, at least on some compilers. 
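 * (bdev_io_submit() below takes its address when it forwards a QoS-managed
 * I/O to the QoS thread via spdk_thread_send_msg().)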
2081 */ 2082 static inline void 2083 _bdev_io_submit(void *ctx) 2084 { 2085 struct spdk_bdev_io *bdev_io = ctx; 2086 struct spdk_bdev *bdev = bdev_io->bdev; 2087 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2088 uint64_t tsc; 2089 2090 tsc = spdk_get_ticks(); 2091 bdev_io->internal.submit_tsc = tsc; 2092 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 2093 2094 if (spdk_likely(bdev_ch->flags == 0)) { 2095 bdev_io_do_submit(bdev_ch, bdev_io); 2096 return; 2097 } 2098 2099 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 2100 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2101 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 2102 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) && 2103 bdev_abort_queued_io(&bdev->internal.qos->queued, bdev_io->u.abort.bio_to_abort)) { 2104 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 2105 } else { 2106 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 2107 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 2108 } 2109 } else { 2110 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 2111 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2112 } 2113 } 2114 2115 bool 2116 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2); 2117 2118 bool 2119 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2) 2120 { 2121 if (range1->length == 0 || range2->length == 0) { 2122 return false; 2123 } 2124 2125 if (range1->offset + range1->length <= range2->offset) { 2126 return false; 2127 } 2128 2129 if (range2->offset + range2->length <= range1->offset) { 2130 return false; 2131 } 2132 2133 return true; 2134 } 2135 2136 static bool 2137 bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range) 2138 { 2139 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2140 struct lba_range r; 2141 2142 switch (bdev_io->type) { 2143 case SPDK_BDEV_IO_TYPE_NVME_IO: 2144 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 2145 /* Don't try to decode the NVMe command - just assume worst-case and that 2146 * it overlaps a locked range. 2147 */ 2148 return true; 2149 case SPDK_BDEV_IO_TYPE_WRITE: 2150 case SPDK_BDEV_IO_TYPE_UNMAP: 2151 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2152 case SPDK_BDEV_IO_TYPE_ZCOPY: 2153 r.offset = bdev_io->u.bdev.offset_blocks; 2154 r.length = bdev_io->u.bdev.num_blocks; 2155 if (!bdev_lba_range_overlapped(range, &r)) { 2156 /* This I/O doesn't overlap the specified LBA range. */ 2157 return false; 2158 } else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) { 2159 /* This I/O overlaps, but the I/O is on the same channel that locked this 2160 * range, and the caller_ctx is the same as the locked_ctx. This means 2161 * that this I/O is associated with the lock, and is allowed to execute. 
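			 * The compare-and-write emulation later in this file relies on
			 * this: it locks the target range with the parent bdev_io as the
			 * lock context and then issues its compare and write children on
			 * the same channel with that same bdev_io as caller_ctx, so the
			 * children pass this check instead of being parked on io_locked.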
2162 */ 2163 return false; 2164 } else { 2165 return true; 2166 } 2167 default: 2168 return false; 2169 } 2170 } 2171 2172 void 2173 bdev_io_submit(struct spdk_bdev_io *bdev_io) 2174 { 2175 struct spdk_bdev *bdev = bdev_io->bdev; 2176 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 2177 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2178 2179 assert(thread != NULL); 2180 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2181 2182 if (!TAILQ_EMPTY(&ch->locked_ranges)) { 2183 struct lba_range *range; 2184 2185 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 2186 if (bdev_io_range_is_locked(bdev_io, range)) { 2187 TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link); 2188 return; 2189 } 2190 } 2191 } 2192 2193 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 2194 2195 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bdev_io)) { 2196 bdev_io->internal.submit_tsc = spdk_get_ticks(); 2197 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 2198 (uintptr_t)bdev_io, bdev_io->type); 2199 bdev_io_split(NULL, bdev_io); 2200 return; 2201 } 2202 2203 if (ch->flags & BDEV_CH_QOS_ENABLED) { 2204 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 2205 _bdev_io_submit(bdev_io); 2206 } else { 2207 bdev_io->internal.io_submit_ch = ch; 2208 bdev_io->internal.ch = bdev->internal.qos->ch; 2209 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 2210 } 2211 } else { 2212 _bdev_io_submit(bdev_io); 2213 } 2214 } 2215 2216 static void 2217 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 2218 { 2219 struct spdk_bdev *bdev = bdev_io->bdev; 2220 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2221 struct spdk_io_channel *ch = bdev_ch->channel; 2222 2223 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2224 2225 bdev_io->internal.in_submit_request = true; 2226 bdev->fn_table->submit_request(ch, bdev_io); 2227 bdev_io->internal.in_submit_request = false; 2228 } 2229 2230 void 2231 bdev_io_init(struct spdk_bdev_io *bdev_io, 2232 struct spdk_bdev *bdev, void *cb_arg, 2233 spdk_bdev_io_completion_cb cb) 2234 { 2235 bdev_io->bdev = bdev; 2236 bdev_io->internal.caller_ctx = cb_arg; 2237 bdev_io->internal.cb = cb; 2238 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2239 bdev_io->internal.in_submit_request = false; 2240 bdev_io->internal.buf = NULL; 2241 bdev_io->internal.io_submit_ch = NULL; 2242 bdev_io->internal.orig_iovs = NULL; 2243 bdev_io->internal.orig_iovcnt = 0; 2244 bdev_io->internal.orig_md_buf = NULL; 2245 bdev_io->internal.error.nvme.cdw0 = 0; 2246 bdev_io->num_retries = 0; 2247 bdev_io->internal.get_buf_cb = NULL; 2248 bdev_io->internal.get_aux_buf_cb = NULL; 2249 } 2250 2251 static bool 2252 bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2253 { 2254 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 2255 } 2256 2257 bool 2258 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2259 { 2260 bool supported; 2261 2262 supported = bdev_io_type_supported(bdev, io_type); 2263 2264 if (!supported) { 2265 switch (io_type) { 2266 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2267 /* The bdev layer will emulate write zeroes as long as write is supported. 
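		 * (The emulation itself lives elsewhere in this file and is carried
		 * out as ordinary writes of an internal zero buffer.) Callers that
		 * care about native support of an I/O type can probe it up front; a
		 * minimal, hypothetical example:
		 *
		 *     if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		 *             SPDK_NOTICELOG("bdev %s cannot unmap; skipping trim\n",
		 *                            spdk_bdev_get_name(bdev));
		 *     }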
*/ 2268 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2269 break; 2270 case SPDK_BDEV_IO_TYPE_ZCOPY: 2271 /* Zero copy can be emulated with regular read and write */ 2272 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 2273 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2274 break; 2275 default: 2276 break; 2277 } 2278 } 2279 2280 return supported; 2281 } 2282 2283 int 2284 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 2285 { 2286 if (bdev->fn_table->dump_info_json) { 2287 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 2288 } 2289 2290 return 0; 2291 } 2292 2293 static void 2294 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2295 { 2296 uint32_t max_per_timeslice = 0; 2297 int i; 2298 2299 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2300 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2301 qos->rate_limits[i].max_per_timeslice = 0; 2302 continue; 2303 } 2304 2305 max_per_timeslice = qos->rate_limits[i].limit * 2306 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2307 2308 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2309 qos->rate_limits[i].min_per_timeslice); 2310 2311 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2312 } 2313 2314 bdev_qos_set_ops(qos); 2315 } 2316 2317 static int 2318 bdev_channel_poll_qos(void *arg) 2319 { 2320 struct spdk_bdev_qos *qos = arg; 2321 uint64_t now = spdk_get_ticks(); 2322 int i; 2323 2324 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2325 /* We received our callback earlier than expected - return 2326 * immediately and wait to do accounting until at least one 2327 * timeslice has actually expired. This should never happen 2328 * with a well-behaved timer implementation. 2329 */ 2330 return SPDK_POLLER_IDLE; 2331 } 2332 2333 /* Reset for next round of rate limiting */ 2334 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2335 /* We may have allowed the IOs or bytes to slightly overrun in the last 2336 * timeslice. remaining_this_timeslice is signed, so if it's negative 2337 * here, we'll account for the overrun so that the next timeslice will 2338 * be appropriately reduced. 
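		 * As an illustration (made-up numbers): with a 10 MiB/s byte limit,
		 * max_per_timeslice is 10 * 1024 * 1024 * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
		 * SPDK_SEC_TO_USEC = 10485 bytes per 1 ms timeslice. If a 65536-byte
		 * write was admitted while only 4096 bytes remained, the counter now
		 * sits at -61440; it is left negative here and climbs by
		 * max_per_timeslice each timeslice below, so byte-limited I/O stays
		 * queued for roughly the next six timeslices.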
2339 */ 2340 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2341 qos->rate_limits[i].remaining_this_timeslice = 0; 2342 } 2343 } 2344 2345 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2346 qos->last_timeslice += qos->timeslice_size; 2347 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2348 qos->rate_limits[i].remaining_this_timeslice += 2349 qos->rate_limits[i].max_per_timeslice; 2350 } 2351 } 2352 2353 return bdev_qos_io_submit(qos->ch, qos); 2354 } 2355 2356 static void 2357 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2358 { 2359 struct spdk_bdev_shared_resource *shared_resource; 2360 struct lba_range *range; 2361 2362 while (!TAILQ_EMPTY(&ch->locked_ranges)) { 2363 range = TAILQ_FIRST(&ch->locked_ranges); 2364 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 2365 free(range); 2366 } 2367 2368 spdk_put_io_channel(ch->channel); 2369 2370 shared_resource = ch->shared_resource; 2371 2372 assert(TAILQ_EMPTY(&ch->io_locked)); 2373 assert(TAILQ_EMPTY(&ch->io_submitted)); 2374 assert(ch->io_outstanding == 0); 2375 assert(shared_resource->ref > 0); 2376 shared_resource->ref--; 2377 if (shared_resource->ref == 0) { 2378 assert(shared_resource->io_outstanding == 0); 2379 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2380 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2381 free(shared_resource); 2382 } 2383 } 2384 2385 /* Caller must hold bdev->internal.mutex. */ 2386 static void 2387 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2388 { 2389 struct spdk_bdev_qos *qos = bdev->internal.qos; 2390 int i; 2391 2392 /* Rate limiting on this bdev enabled */ 2393 if (qos) { 2394 if (qos->ch == NULL) { 2395 struct spdk_io_channel *io_ch; 2396 2397 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2398 bdev->name, spdk_get_thread()); 2399 2400 /* No qos channel has been selected, so set one up */ 2401 2402 /* Take another reference to ch */ 2403 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2404 assert(io_ch != NULL); 2405 qos->ch = ch; 2406 2407 qos->thread = spdk_io_channel_get_thread(io_ch); 2408 2409 TAILQ_INIT(&qos->queued); 2410 2411 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2412 if (bdev_qos_is_iops_rate_limit(i) == true) { 2413 qos->rate_limits[i].min_per_timeslice = 2414 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2415 } else { 2416 qos->rate_limits[i].min_per_timeslice = 2417 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2418 } 2419 2420 if (qos->rate_limits[i].limit == 0) { 2421 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2422 } 2423 } 2424 bdev_qos_update_max_quota_per_timeslice(qos); 2425 qos->timeslice_size = 2426 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2427 qos->last_timeslice = spdk_get_ticks(); 2428 qos->poller = SPDK_POLLER_REGISTER(bdev_channel_poll_qos, 2429 qos, 2430 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2431 } 2432 2433 ch->flags |= BDEV_CH_QOS_ENABLED; 2434 } 2435 } 2436 2437 struct poll_timeout_ctx { 2438 struct spdk_bdev_desc *desc; 2439 uint64_t timeout_in_sec; 2440 spdk_bdev_io_timeout_cb cb_fn; 2441 void *cb_arg; 2442 }; 2443 2444 static void 2445 bdev_desc_free(struct spdk_bdev_desc *desc) 2446 { 2447 pthread_mutex_destroy(&desc->mutex); 2448 free(desc->media_events_buffer); 2449 free(desc); 2450 } 2451 2452 static void 2453 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2454 { 2455 struct poll_timeout_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 2456 struct spdk_bdev_desc *desc = ctx->desc; 2457 2458 free(ctx); 2459 2460 pthread_mutex_lock(&desc->mutex); 2461 desc->refs--; 2462 if (desc->closed == true && desc->refs == 0) { 2463 pthread_mutex_unlock(&desc->mutex); 2464 bdev_desc_free(desc); 2465 return; 2466 } 2467 pthread_mutex_unlock(&desc->mutex); 2468 } 2469 2470 static void 2471 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2472 { 2473 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2474 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2475 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2476 struct spdk_bdev_desc *desc = ctx->desc; 2477 struct spdk_bdev_io *bdev_io; 2478 uint64_t now; 2479 2480 pthread_mutex_lock(&desc->mutex); 2481 if (desc->closed == true) { 2482 pthread_mutex_unlock(&desc->mutex); 2483 spdk_for_each_channel_continue(i, -1); 2484 return; 2485 } 2486 pthread_mutex_unlock(&desc->mutex); 2487 2488 now = spdk_get_ticks(); 2489 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2490 /* Exclude any I/O that are generated via splitting. */ 2491 if (bdev_io->internal.cb == bdev_io_split_done) { 2492 continue; 2493 } 2494 2495 /* Once we find an I/O that has not timed out, we can immediately 2496 * exit the loop. 2497 */ 2498 if (now < (bdev_io->internal.submit_tsc + 2499 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2500 goto end; 2501 } 2502 2503 if (bdev_io->internal.desc == desc) { 2504 ctx->cb_fn(ctx->cb_arg, bdev_io); 2505 } 2506 } 2507 2508 end: 2509 spdk_for_each_channel_continue(i, 0); 2510 } 2511 2512 static int 2513 bdev_poll_timeout_io(void *arg) 2514 { 2515 struct spdk_bdev_desc *desc = arg; 2516 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2517 struct poll_timeout_ctx *ctx; 2518 2519 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2520 if (!ctx) { 2521 SPDK_ERRLOG("failed to allocate memory\n"); 2522 return SPDK_POLLER_BUSY; 2523 } 2524 ctx->desc = desc; 2525 ctx->cb_arg = desc->cb_arg; 2526 ctx->cb_fn = desc->cb_fn; 2527 ctx->timeout_in_sec = desc->timeout_in_sec; 2528 2529 /* Take a ref on the descriptor in case it gets closed while we are checking 2530 * all of the channels. 
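	 * The matching refs-- happens in bdev_channel_poll_timeout_io_done()
	 * above, which also frees the descriptor if it was closed while the
	 * iteration was in flight. A hypothetical consumer arms this machinery
	 * roughly as follows (30 second budget, no callback context):
	 *
	 *     static void
	 *     io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
	 *     {
	 *             SPDK_ERRLOG("I/O on bdev %s timed out\n",
	 *                         spdk_bdev_get_name(bdev_io->bdev));
	 *     }
	 *
	 *     spdk_bdev_set_timeout(desc, 30, io_timeout_cb, NULL);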
2531 */ 2532 pthread_mutex_lock(&desc->mutex); 2533 desc->refs++; 2534 pthread_mutex_unlock(&desc->mutex); 2535 2536 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2537 bdev_channel_poll_timeout_io, 2538 ctx, 2539 bdev_channel_poll_timeout_io_done); 2540 2541 return SPDK_POLLER_BUSY; 2542 } 2543 2544 int 2545 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2546 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2547 { 2548 assert(desc->thread == spdk_get_thread()); 2549 2550 spdk_poller_unregister(&desc->io_timeout_poller); 2551 2552 if (timeout_in_sec) { 2553 assert(cb_fn != NULL); 2554 desc->io_timeout_poller = SPDK_POLLER_REGISTER(bdev_poll_timeout_io, 2555 desc, 2556 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2557 1000); 2558 if (desc->io_timeout_poller == NULL) { 2559 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2560 return -1; 2561 } 2562 } 2563 2564 desc->cb_fn = cb_fn; 2565 desc->cb_arg = cb_arg; 2566 desc->timeout_in_sec = timeout_in_sec; 2567 2568 return 0; 2569 } 2570 2571 static int 2572 bdev_channel_create(void *io_device, void *ctx_buf) 2573 { 2574 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2575 struct spdk_bdev_channel *ch = ctx_buf; 2576 struct spdk_io_channel *mgmt_io_ch; 2577 struct spdk_bdev_mgmt_channel *mgmt_ch; 2578 struct spdk_bdev_shared_resource *shared_resource; 2579 struct lba_range *range; 2580 2581 ch->bdev = bdev; 2582 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2583 if (!ch->channel) { 2584 return -1; 2585 } 2586 2587 assert(ch->histogram == NULL); 2588 if (bdev->internal.histogram_enabled) { 2589 ch->histogram = spdk_histogram_data_alloc(); 2590 if (ch->histogram == NULL) { 2591 SPDK_ERRLOG("Could not allocate histogram\n"); 2592 } 2593 } 2594 2595 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2596 if (!mgmt_io_ch) { 2597 spdk_put_io_channel(ch->channel); 2598 return -1; 2599 } 2600 2601 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2602 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2603 if (shared_resource->shared_ch == ch->channel) { 2604 spdk_put_io_channel(mgmt_io_ch); 2605 shared_resource->ref++; 2606 break; 2607 } 2608 } 2609 2610 if (shared_resource == NULL) { 2611 shared_resource = calloc(1, sizeof(*shared_resource)); 2612 if (shared_resource == NULL) { 2613 spdk_put_io_channel(ch->channel); 2614 spdk_put_io_channel(mgmt_io_ch); 2615 return -1; 2616 } 2617 2618 shared_resource->mgmt_ch = mgmt_ch; 2619 shared_resource->io_outstanding = 0; 2620 TAILQ_INIT(&shared_resource->nomem_io); 2621 shared_resource->nomem_threshold = 0; 2622 shared_resource->shared_ch = ch->channel; 2623 shared_resource->ref = 1; 2624 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2625 } 2626 2627 memset(&ch->stat, 0, sizeof(ch->stat)); 2628 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2629 ch->io_outstanding = 0; 2630 TAILQ_INIT(&ch->queued_resets); 2631 TAILQ_INIT(&ch->locked_ranges); 2632 ch->flags = 0; 2633 ch->shared_resource = shared_resource; 2634 2635 TAILQ_INIT(&ch->io_submitted); 2636 TAILQ_INIT(&ch->io_locked); 2637 2638 #ifdef SPDK_CONFIG_VTUNE 2639 { 2640 char *name; 2641 __itt_init_ittlib(NULL, 0); 2642 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2643 if (!name) { 2644 bdev_channel_destroy_resource(ch); 2645 return -1; 2646 } 2647 ch->handle = __itt_string_handle_create(name); 2648 free(name); 2649 ch->start_tsc = spdk_get_ticks(); 2650 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2651 memset(&ch->prev_stat, 0, 
sizeof(ch->prev_stat)); 2652 } 2653 #endif 2654 2655 pthread_mutex_lock(&bdev->internal.mutex); 2656 bdev_enable_qos(bdev, ch); 2657 2658 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 2659 struct lba_range *new_range; 2660 2661 new_range = calloc(1, sizeof(*new_range)); 2662 if (new_range == NULL) { 2663 pthread_mutex_unlock(&bdev->internal.mutex); 2664 bdev_channel_destroy_resource(ch); 2665 return -1; 2666 } 2667 new_range->length = range->length; 2668 new_range->offset = range->offset; 2669 new_range->locked_ctx = range->locked_ctx; 2670 TAILQ_INSERT_TAIL(&ch->locked_ranges, new_range, tailq); 2671 } 2672 2673 pthread_mutex_unlock(&bdev->internal.mutex); 2674 2675 return 0; 2676 } 2677 2678 /* 2679 * Abort I/O that are waiting on a data buffer. These types of I/O are 2680 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2681 */ 2682 static void 2683 bdev_abort_all_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2684 { 2685 bdev_io_stailq_t tmp; 2686 struct spdk_bdev_io *bdev_io; 2687 2688 STAILQ_INIT(&tmp); 2689 2690 while (!STAILQ_EMPTY(queue)) { 2691 bdev_io = STAILQ_FIRST(queue); 2692 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2693 if (bdev_io->internal.ch == ch) { 2694 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2695 } else { 2696 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2697 } 2698 } 2699 2700 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2701 } 2702 2703 /* 2704 * Abort I/O that are queued waiting for submission. These types of I/O are 2705 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2706 */ 2707 static void 2708 bdev_abort_all_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2709 { 2710 struct spdk_bdev_io *bdev_io, *tmp; 2711 2712 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2713 if (bdev_io->internal.ch == ch) { 2714 TAILQ_REMOVE(queue, bdev_io, internal.link); 2715 /* 2716 * spdk_bdev_io_complete() assumes that the completed I/O had 2717 * been submitted to the bdev module. Since in this case it 2718 * hadn't, bump io_outstanding to account for the decrement 2719 * that spdk_bdev_io_complete() will do. 
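			 * Queued resets are excluded from the adjustment below because
			 * they take a separate completion path that does not touch
			 * io_outstanding.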
2720 */ 2721 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2722 ch->io_outstanding++; 2723 ch->shared_resource->io_outstanding++; 2724 } 2725 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2726 } 2727 } 2728 } 2729 2730 static bool 2731 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2732 { 2733 struct spdk_bdev_io *bdev_io; 2734 2735 TAILQ_FOREACH(bdev_io, queue, internal.link) { 2736 if (bdev_io == bio_to_abort) { 2737 TAILQ_REMOVE(queue, bio_to_abort, internal.link); 2738 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2739 return true; 2740 } 2741 } 2742 2743 return false; 2744 } 2745 2746 static bool 2747 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2748 { 2749 struct spdk_bdev_io *bdev_io; 2750 2751 STAILQ_FOREACH(bdev_io, queue, internal.buf_link) { 2752 if (bdev_io == bio_to_abort) { 2753 STAILQ_REMOVE(queue, bio_to_abort, spdk_bdev_io, internal.buf_link); 2754 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2755 return true; 2756 } 2757 } 2758 2759 return false; 2760 } 2761 2762 static void 2763 bdev_qos_channel_destroy(void *cb_arg) 2764 { 2765 struct spdk_bdev_qos *qos = cb_arg; 2766 2767 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2768 spdk_poller_unregister(&qos->poller); 2769 2770 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2771 2772 free(qos); 2773 } 2774 2775 static int 2776 bdev_qos_destroy(struct spdk_bdev *bdev) 2777 { 2778 int i; 2779 2780 /* 2781 * Cleanly shutting down the QoS poller is tricky, because 2782 * during the asynchronous operation the user could open 2783 * a new descriptor and create a new channel, spawning 2784 * a new QoS poller. 2785 * 2786 * The strategy is to create a new QoS structure here and swap it 2787 * in. The shutdown path then continues to refer to the old one 2788 * until it completes and then releases it. 2789 */ 2790 struct spdk_bdev_qos *new_qos, *old_qos; 2791 2792 old_qos = bdev->internal.qos; 2793 2794 new_qos = calloc(1, sizeof(*new_qos)); 2795 if (!new_qos) { 2796 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2797 return -ENOMEM; 2798 } 2799 2800 /* Copy the old QoS data into the newly allocated structure */ 2801 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2802 2803 /* Zero out the key parts of the QoS structure */ 2804 new_qos->ch = NULL; 2805 new_qos->thread = NULL; 2806 new_qos->poller = NULL; 2807 TAILQ_INIT(&new_qos->queued); 2808 /* 2809 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2810 * It will be used later for the new QoS structure. 2811 */ 2812 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2813 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2814 new_qos->rate_limits[i].min_per_timeslice = 0; 2815 new_qos->rate_limits[i].max_per_timeslice = 0; 2816 } 2817 2818 bdev->internal.qos = new_qos; 2819 2820 if (old_qos->thread == NULL) { 2821 free(old_qos); 2822 } else { 2823 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 2824 } 2825 2826 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2827 * been destroyed yet. The destruction path will end up waiting for the final 2828 * channel to be put before it releases resources. 
	 */

	return 0;
}

static void
bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
{
	total->bytes_read += add->bytes_read;
	total->num_read_ops += add->num_read_ops;
	total->bytes_written += add->bytes_written;
	total->num_write_ops += add->num_write_ops;
	total->bytes_unmapped += add->bytes_unmapped;
	total->num_unmap_ops += add->num_unmap_ops;
	total->read_latency_ticks += add->read_latency_ticks;
	total->write_latency_ticks += add->write_latency_ticks;
	total->unmap_latency_ticks += add->unmap_latency_ticks;
}

static void
bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel *ch = ctx_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
		      spdk_get_thread());

	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
	pthread_mutex_lock(&ch->bdev->internal.mutex);
	bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
	pthread_mutex_unlock(&ch->bdev->internal.mutex);

	mgmt_ch = shared_resource->mgmt_ch;

	bdev_abort_all_queued_io(&ch->queued_resets, ch);
	bdev_abort_all_queued_io(&shared_resource->nomem_io, ch);
	bdev_abort_all_buf_io(&mgmt_ch->need_buf_small, ch);
	bdev_abort_all_buf_io(&mgmt_ch->need_buf_large, ch);

	if (ch->histogram) {
		spdk_histogram_data_free(ch->histogram);
	}

	bdev_channel_destroy_resource(ch);
}

int
spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	if (alias == NULL) {
		SPDK_ERRLOG("Empty alias passed\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(alias)) {
		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
		return -EEXIST;
	}

	tmp = calloc(1, sizeof(*tmp));
	if (tmp == NULL) {
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	tmp->alias = strdup(alias);
	if (tmp->alias == NULL) {
		free(tmp);
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);

	return 0;
}

int
spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (strcmp(alias, tmp->alias) == 0) {
			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
			free(tmp->alias);
			free(tmp);
			return 0;
		}
	}

	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);

	return -ENOENT;
}

void
spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *p, *tmp;

	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
		TAILQ_REMOVE(&bdev->aliases, p, tailq);
		free(p->alias);
		free(p);
	}
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc)));
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev
*bdev) 2954 { 2955 return bdev->product_name; 2956 } 2957 2958 const struct spdk_bdev_aliases_list * 2959 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2960 { 2961 return &bdev->aliases; 2962 } 2963 2964 uint32_t 2965 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2966 { 2967 return bdev->blocklen; 2968 } 2969 2970 uint32_t 2971 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 2972 { 2973 return bdev->write_unit_size; 2974 } 2975 2976 uint64_t 2977 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2978 { 2979 return bdev->blockcnt; 2980 } 2981 2982 const char * 2983 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2984 { 2985 return qos_rpc_type[type]; 2986 } 2987 2988 void 2989 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2990 { 2991 int i; 2992 2993 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2994 2995 pthread_mutex_lock(&bdev->internal.mutex); 2996 if (bdev->internal.qos) { 2997 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2998 if (bdev->internal.qos->rate_limits[i].limit != 2999 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3000 limits[i] = bdev->internal.qos->rate_limits[i].limit; 3001 if (bdev_qos_is_iops_rate_limit(i) == false) { 3002 /* Change from Byte to Megabyte which is user visible. */ 3003 limits[i] = limits[i] / 1024 / 1024; 3004 } 3005 } 3006 } 3007 } 3008 pthread_mutex_unlock(&bdev->internal.mutex); 3009 } 3010 3011 size_t 3012 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 3013 { 3014 return 1 << bdev->required_alignment; 3015 } 3016 3017 uint32_t 3018 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 3019 { 3020 return bdev->optimal_io_boundary; 3021 } 3022 3023 bool 3024 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 3025 { 3026 return bdev->write_cache; 3027 } 3028 3029 const struct spdk_uuid * 3030 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 3031 { 3032 return &bdev->uuid; 3033 } 3034 3035 uint16_t 3036 spdk_bdev_get_acwu(const struct spdk_bdev *bdev) 3037 { 3038 return bdev->acwu; 3039 } 3040 3041 uint32_t 3042 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 3043 { 3044 return bdev->md_len; 3045 } 3046 3047 bool 3048 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 3049 { 3050 return (bdev->md_len != 0) && bdev->md_interleave; 3051 } 3052 3053 bool 3054 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 3055 { 3056 return (bdev->md_len != 0) && !bdev->md_interleave; 3057 } 3058 3059 bool 3060 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 3061 { 3062 return bdev->zoned; 3063 } 3064 3065 uint32_t 3066 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 3067 { 3068 if (spdk_bdev_is_md_interleaved(bdev)) { 3069 return bdev->blocklen - bdev->md_len; 3070 } else { 3071 return bdev->blocklen; 3072 } 3073 } 3074 3075 static uint32_t 3076 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 3077 { 3078 if (!spdk_bdev_is_md_interleaved(bdev)) { 3079 return bdev->blocklen + bdev->md_len; 3080 } else { 3081 return bdev->blocklen; 3082 } 3083 } 3084 3085 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 3086 { 3087 if (bdev->md_len != 0) { 3088 return bdev->dif_type; 3089 } else { 3090 return SPDK_DIF_DISABLE; 3091 } 3092 } 3093 3094 bool 3095 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 3096 { 3097 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 3098 return bdev->dif_is_head_of_md; 3099 } else { 3100 return false; 3101 } 3102 } 3103 3104 bool 3105 
spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 3106 enum spdk_dif_check_type check_type) 3107 { 3108 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 3109 return false; 3110 } 3111 3112 switch (check_type) { 3113 case SPDK_DIF_CHECK_TYPE_REFTAG: 3114 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 3115 case SPDK_DIF_CHECK_TYPE_APPTAG: 3116 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 3117 case SPDK_DIF_CHECK_TYPE_GUARD: 3118 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 3119 default: 3120 return false; 3121 } 3122 } 3123 3124 uint64_t 3125 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 3126 { 3127 return bdev->internal.measured_queue_depth; 3128 } 3129 3130 uint64_t 3131 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 3132 { 3133 return bdev->internal.period; 3134 } 3135 3136 uint64_t 3137 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 3138 { 3139 return bdev->internal.weighted_io_time; 3140 } 3141 3142 uint64_t 3143 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 3144 { 3145 return bdev->internal.io_time; 3146 } 3147 3148 static void 3149 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 3150 { 3151 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3152 3153 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 3154 3155 if (bdev->internal.measured_queue_depth) { 3156 bdev->internal.io_time += bdev->internal.period; 3157 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 3158 } 3159 } 3160 3161 static void 3162 _calculate_measured_qd(struct spdk_io_channel_iter *i) 3163 { 3164 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3165 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 3166 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 3167 3168 bdev->internal.temporary_queue_depth += ch->io_outstanding; 3169 spdk_for_each_channel_continue(i, 0); 3170 } 3171 3172 static int 3173 bdev_calculate_measured_queue_depth(void *ctx) 3174 { 3175 struct spdk_bdev *bdev = ctx; 3176 bdev->internal.temporary_queue_depth = 0; 3177 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 3178 _calculate_measured_qd_cpl); 3179 return SPDK_POLLER_BUSY; 3180 } 3181 3182 void 3183 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 3184 { 3185 bdev->internal.period = period; 3186 3187 if (bdev->internal.qd_poller != NULL) { 3188 spdk_poller_unregister(&bdev->internal.qd_poller); 3189 bdev->internal.measured_queue_depth = UINT64_MAX; 3190 } 3191 3192 if (period != 0) { 3193 bdev->internal.qd_poller = SPDK_POLLER_REGISTER(bdev_calculate_measured_queue_depth, bdev, 3194 period); 3195 } 3196 } 3197 3198 static void 3199 _resize_notify(void *arg) 3200 { 3201 struct spdk_bdev_desc *desc = arg; 3202 3203 pthread_mutex_lock(&desc->mutex); 3204 desc->refs--; 3205 if (!desc->closed) { 3206 pthread_mutex_unlock(&desc->mutex); 3207 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 3208 desc->bdev, 3209 desc->callback.ctx); 3210 return; 3211 } else if (0 == desc->refs) { 3212 /* This descriptor was closed after this resize_notify message was sent. 3213 * spdk_bdev_close() could not free the descriptor since this message was 3214 * in flight, so we free it now using bdev_desc_free(). 
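		 * This is the same refs/closed handshake used by the I/O timeout
		 * poller above: a deferred message holds a reference while it is in
		 * flight, and whichever handler drops the last reference on an
		 * already-closed descriptor performs the final bdev_desc_free().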
3215 */ 3216 pthread_mutex_unlock(&desc->mutex); 3217 bdev_desc_free(desc); 3218 return; 3219 } 3220 pthread_mutex_unlock(&desc->mutex); 3221 } 3222 3223 int 3224 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 3225 { 3226 struct spdk_bdev_desc *desc; 3227 int ret; 3228 3229 pthread_mutex_lock(&bdev->internal.mutex); 3230 3231 /* bdev has open descriptors */ 3232 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 3233 bdev->blockcnt > size) { 3234 ret = -EBUSY; 3235 } else { 3236 bdev->blockcnt = size; 3237 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 3238 pthread_mutex_lock(&desc->mutex); 3239 if (desc->callback.open_with_ext && !desc->closed) { 3240 desc->refs++; 3241 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 3242 } 3243 pthread_mutex_unlock(&desc->mutex); 3244 } 3245 ret = 0; 3246 } 3247 3248 pthread_mutex_unlock(&bdev->internal.mutex); 3249 3250 return ret; 3251 } 3252 3253 /* 3254 * Convert I/O offset and length from bytes to blocks. 3255 * 3256 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 3257 */ 3258 static uint64_t 3259 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 3260 uint64_t num_bytes, uint64_t *num_blocks) 3261 { 3262 uint32_t block_size = bdev->blocklen; 3263 uint8_t shift_cnt; 3264 3265 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 3266 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 3267 shift_cnt = spdk_u32log2(block_size); 3268 *offset_blocks = offset_bytes >> shift_cnt; 3269 *num_blocks = num_bytes >> shift_cnt; 3270 return (offset_bytes - (*offset_blocks << shift_cnt)) | 3271 (num_bytes - (*num_blocks << shift_cnt)); 3272 } else { 3273 *offset_blocks = offset_bytes / block_size; 3274 *num_blocks = num_bytes / block_size; 3275 return (offset_bytes % block_size) | (num_bytes % block_size); 3276 } 3277 } 3278 3279 static bool 3280 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 3281 { 3282 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 3283 * has been an overflow and hence the offset has been wrapped around */ 3284 if (offset_blocks + num_blocks < offset_blocks) { 3285 return false; 3286 } 3287 3288 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 3289 if (offset_blocks + num_blocks > bdev->blockcnt) { 3290 return false; 3291 } 3292 3293 return true; 3294 } 3295 3296 static bool 3297 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 3298 { 3299 return _is_buf_allocated(iovs) == (md_buf != NULL); 3300 } 3301 3302 static int 3303 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 3304 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3305 spdk_bdev_io_completion_cb cb, void *cb_arg) 3306 { 3307 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3308 struct spdk_bdev_io *bdev_io; 3309 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3310 3311 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3312 return -EINVAL; 3313 } 3314 3315 bdev_io = bdev_channel_get_io(channel); 3316 if (!bdev_io) { 3317 return -ENOMEM; 3318 } 3319 3320 bdev_io->internal.ch = channel; 3321 bdev_io->internal.desc = desc; 3322 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3323 bdev_io->u.bdev.iovs = &bdev_io->iov; 3324 bdev_io->u.bdev.iovs[0].iov_base = buf; 3325 bdev_io->u.bdev.iovs[0].iov_len = num_blocks 
* bdev->blocklen; 3326 bdev_io->u.bdev.iovcnt = 1; 3327 bdev_io->u.bdev.md_buf = md_buf; 3328 bdev_io->u.bdev.num_blocks = num_blocks; 3329 bdev_io->u.bdev.offset_blocks = offset_blocks; 3330 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3331 3332 bdev_io_submit(bdev_io); 3333 return 0; 3334 } 3335 3336 int 3337 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3338 void *buf, uint64_t offset, uint64_t nbytes, 3339 spdk_bdev_io_completion_cb cb, void *cb_arg) 3340 { 3341 uint64_t offset_blocks, num_blocks; 3342 3343 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3344 nbytes, &num_blocks) != 0) { 3345 return -EINVAL; 3346 } 3347 3348 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3349 } 3350 3351 int 3352 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3353 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3354 spdk_bdev_io_completion_cb cb, void *cb_arg) 3355 { 3356 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 3357 } 3358 3359 int 3360 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3361 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3362 spdk_bdev_io_completion_cb cb, void *cb_arg) 3363 { 3364 struct iovec iov = { 3365 .iov_base = buf, 3366 }; 3367 3368 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3369 return -EINVAL; 3370 } 3371 3372 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3373 return -EINVAL; 3374 } 3375 3376 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3377 cb, cb_arg); 3378 } 3379 3380 int 3381 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3382 struct iovec *iov, int iovcnt, 3383 uint64_t offset, uint64_t nbytes, 3384 spdk_bdev_io_completion_cb cb, void *cb_arg) 3385 { 3386 uint64_t offset_blocks, num_blocks; 3387 3388 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3389 nbytes, &num_blocks) != 0) { 3390 return -EINVAL; 3391 } 3392 3393 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3394 } 3395 3396 static int 3397 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3398 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3399 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3400 { 3401 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3402 struct spdk_bdev_io *bdev_io; 3403 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3404 3405 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3406 return -EINVAL; 3407 } 3408 3409 bdev_io = bdev_channel_get_io(channel); 3410 if (!bdev_io) { 3411 return -ENOMEM; 3412 } 3413 3414 bdev_io->internal.ch = channel; 3415 bdev_io->internal.desc = desc; 3416 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3417 bdev_io->u.bdev.iovs = iov; 3418 bdev_io->u.bdev.iovcnt = iovcnt; 3419 bdev_io->u.bdev.md_buf = md_buf; 3420 bdev_io->u.bdev.num_blocks = num_blocks; 3421 bdev_io->u.bdev.offset_blocks = offset_blocks; 3422 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3423 3424 bdev_io_submit(bdev_io); 3425 return 0; 3426 } 3427 3428 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3429 struct iovec *iov, int iovcnt, 3430 uint64_t offset_blocks, uint64_t num_blocks, 3431 spdk_bdev_io_completion_cb cb, void *cb_arg) 3432 { 3433 return 
bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3434 num_blocks, cb, cb_arg); 3435 } 3436 3437 int 3438 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3439 struct iovec *iov, int iovcnt, void *md_buf, 3440 uint64_t offset_blocks, uint64_t num_blocks, 3441 spdk_bdev_io_completion_cb cb, void *cb_arg) 3442 { 3443 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3444 return -EINVAL; 3445 } 3446 3447 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3448 return -EINVAL; 3449 } 3450 3451 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3452 num_blocks, cb, cb_arg); 3453 } 3454 3455 static int 3456 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3457 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3458 spdk_bdev_io_completion_cb cb, void *cb_arg) 3459 { 3460 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3461 struct spdk_bdev_io *bdev_io; 3462 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3463 3464 if (!desc->write) { 3465 return -EBADF; 3466 } 3467 3468 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3469 return -EINVAL; 3470 } 3471 3472 bdev_io = bdev_channel_get_io(channel); 3473 if (!bdev_io) { 3474 return -ENOMEM; 3475 } 3476 3477 bdev_io->internal.ch = channel; 3478 bdev_io->internal.desc = desc; 3479 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3480 bdev_io->u.bdev.iovs = &bdev_io->iov; 3481 bdev_io->u.bdev.iovs[0].iov_base = buf; 3482 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3483 bdev_io->u.bdev.iovcnt = 1; 3484 bdev_io->u.bdev.md_buf = md_buf; 3485 bdev_io->u.bdev.num_blocks = num_blocks; 3486 bdev_io->u.bdev.offset_blocks = offset_blocks; 3487 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3488 3489 bdev_io_submit(bdev_io); 3490 return 0; 3491 } 3492 3493 int 3494 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3495 void *buf, uint64_t offset, uint64_t nbytes, 3496 spdk_bdev_io_completion_cb cb, void *cb_arg) 3497 { 3498 uint64_t offset_blocks, num_blocks; 3499 3500 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3501 nbytes, &num_blocks) != 0) { 3502 return -EINVAL; 3503 } 3504 3505 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3506 } 3507 3508 int 3509 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3510 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3511 spdk_bdev_io_completion_cb cb, void *cb_arg) 3512 { 3513 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3514 cb, cb_arg); 3515 } 3516 3517 int 3518 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3519 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3520 spdk_bdev_io_completion_cb cb, void *cb_arg) 3521 { 3522 struct iovec iov = { 3523 .iov_base = buf, 3524 }; 3525 3526 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3527 return -EINVAL; 3528 } 3529 3530 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3531 return -EINVAL; 3532 } 3533 3534 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3535 cb, cb_arg); 3536 } 3537 3538 static int 3539 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3540 struct iovec *iov, int iovcnt, void *md_buf, 3541 uint64_t offset_blocks, uint64_t num_blocks, 3542 spdk_bdev_io_completion_cb cb, 
void *cb_arg) 3543 { 3544 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3545 struct spdk_bdev_io *bdev_io; 3546 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3547 3548 if (!desc->write) { 3549 return -EBADF; 3550 } 3551 3552 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3553 return -EINVAL; 3554 } 3555 3556 bdev_io = bdev_channel_get_io(channel); 3557 if (!bdev_io) { 3558 return -ENOMEM; 3559 } 3560 3561 bdev_io->internal.ch = channel; 3562 bdev_io->internal.desc = desc; 3563 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3564 bdev_io->u.bdev.iovs = iov; 3565 bdev_io->u.bdev.iovcnt = iovcnt; 3566 bdev_io->u.bdev.md_buf = md_buf; 3567 bdev_io->u.bdev.num_blocks = num_blocks; 3568 bdev_io->u.bdev.offset_blocks = offset_blocks; 3569 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3570 3571 bdev_io_submit(bdev_io); 3572 return 0; 3573 } 3574 3575 int 3576 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3577 struct iovec *iov, int iovcnt, 3578 uint64_t offset, uint64_t len, 3579 spdk_bdev_io_completion_cb cb, void *cb_arg) 3580 { 3581 uint64_t offset_blocks, num_blocks; 3582 3583 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3584 len, &num_blocks) != 0) { 3585 return -EINVAL; 3586 } 3587 3588 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3589 } 3590 3591 int 3592 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3593 struct iovec *iov, int iovcnt, 3594 uint64_t offset_blocks, uint64_t num_blocks, 3595 spdk_bdev_io_completion_cb cb, void *cb_arg) 3596 { 3597 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3598 num_blocks, cb, cb_arg); 3599 } 3600 3601 int 3602 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3603 struct iovec *iov, int iovcnt, void *md_buf, 3604 uint64_t offset_blocks, uint64_t num_blocks, 3605 spdk_bdev_io_completion_cb cb, void *cb_arg) 3606 { 3607 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3608 return -EINVAL; 3609 } 3610 3611 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3612 return -EINVAL; 3613 } 3614 3615 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3616 num_blocks, cb, cb_arg); 3617 } 3618 3619 static void 3620 bdev_compare_do_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3621 { 3622 struct spdk_bdev_io *parent_io = cb_arg; 3623 uint8_t *read_buf = bdev_io->u.bdev.iovs[0].iov_base; 3624 int i, rc = 0; 3625 3626 if (!success) { 3627 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3628 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3629 spdk_bdev_free_io(bdev_io); 3630 return; 3631 } 3632 3633 for (i = 0; i < parent_io->u.bdev.iovcnt; i++) { 3634 rc = memcmp(read_buf, 3635 parent_io->u.bdev.iovs[i].iov_base, 3636 parent_io->u.bdev.iovs[i].iov_len); 3637 if (rc) { 3638 break; 3639 } 3640 read_buf += parent_io->u.bdev.iovs[i].iov_len; 3641 } 3642 3643 spdk_bdev_free_io(bdev_io); 3644 3645 if (rc == 0) { 3646 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3647 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3648 } else { 3649 parent_io->internal.status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 3650 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3651 } 3652 } 3653 3654 static void 3655 bdev_compare_do_read(void *_bdev_io) 3656 { 3657 struct spdk_bdev_io *bdev_io = _bdev_io; 
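	/* buf is passed as NULL below on purpose: the bdev layer is expected to
	 * allocate a data buffer for this read (the usual get-buf path for a READ
	 * submitted without a payload), and bdev_compare_do_read_done() above then
	 * memcmp()s the caller's iovs against that buffer.
	 */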
3658 int rc; 3659 3660 rc = spdk_bdev_read_blocks(bdev_io->internal.desc, 3661 spdk_io_channel_from_ctx(bdev_io->internal.ch), NULL, 3662 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3663 bdev_compare_do_read_done, bdev_io); 3664 3665 if (rc == -ENOMEM) { 3666 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_do_read); 3667 } else if (rc != 0) { 3668 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3669 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3670 } 3671 } 3672 3673 static int 3674 bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3675 struct iovec *iov, int iovcnt, void *md_buf, 3676 uint64_t offset_blocks, uint64_t num_blocks, 3677 spdk_bdev_io_completion_cb cb, void *cb_arg) 3678 { 3679 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3680 struct spdk_bdev_io *bdev_io; 3681 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3682 3683 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3684 return -EINVAL; 3685 } 3686 3687 bdev_io = bdev_channel_get_io(channel); 3688 if (!bdev_io) { 3689 return -ENOMEM; 3690 } 3691 3692 bdev_io->internal.ch = channel; 3693 bdev_io->internal.desc = desc; 3694 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3695 bdev_io->u.bdev.iovs = iov; 3696 bdev_io->u.bdev.iovcnt = iovcnt; 3697 bdev_io->u.bdev.md_buf = md_buf; 3698 bdev_io->u.bdev.num_blocks = num_blocks; 3699 bdev_io->u.bdev.offset_blocks = offset_blocks; 3700 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3701 3702 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3703 bdev_io_submit(bdev_io); 3704 return 0; 3705 } 3706 3707 bdev_compare_do_read(bdev_io); 3708 3709 return 0; 3710 } 3711 3712 int 3713 spdk_bdev_comparev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3714 struct iovec *iov, int iovcnt, 3715 uint64_t offset_blocks, uint64_t num_blocks, 3716 spdk_bdev_io_completion_cb cb, void *cb_arg) 3717 { 3718 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3719 num_blocks, cb, cb_arg); 3720 } 3721 3722 int 3723 spdk_bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3724 struct iovec *iov, int iovcnt, void *md_buf, 3725 uint64_t offset_blocks, uint64_t num_blocks, 3726 spdk_bdev_io_completion_cb cb, void *cb_arg) 3727 { 3728 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3729 return -EINVAL; 3730 } 3731 3732 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3733 return -EINVAL; 3734 } 3735 3736 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3737 num_blocks, cb, cb_arg); 3738 } 3739 3740 static int 3741 bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3742 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3743 spdk_bdev_io_completion_cb cb, void *cb_arg) 3744 { 3745 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3746 struct spdk_bdev_io *bdev_io; 3747 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3748 3749 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3750 return -EINVAL; 3751 } 3752 3753 bdev_io = bdev_channel_get_io(channel); 3754 if (!bdev_io) { 3755 return -ENOMEM; 3756 } 3757 3758 bdev_io->internal.ch = channel; 3759 bdev_io->internal.desc = desc; 3760 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3761 bdev_io->u.bdev.iovs = &bdev_io->iov; 3762 bdev_io->u.bdev.iovs[0].iov_base = buf; 3763 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3764 
bdev_io->u.bdev.iovcnt = 1; 3765 bdev_io->u.bdev.md_buf = md_buf; 3766 bdev_io->u.bdev.num_blocks = num_blocks; 3767 bdev_io->u.bdev.offset_blocks = offset_blocks; 3768 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3769 3770 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3771 bdev_io_submit(bdev_io); 3772 return 0; 3773 } 3774 3775 bdev_compare_do_read(bdev_io); 3776 3777 return 0; 3778 } 3779 3780 int 3781 spdk_bdev_compare_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3782 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3783 spdk_bdev_io_completion_cb cb, void *cb_arg) 3784 { 3785 return bdev_compare_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3786 cb, cb_arg); 3787 } 3788 3789 int 3790 spdk_bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3791 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3792 spdk_bdev_io_completion_cb cb, void *cb_arg) 3793 { 3794 struct iovec iov = { 3795 .iov_base = buf, 3796 }; 3797 3798 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3799 return -EINVAL; 3800 } 3801 3802 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3803 return -EINVAL; 3804 } 3805 3806 return bdev_compare_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3807 cb, cb_arg); 3808 } 3809 3810 static void 3811 bdev_comparev_and_writev_blocks_unlocked(void *ctx, int unlock_status) 3812 { 3813 struct spdk_bdev_io *bdev_io = ctx; 3814 3815 if (unlock_status) { 3816 SPDK_ERRLOG("LBA range unlock failed\n"); 3817 } 3818 3819 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS ? true : 3820 false, bdev_io->internal.caller_ctx); 3821 } 3822 3823 static void 3824 bdev_comparev_and_writev_blocks_unlock(struct spdk_bdev_io *bdev_io, int status) 3825 { 3826 bdev_io->internal.status = status; 3827 3828 bdev_unlock_lba_range(bdev_io->internal.desc, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3829 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3830 bdev_comparev_and_writev_blocks_unlocked, bdev_io); 3831 } 3832 3833 static void 3834 bdev_compare_and_write_do_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3835 { 3836 struct spdk_bdev_io *parent_io = cb_arg; 3837 3838 if (!success) { 3839 SPDK_ERRLOG("Compare and write operation failed\n"); 3840 } 3841 3842 spdk_bdev_free_io(bdev_io); 3843 3844 bdev_comparev_and_writev_blocks_unlock(parent_io, 3845 success ? 
SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED); 3846 } 3847 3848 static void 3849 bdev_compare_and_write_do_write(void *_bdev_io) 3850 { 3851 struct spdk_bdev_io *bdev_io = _bdev_io; 3852 int rc; 3853 3854 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 3855 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3856 bdev_io->u.bdev.fused_iovs, bdev_io->u.bdev.fused_iovcnt, 3857 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3858 bdev_compare_and_write_do_write_done, bdev_io); 3859 3860 3861 if (rc == -ENOMEM) { 3862 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_write); 3863 } else if (rc != 0) { 3864 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 3865 } 3866 } 3867 3868 static void 3869 bdev_compare_and_write_do_compare_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3870 { 3871 struct spdk_bdev_io *parent_io = cb_arg; 3872 3873 spdk_bdev_free_io(bdev_io); 3874 3875 if (!success) { 3876 bdev_comparev_and_writev_blocks_unlock(parent_io, SPDK_BDEV_IO_STATUS_MISCOMPARE); 3877 return; 3878 } 3879 3880 bdev_compare_and_write_do_write(parent_io); 3881 } 3882 3883 static void 3884 bdev_compare_and_write_do_compare(void *_bdev_io) 3885 { 3886 struct spdk_bdev_io *bdev_io = _bdev_io; 3887 int rc; 3888 3889 rc = spdk_bdev_comparev_blocks(bdev_io->internal.desc, 3890 spdk_io_channel_from_ctx(bdev_io->internal.ch), bdev_io->u.bdev.iovs, 3891 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3892 bdev_compare_and_write_do_compare_done, bdev_io); 3893 3894 if (rc == -ENOMEM) { 3895 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_compare); 3896 } else if (rc != 0) { 3897 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED); 3898 } 3899 } 3900 3901 static void 3902 bdev_comparev_and_writev_blocks_locked(void *ctx, int status) 3903 { 3904 struct spdk_bdev_io *bdev_io = ctx; 3905 3906 if (status) { 3907 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED; 3908 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3909 } 3910 3911 bdev_compare_and_write_do_compare(bdev_io); 3912 } 3913 3914 int 3915 spdk_bdev_comparev_and_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3916 struct iovec *compare_iov, int compare_iovcnt, 3917 struct iovec *write_iov, int write_iovcnt, 3918 uint64_t offset_blocks, uint64_t num_blocks, 3919 spdk_bdev_io_completion_cb cb, void *cb_arg) 3920 { 3921 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3922 struct spdk_bdev_io *bdev_io; 3923 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3924 3925 if (!desc->write) { 3926 return -EBADF; 3927 } 3928 3929 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3930 return -EINVAL; 3931 } 3932 3933 if (num_blocks > bdev->acwu) { 3934 return -EINVAL; 3935 } 3936 3937 bdev_io = bdev_channel_get_io(channel); 3938 if (!bdev_io) { 3939 return -ENOMEM; 3940 } 3941 3942 bdev_io->internal.ch = channel; 3943 bdev_io->internal.desc = desc; 3944 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE; 3945 bdev_io->u.bdev.iovs = compare_iov; 3946 bdev_io->u.bdev.iovcnt = compare_iovcnt; 3947 bdev_io->u.bdev.fused_iovs = write_iov; 3948 bdev_io->u.bdev.fused_iovcnt = write_iovcnt; 3949 bdev_io->u.bdev.md_buf = NULL; 3950 bdev_io->u.bdev.num_blocks = num_blocks; 3951 bdev_io->u.bdev.offset_blocks = offset_blocks; 3952 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3953 3954 if (bdev_io_type_supported(bdev, 
SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE)) { 3955 bdev_io_submit(bdev_io); 3956 return 0; 3957 } 3958 3959 return bdev_lock_lba_range(desc, ch, offset_blocks, num_blocks, 3960 bdev_comparev_and_writev_blocks_locked, bdev_io); 3961 } 3962 3963 static void 3964 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 3965 { 3966 if (!success) { 3967 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3968 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 3969 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3970 return; 3971 } 3972 3973 if (bdev_io->u.bdev.zcopy.populate) { 3974 /* Read the real data into the buffer */ 3975 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3976 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3977 bdev_io_submit(bdev_io); 3978 return; 3979 } 3980 3981 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3982 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3983 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3984 } 3985 3986 int 3987 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3988 uint64_t offset_blocks, uint64_t num_blocks, 3989 bool populate, 3990 spdk_bdev_io_completion_cb cb, void *cb_arg) 3991 { 3992 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3993 struct spdk_bdev_io *bdev_io; 3994 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3995 3996 if (!desc->write) { 3997 return -EBADF; 3998 } 3999 4000 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4001 return -EINVAL; 4002 } 4003 4004 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4005 return -ENOTSUP; 4006 } 4007 4008 bdev_io = bdev_channel_get_io(channel); 4009 if (!bdev_io) { 4010 return -ENOMEM; 4011 } 4012 4013 bdev_io->internal.ch = channel; 4014 bdev_io->internal.desc = desc; 4015 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4016 bdev_io->u.bdev.num_blocks = num_blocks; 4017 bdev_io->u.bdev.offset_blocks = offset_blocks; 4018 bdev_io->u.bdev.iovs = NULL; 4019 bdev_io->u.bdev.iovcnt = 0; 4020 bdev_io->u.bdev.md_buf = NULL; 4021 bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0; 4022 bdev_io->u.bdev.zcopy.commit = 0; 4023 bdev_io->u.bdev.zcopy.start = 1; 4024 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4025 4026 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4027 bdev_io_submit(bdev_io); 4028 } else { 4029 /* Emulate zcopy by allocating a buffer */ 4030 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 4031 bdev_io->u.bdev.num_blocks * bdev->blocklen); 4032 } 4033 4034 return 0; 4035 } 4036 4037 int 4038 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 4039 spdk_bdev_io_completion_cb cb, void *cb_arg) 4040 { 4041 struct spdk_bdev *bdev = bdev_io->bdev; 4042 4043 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 4044 /* This can happen if the zcopy was emulated in start */ 4045 if (bdev_io->u.bdev.zcopy.start != 1) { 4046 return -EINVAL; 4047 } 4048 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4049 } 4050 4051 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 4052 return -EINVAL; 4053 } 4054 4055 bdev_io->u.bdev.zcopy.commit = commit ? 
1 : 0; 4056 bdev_io->u.bdev.zcopy.start = 0; 4057 bdev_io->internal.caller_ctx = cb_arg; 4058 bdev_io->internal.cb = cb; 4059 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4060 4061 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4062 bdev_io_submit(bdev_io); 4063 return 0; 4064 } 4065 4066 if (!bdev_io->u.bdev.zcopy.commit) { 4067 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4068 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4069 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 4070 return 0; 4071 } 4072 4073 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 4074 bdev_io_submit(bdev_io); 4075 4076 return 0; 4077 } 4078 4079 int 4080 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4081 uint64_t offset, uint64_t len, 4082 spdk_bdev_io_completion_cb cb, void *cb_arg) 4083 { 4084 uint64_t offset_blocks, num_blocks; 4085 4086 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4087 len, &num_blocks) != 0) { 4088 return -EINVAL; 4089 } 4090 4091 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4092 } 4093 4094 int 4095 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4096 uint64_t offset_blocks, uint64_t num_blocks, 4097 spdk_bdev_io_completion_cb cb, void *cb_arg) 4098 { 4099 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4100 struct spdk_bdev_io *bdev_io; 4101 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4102 4103 if (!desc->write) { 4104 return -EBADF; 4105 } 4106 4107 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4108 return -EINVAL; 4109 } 4110 4111 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 4112 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 4113 return -ENOTSUP; 4114 } 4115 4116 bdev_io = bdev_channel_get_io(channel); 4117 4118 if (!bdev_io) { 4119 return -ENOMEM; 4120 } 4121 4122 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 4123 bdev_io->internal.ch = channel; 4124 bdev_io->internal.desc = desc; 4125 bdev_io->u.bdev.offset_blocks = offset_blocks; 4126 bdev_io->u.bdev.num_blocks = num_blocks; 4127 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4128 4129 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 4130 bdev_io_submit(bdev_io); 4131 return 0; 4132 } 4133 4134 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 4135 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 4136 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 4137 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 4138 bdev_write_zero_buffer_next(bdev_io); 4139 4140 return 0; 4141 } 4142 4143 int 4144 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4145 uint64_t offset, uint64_t nbytes, 4146 spdk_bdev_io_completion_cb cb, void *cb_arg) 4147 { 4148 uint64_t offset_blocks, num_blocks; 4149 4150 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4151 nbytes, &num_blocks) != 0) { 4152 return -EINVAL; 4153 } 4154 4155 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4156 } 4157 4158 int 4159 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4160 uint64_t offset_blocks, uint64_t num_blocks, 4161 spdk_bdev_io_completion_cb cb, void *cb_arg) 4162 { 4163 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4164 struct spdk_bdev_io *bdev_io; 4165 struct 
spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4166 4167 if (!desc->write) { 4168 return -EBADF; 4169 } 4170 4171 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4172 return -EINVAL; 4173 } 4174 4175 if (num_blocks == 0) { 4176 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 4177 return -EINVAL; 4178 } 4179 4180 bdev_io = bdev_channel_get_io(channel); 4181 if (!bdev_io) { 4182 return -ENOMEM; 4183 } 4184 4185 bdev_io->internal.ch = channel; 4186 bdev_io->internal.desc = desc; 4187 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 4188 4189 bdev_io->u.bdev.iovs = &bdev_io->iov; 4190 bdev_io->u.bdev.iovs[0].iov_base = NULL; 4191 bdev_io->u.bdev.iovs[0].iov_len = 0; 4192 bdev_io->u.bdev.iovcnt = 1; 4193 4194 bdev_io->u.bdev.offset_blocks = offset_blocks; 4195 bdev_io->u.bdev.num_blocks = num_blocks; 4196 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4197 4198 bdev_io_submit(bdev_io); 4199 return 0; 4200 } 4201 4202 int 4203 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4204 uint64_t offset, uint64_t length, 4205 spdk_bdev_io_completion_cb cb, void *cb_arg) 4206 { 4207 uint64_t offset_blocks, num_blocks; 4208 4209 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4210 length, &num_blocks) != 0) { 4211 return -EINVAL; 4212 } 4213 4214 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4215 } 4216 4217 int 4218 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4219 uint64_t offset_blocks, uint64_t num_blocks, 4220 spdk_bdev_io_completion_cb cb, void *cb_arg) 4221 { 4222 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4223 struct spdk_bdev_io *bdev_io; 4224 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4225 4226 if (!desc->write) { 4227 return -EBADF; 4228 } 4229 4230 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4231 return -EINVAL; 4232 } 4233 4234 bdev_io = bdev_channel_get_io(channel); 4235 if (!bdev_io) { 4236 return -ENOMEM; 4237 } 4238 4239 bdev_io->internal.ch = channel; 4240 bdev_io->internal.desc = desc; 4241 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 4242 bdev_io->u.bdev.iovs = NULL; 4243 bdev_io->u.bdev.iovcnt = 0; 4244 bdev_io->u.bdev.offset_blocks = offset_blocks; 4245 bdev_io->u.bdev.num_blocks = num_blocks; 4246 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4247 4248 bdev_io_submit(bdev_io); 4249 return 0; 4250 } 4251 4252 static void 4253 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 4254 { 4255 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 4256 struct spdk_bdev_io *bdev_io; 4257 4258 bdev_io = TAILQ_FIRST(&ch->queued_resets); 4259 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 4260 bdev_io_submit_reset(bdev_io); 4261 } 4262 4263 static void 4264 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 4265 { 4266 struct spdk_io_channel *ch; 4267 struct spdk_bdev_channel *channel; 4268 struct spdk_bdev_mgmt_channel *mgmt_channel; 4269 struct spdk_bdev_shared_resource *shared_resource; 4270 bdev_io_tailq_t tmp_queued; 4271 4272 TAILQ_INIT(&tmp_queued); 4273 4274 ch = spdk_io_channel_iter_get_channel(i); 4275 channel = spdk_io_channel_get_ctx(ch); 4276 shared_resource = channel->shared_resource; 4277 mgmt_channel = shared_resource->mgmt_ch; 4278 4279 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 4280 4281 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 4282 /* The QoS object is always valid and readable while 4283 * the channel flag is set, so the lock here should not 4284 * be 
necessary. We're not in the fast path though, so 4285 * just take it anyway. */ 4286 pthread_mutex_lock(&channel->bdev->internal.mutex); 4287 if (channel->bdev->internal.qos->ch == channel) { 4288 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 4289 } 4290 pthread_mutex_unlock(&channel->bdev->internal.mutex); 4291 } 4292 4293 bdev_abort_all_queued_io(&shared_resource->nomem_io, channel); 4294 bdev_abort_all_buf_io(&mgmt_channel->need_buf_small, channel); 4295 bdev_abort_all_buf_io(&mgmt_channel->need_buf_large, channel); 4296 bdev_abort_all_queued_io(&tmp_queued, channel); 4297 4298 spdk_for_each_channel_continue(i, 0); 4299 } 4300 4301 static void 4302 bdev_start_reset(void *ctx) 4303 { 4304 struct spdk_bdev_channel *ch = ctx; 4305 4306 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 4307 ch, bdev_reset_dev); 4308 } 4309 4310 static void 4311 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 4312 { 4313 struct spdk_bdev *bdev = ch->bdev; 4314 4315 assert(!TAILQ_EMPTY(&ch->queued_resets)); 4316 4317 pthread_mutex_lock(&bdev->internal.mutex); 4318 if (bdev->internal.reset_in_progress == NULL) { 4319 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 4320 /* 4321 * Take a channel reference for the target bdev for the life of this 4322 * reset. This guards against the channel getting destroyed while 4323 * spdk_for_each_channel() calls related to this reset IO are in 4324 * progress. We will release the reference when this reset is 4325 * completed. 4326 */ 4327 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 4328 bdev_start_reset(ch); 4329 } 4330 pthread_mutex_unlock(&bdev->internal.mutex); 4331 } 4332 4333 int 4334 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4335 spdk_bdev_io_completion_cb cb, void *cb_arg) 4336 { 4337 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4338 struct spdk_bdev_io *bdev_io; 4339 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4340 4341 bdev_io = bdev_channel_get_io(channel); 4342 if (!bdev_io) { 4343 return -ENOMEM; 4344 } 4345 4346 bdev_io->internal.ch = channel; 4347 bdev_io->internal.desc = desc; 4348 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4349 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 4350 bdev_io->u.reset.ch_ref = NULL; 4351 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4352 4353 pthread_mutex_lock(&bdev->internal.mutex); 4354 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 4355 pthread_mutex_unlock(&bdev->internal.mutex); 4356 4357 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 4358 internal.ch_link); 4359 4360 bdev_channel_start_reset(channel); 4361 4362 return 0; 4363 } 4364 4365 void 4366 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4367 struct spdk_bdev_io_stat *stat) 4368 { 4369 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4370 4371 *stat = channel->stat; 4372 } 4373 4374 static void 4375 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 4376 { 4377 void *io_device = spdk_io_channel_iter_get_io_device(i); 4378 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4379 4380 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 4381 bdev_iostat_ctx->cb_arg, 0); 4382 free(bdev_iostat_ctx); 4383 } 4384 4385 static void 4386 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 4387 { 4388 struct 
spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4389 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4390 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4391 4392 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 4393 spdk_for_each_channel_continue(i, 0); 4394 } 4395 4396 void 4397 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 4398 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 4399 { 4400 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 4401 4402 assert(bdev != NULL); 4403 assert(stat != NULL); 4404 assert(cb != NULL); 4405 4406 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 4407 if (bdev_iostat_ctx == NULL) { 4408 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 4409 cb(bdev, stat, cb_arg, -ENOMEM); 4410 return; 4411 } 4412 4413 bdev_iostat_ctx->stat = stat; 4414 bdev_iostat_ctx->cb = cb; 4415 bdev_iostat_ctx->cb_arg = cb_arg; 4416 4417 /* Start with the statistics from previously deleted channels. */ 4418 pthread_mutex_lock(&bdev->internal.mutex); 4419 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 4420 pthread_mutex_unlock(&bdev->internal.mutex); 4421 4422 /* Then iterate and add the statistics from each existing channel. */ 4423 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4424 bdev_get_each_channel_stat, 4425 bdev_iostat_ctx, 4426 bdev_get_device_stat_done); 4427 } 4428 4429 int 4430 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4431 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4432 spdk_bdev_io_completion_cb cb, void *cb_arg) 4433 { 4434 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4435 struct spdk_bdev_io *bdev_io; 4436 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4437 4438 if (!desc->write) { 4439 return -EBADF; 4440 } 4441 4442 bdev_io = bdev_channel_get_io(channel); 4443 if (!bdev_io) { 4444 return -ENOMEM; 4445 } 4446 4447 bdev_io->internal.ch = channel; 4448 bdev_io->internal.desc = desc; 4449 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 4450 bdev_io->u.nvme_passthru.cmd = *cmd; 4451 bdev_io->u.nvme_passthru.buf = buf; 4452 bdev_io->u.nvme_passthru.nbytes = nbytes; 4453 bdev_io->u.nvme_passthru.md_buf = NULL; 4454 bdev_io->u.nvme_passthru.md_len = 0; 4455 4456 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4457 4458 bdev_io_submit(bdev_io); 4459 return 0; 4460 } 4461 4462 int 4463 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4464 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4465 spdk_bdev_io_completion_cb cb, void *cb_arg) 4466 { 4467 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4468 struct spdk_bdev_io *bdev_io; 4469 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4470 4471 if (!desc->write) { 4472 /* 4473 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4474 * to easily determine if the command is a read or write, but for now just 4475 * do not allow io_passthru with a read-only descriptor. 
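* Callers that need passthru must therefore open the bdev with write access
* (write == true in spdk_bdev_open()/spdk_bdev_open_ext()).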
4476 */ 4477 return -EBADF; 4478 } 4479 4480 bdev_io = bdev_channel_get_io(channel); 4481 if (!bdev_io) { 4482 return -ENOMEM; 4483 } 4484 4485 bdev_io->internal.ch = channel; 4486 bdev_io->internal.desc = desc; 4487 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 4488 bdev_io->u.nvme_passthru.cmd = *cmd; 4489 bdev_io->u.nvme_passthru.buf = buf; 4490 bdev_io->u.nvme_passthru.nbytes = nbytes; 4491 bdev_io->u.nvme_passthru.md_buf = NULL; 4492 bdev_io->u.nvme_passthru.md_len = 0; 4493 4494 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4495 4496 bdev_io_submit(bdev_io); 4497 return 0; 4498 } 4499 4500 int 4501 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4502 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 4503 spdk_bdev_io_completion_cb cb, void *cb_arg) 4504 { 4505 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4506 struct spdk_bdev_io *bdev_io; 4507 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4508 4509 if (!desc->write) { 4510 /* 4511 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4512 * to easily determine if the command is a read or write, but for now just 4513 * do not allow io_passthru with a read-only descriptor. 4514 */ 4515 return -EBADF; 4516 } 4517 4518 bdev_io = bdev_channel_get_io(channel); 4519 if (!bdev_io) { 4520 return -ENOMEM; 4521 } 4522 4523 bdev_io->internal.ch = channel; 4524 bdev_io->internal.desc = desc; 4525 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 4526 bdev_io->u.nvme_passthru.cmd = *cmd; 4527 bdev_io->u.nvme_passthru.buf = buf; 4528 bdev_io->u.nvme_passthru.nbytes = nbytes; 4529 bdev_io->u.nvme_passthru.md_buf = md_buf; 4530 bdev_io->u.nvme_passthru.md_len = md_len; 4531 4532 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4533 4534 bdev_io_submit(bdev_io); 4535 return 0; 4536 } 4537 4538 static void bdev_abort_retry(void *ctx); 4539 static void bdev_abort(struct spdk_bdev_io *parent_io); 4540 4541 static void 4542 bdev_abort_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4543 { 4544 struct spdk_bdev_channel *channel = bdev_io->internal.ch; 4545 struct spdk_bdev_io *parent_io = cb_arg; 4546 struct spdk_bdev_io *bio_to_abort, *tmp_io; 4547 4548 bio_to_abort = bdev_io->u.abort.bio_to_abort; 4549 4550 spdk_bdev_free_io(bdev_io); 4551 4552 if (!success) { 4553 /* Check if the target I/O completed in the meantime. */ 4554 TAILQ_FOREACH(tmp_io, &channel->io_submitted, internal.ch_link) { 4555 if (tmp_io == bio_to_abort) { 4556 break; 4557 } 4558 } 4559 4560 /* If the target I/O still exists, set the parent to failed. */ 4561 if (tmp_io != NULL) { 4562 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4563 } 4564 } 4565 4566 parent_io->u.bdev.split_outstanding--; 4567 if (parent_io->u.bdev.split_outstanding == 0) { 4568 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4569 bdev_abort_retry(parent_io); 4570 } else { 4571 bdev_io_complete(parent_io); 4572 } 4573 } 4574 } 4575 4576 static int 4577 bdev_abort_io(struct spdk_bdev_desc *desc, struct spdk_bdev_channel *channel, 4578 struct spdk_bdev_io *bio_to_abort, 4579 spdk_bdev_io_completion_cb cb, void *cb_arg) 4580 { 4581 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4582 struct spdk_bdev_io *bdev_io; 4583 4584 if (bio_to_abort->type == SPDK_BDEV_IO_TYPE_ABORT || 4585 bio_to_abort->type == SPDK_BDEV_IO_TYPE_RESET) { 4586 /* TODO: Abort reset or abort request. 
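* Until that is implemented, such requests are rejected with -ENOTSUP below.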
*/ 4587 return -ENOTSUP; 4588 } 4589 4590 bdev_io = bdev_channel_get_io(channel); 4591 if (bdev_io == NULL) { 4592 return -ENOMEM; 4593 } 4594 4595 bdev_io->internal.ch = channel; 4596 bdev_io->internal.desc = desc; 4597 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4598 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4599 4600 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bio_to_abort)) { 4601 bdev_io->u.bdev.abort.bio_cb_arg = bio_to_abort; 4602 4603 /* Parent abort request is not submitted directly, but to manage its 4604 * execution add it to the submitted list here. 4605 */ 4606 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4607 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4608 4609 bdev_abort(bdev_io); 4610 4611 return 0; 4612 } 4613 4614 bdev_io->u.abort.bio_to_abort = bio_to_abort; 4615 4616 /* Submit the abort request to the underlying bdev module. */ 4617 bdev_io_submit(bdev_io); 4618 4619 return 0; 4620 } 4621 4622 static uint32_t 4623 _bdev_abort(struct spdk_bdev_io *parent_io) 4624 { 4625 struct spdk_bdev_desc *desc = parent_io->internal.desc; 4626 struct spdk_bdev_channel *channel = parent_io->internal.ch; 4627 void *bio_cb_arg; 4628 struct spdk_bdev_io *bio_to_abort; 4629 uint32_t matched_ios; 4630 int rc; 4631 4632 bio_cb_arg = parent_io->u.bdev.abort.bio_cb_arg; 4633 4634 /* matched_ios is returned and will be kept by the caller. 4635 * 4636 * This funcion will be used for two cases, 1) the same cb_arg is used for 4637 * multiple I/Os, 2) a single large I/O is split into smaller ones. 4638 * Incrementing split_outstanding directly here may confuse readers especially 4639 * for the 1st case. 4640 * 4641 * Completion of I/O abort is processed after stack unwinding. Hence this trick 4642 * works as expected. 4643 */ 4644 matched_ios = 0; 4645 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4646 4647 TAILQ_FOREACH(bio_to_abort, &channel->io_submitted, internal.ch_link) { 4648 if (bio_to_abort->internal.caller_ctx != bio_cb_arg) { 4649 continue; 4650 } 4651 4652 if (bio_to_abort->internal.submit_tsc > parent_io->internal.submit_tsc) { 4653 /* Any I/O which was submitted after this abort command should be excluded. */ 4654 continue; 4655 } 4656 4657 rc = bdev_abort_io(desc, channel, bio_to_abort, bdev_abort_io_done, parent_io); 4658 if (rc != 0) { 4659 if (rc == -ENOMEM) { 4660 parent_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4661 } else { 4662 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4663 } 4664 break; 4665 } 4666 matched_ios++; 4667 } 4668 4669 return matched_ios; 4670 } 4671 4672 static void 4673 bdev_abort_retry(void *ctx) 4674 { 4675 struct spdk_bdev_io *parent_io = ctx; 4676 uint32_t matched_ios; 4677 4678 matched_ios = _bdev_abort(parent_io); 4679 4680 if (matched_ios == 0) { 4681 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4682 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4683 } else { 4684 /* For retry, the case that no target I/O was found is success 4685 * because it means target I/Os completed in the meantime. 4686 */ 4687 bdev_io_complete(parent_io); 4688 } 4689 return; 4690 } 4691 4692 /* Use split_outstanding to manage the progress of aborting I/Os. 
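* Each child abort decrements it in bdev_abort_io_done(); the parent abort
* completes (or is retried) once the count reaches zero.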
*/ 4693 parent_io->u.bdev.split_outstanding = matched_ios; 4694 } 4695 4696 static void 4697 bdev_abort(struct spdk_bdev_io *parent_io) 4698 { 4699 uint32_t matched_ios; 4700 4701 matched_ios = _bdev_abort(parent_io); 4702 4703 if (matched_ios == 0) { 4704 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4705 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4706 } else { 4707 /* The case the no target I/O was found is failure. */ 4708 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4709 bdev_io_complete(parent_io); 4710 } 4711 return; 4712 } 4713 4714 /* Use split_outstanding to manage the progress of aborting I/Os. */ 4715 parent_io->u.bdev.split_outstanding = matched_ios; 4716 } 4717 4718 int 4719 spdk_bdev_abort(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4720 void *bio_cb_arg, 4721 spdk_bdev_io_completion_cb cb, void *cb_arg) 4722 { 4723 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4724 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4725 struct spdk_bdev_io *bdev_io; 4726 4727 if (bio_cb_arg == NULL) { 4728 return -EINVAL; 4729 } 4730 4731 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ABORT)) { 4732 return -ENOTSUP; 4733 } 4734 4735 bdev_io = bdev_channel_get_io(channel); 4736 if (bdev_io == NULL) { 4737 return -ENOMEM; 4738 } 4739 4740 bdev_io->internal.ch = channel; 4741 bdev_io->internal.desc = desc; 4742 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4743 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4744 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4745 4746 bdev_io->u.bdev.abort.bio_cb_arg = bio_cb_arg; 4747 4748 /* Parent abort request is not submitted directly, but to manage its execution, 4749 * add it to the submitted list here. 4750 */ 4751 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4752 4753 bdev_abort(bdev_io); 4754 4755 return 0; 4756 } 4757 4758 int 4759 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4760 struct spdk_bdev_io_wait_entry *entry) 4761 { 4762 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4763 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 4764 4765 if (bdev != entry->bdev) { 4766 SPDK_ERRLOG("bdevs do not match\n"); 4767 return -EINVAL; 4768 } 4769 4770 if (mgmt_ch->per_thread_cache_count > 0) { 4771 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 4772 return -EINVAL; 4773 } 4774 4775 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 4776 return 0; 4777 } 4778 4779 static void 4780 bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 4781 { 4782 struct spdk_bdev *bdev = bdev_ch->bdev; 4783 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4784 struct spdk_bdev_io *bdev_io; 4785 4786 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 4787 /* 4788 * Allow some more I/O to complete before retrying the nomem_io queue. 4789 * Some drivers (such as nvme) cannot immediately take a new I/O in 4790 * the context of a completion, because the resources for the I/O are 4791 * not released until control returns to the bdev poller. Also, we 4792 * may require several small I/O to complete before a larger I/O 4793 * (that requires splitting) can be submitted. 
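* Waiting until io_outstanding drops back to the nomem_threshold gives those
* completions time to free their resources first.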
4794 */ 4795 return; 4796 } 4797 4798 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 4799 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 4800 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 4801 bdev_io->internal.ch->io_outstanding++; 4802 shared_resource->io_outstanding++; 4803 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4804 bdev_io->internal.error.nvme.cdw0 = 0; 4805 bdev_io->num_retries++; 4806 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 4807 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4808 break; 4809 } 4810 } 4811 } 4812 4813 static inline void 4814 bdev_io_complete(void *ctx) 4815 { 4816 struct spdk_bdev_io *bdev_io = ctx; 4817 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4818 uint64_t tsc, tsc_diff; 4819 4820 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 4821 /* 4822 * Send the completion to the thread that originally submitted the I/O, 4823 * which may not be the current thread in the case of QoS. 4824 */ 4825 if (bdev_io->internal.io_submit_ch) { 4826 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4827 bdev_io->internal.io_submit_ch = NULL; 4828 } 4829 4830 /* 4831 * Defer completion to avoid potential infinite recursion if the 4832 * user's completion callback issues a new I/O. 4833 */ 4834 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4835 bdev_io_complete, bdev_io); 4836 return; 4837 } 4838 4839 tsc = spdk_get_ticks(); 4840 tsc_diff = tsc - bdev_io->internal.submit_tsc; 4841 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 4842 4843 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link); 4844 4845 if (bdev_io->internal.ch->histogram) { 4846 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 4847 } 4848 4849 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4850 switch (bdev_io->type) { 4851 case SPDK_BDEV_IO_TYPE_READ: 4852 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4853 bdev_io->internal.ch->stat.num_read_ops++; 4854 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4855 break; 4856 case SPDK_BDEV_IO_TYPE_WRITE: 4857 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4858 bdev_io->internal.ch->stat.num_write_ops++; 4859 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4860 break; 4861 case SPDK_BDEV_IO_TYPE_UNMAP: 4862 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4863 bdev_io->internal.ch->stat.num_unmap_ops++; 4864 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 4865 break; 4866 case SPDK_BDEV_IO_TYPE_ZCOPY: 4867 /* Track the data in the start phase only */ 4868 if (bdev_io->u.bdev.zcopy.start) { 4869 if (bdev_io->u.bdev.zcopy.populate) { 4870 bdev_io->internal.ch->stat.bytes_read += 4871 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4872 bdev_io->internal.ch->stat.num_read_ops++; 4873 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4874 } else { 4875 bdev_io->internal.ch->stat.bytes_written += 4876 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4877 bdev_io->internal.ch->stat.num_write_ops++; 4878 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4879 } 4880 } 4881 break; 4882 default: 4883 break; 4884 } 4885 } 4886 4887 #ifdef SPDK_CONFIG_VTUNE 4888 uint64_t now_tsc = spdk_get_ticks(); 4889 if (now_tsc > 
(bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 4890 uint64_t data[5]; 4891 4892 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 4893 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 4894 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 4895 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 4896 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 4897 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 4898 4899 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 4900 __itt_metadata_u64, 5, data); 4901 4902 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 4903 bdev_io->internal.ch->start_tsc = now_tsc; 4904 } 4905 #endif 4906 4907 assert(bdev_io->internal.cb != NULL); 4908 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 4909 4910 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 4911 bdev_io->internal.caller_ctx); 4912 } 4913 4914 static void 4915 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 4916 { 4917 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4918 4919 if (bdev_io->u.reset.ch_ref != NULL) { 4920 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 4921 bdev_io->u.reset.ch_ref = NULL; 4922 } 4923 4924 bdev_io_complete(bdev_io); 4925 } 4926 4927 static void 4928 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 4929 { 4930 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4931 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4932 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4933 struct spdk_bdev_io *queued_reset; 4934 4935 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 4936 while (!TAILQ_EMPTY(&ch->queued_resets)) { 4937 queued_reset = TAILQ_FIRST(&ch->queued_resets); 4938 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 4939 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 4940 } 4941 4942 spdk_for_each_channel_continue(i, 0); 4943 } 4944 4945 void 4946 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 4947 { 4948 struct spdk_bdev *bdev = bdev_io->bdev; 4949 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4950 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4951 4952 bdev_io->internal.status = status; 4953 4954 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 4955 bool unlock_channels = false; 4956 4957 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 4958 SPDK_ERRLOG("NOMEM returned for reset\n"); 4959 } 4960 pthread_mutex_lock(&bdev->internal.mutex); 4961 if (bdev_io == bdev->internal.reset_in_progress) { 4962 bdev->internal.reset_in_progress = NULL; 4963 unlock_channels = true; 4964 } 4965 pthread_mutex_unlock(&bdev->internal.mutex); 4966 4967 if (unlock_channels) { 4968 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 4969 bdev_io, bdev_reset_complete); 4970 return; 4971 } 4972 } else { 4973 _bdev_io_unset_bounce_buf(bdev_io); 4974 4975 assert(bdev_ch->io_outstanding > 0); 4976 assert(shared_resource->io_outstanding > 0); 4977 bdev_ch->io_outstanding--; 4978 shared_resource->io_outstanding--; 4979 4980 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 4981 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 4982 
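/* The I/O stays queued on nomem_io until bdev_ch_retry_io() resubmits it
 * after enough outstanding I/O have completed.
 */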
/* 4983 * Wait for some of the outstanding I/O to complete before we 4984 * retry any of the nomem_io. Normally we will wait for 4985 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 4986 * depth channels we will instead wait for half to complete. 4987 */ 4988 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 4989 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 4990 return; 4991 } 4992 4993 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 4994 bdev_ch_retry_io(bdev_ch); 4995 } 4996 } 4997 4998 bdev_io_complete(bdev_io); 4999 } 5000 5001 void 5002 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 5003 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 5004 { 5005 if (sc == SPDK_SCSI_STATUS_GOOD) { 5006 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5007 } else { 5008 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 5009 bdev_io->internal.error.scsi.sc = sc; 5010 bdev_io->internal.error.scsi.sk = sk; 5011 bdev_io->internal.error.scsi.asc = asc; 5012 bdev_io->internal.error.scsi.ascq = ascq; 5013 } 5014 5015 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5016 } 5017 5018 void 5019 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 5020 int *sc, int *sk, int *asc, int *ascq) 5021 { 5022 assert(sc != NULL); 5023 assert(sk != NULL); 5024 assert(asc != NULL); 5025 assert(ascq != NULL); 5026 5027 switch (bdev_io->internal.status) { 5028 case SPDK_BDEV_IO_STATUS_SUCCESS: 5029 *sc = SPDK_SCSI_STATUS_GOOD; 5030 *sk = SPDK_SCSI_SENSE_NO_SENSE; 5031 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5032 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5033 break; 5034 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 5035 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 5036 break; 5037 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 5038 *sc = bdev_io->internal.error.scsi.sc; 5039 *sk = bdev_io->internal.error.scsi.sk; 5040 *asc = bdev_io->internal.error.scsi.asc; 5041 *ascq = bdev_io->internal.error.scsi.ascq; 5042 break; 5043 default: 5044 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 5045 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 5046 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5047 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5048 break; 5049 } 5050 } 5051 5052 void 5053 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 5054 { 5055 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 5056 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5057 } else { 5058 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 5059 } 5060 5061 bdev_io->internal.error.nvme.cdw0 = cdw0; 5062 bdev_io->internal.error.nvme.sct = sct; 5063 bdev_io->internal.error.nvme.sc = sc; 5064 5065 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5066 } 5067 5068 void 5069 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 5070 { 5071 assert(sct != NULL); 5072 assert(sc != NULL); 5073 assert(cdw0 != NULL); 5074 5075 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5076 *sct = bdev_io->internal.error.nvme.sct; 5077 *sc = bdev_io->internal.error.nvme.sc; 5078 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5079 *sct = SPDK_NVME_SCT_GENERIC; 5080 *sc = SPDK_NVME_SC_SUCCESS; 5081 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_ABORTED) { 5082 *sct = SPDK_NVME_SCT_GENERIC; 5083 *sc = SPDK_NVME_SC_ABORTED_BY_REQUEST; 5084 } else { 5085 *sct 
= SPDK_NVME_SCT_GENERIC; 5086 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5087 } 5088 5089 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5090 } 5091 5092 void 5093 spdk_bdev_io_get_nvme_fused_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, 5094 int *first_sct, int *first_sc, int *second_sct, int *second_sc) 5095 { 5096 assert(first_sct != NULL); 5097 assert(first_sc != NULL); 5098 assert(second_sct != NULL); 5099 assert(second_sc != NULL); 5100 assert(cdw0 != NULL); 5101 5102 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5103 if (bdev_io->internal.error.nvme.sct == SPDK_NVME_SCT_MEDIA_ERROR && 5104 bdev_io->internal.error.nvme.sc == SPDK_NVME_SC_COMPARE_FAILURE) { 5105 *first_sct = bdev_io->internal.error.nvme.sct; 5106 *first_sc = bdev_io->internal.error.nvme.sc; 5107 *second_sct = SPDK_NVME_SCT_GENERIC; 5108 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5109 } else { 5110 *first_sct = SPDK_NVME_SCT_GENERIC; 5111 *first_sc = SPDK_NVME_SC_SUCCESS; 5112 *second_sct = bdev_io->internal.error.nvme.sct; 5113 *second_sc = bdev_io->internal.error.nvme.sc; 5114 } 5115 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5116 *first_sct = SPDK_NVME_SCT_GENERIC; 5117 *first_sc = SPDK_NVME_SC_SUCCESS; 5118 *second_sct = SPDK_NVME_SCT_GENERIC; 5119 *second_sc = SPDK_NVME_SC_SUCCESS; 5120 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED) { 5121 *first_sct = SPDK_NVME_SCT_GENERIC; 5122 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5123 *second_sct = SPDK_NVME_SCT_GENERIC; 5124 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5125 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_MISCOMPARE) { 5126 *first_sct = SPDK_NVME_SCT_MEDIA_ERROR; 5127 *first_sc = SPDK_NVME_SC_COMPARE_FAILURE; 5128 *second_sct = SPDK_NVME_SCT_GENERIC; 5129 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5130 } else { 5131 *first_sct = SPDK_NVME_SCT_GENERIC; 5132 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5133 *second_sct = SPDK_NVME_SCT_GENERIC; 5134 *second_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5135 } 5136 5137 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5138 } 5139 5140 struct spdk_thread * 5141 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 5142 { 5143 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 5144 } 5145 5146 struct spdk_io_channel * 5147 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 5148 { 5149 return bdev_io->internal.ch->channel; 5150 } 5151 5152 static void 5153 bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 5154 { 5155 uint64_t min_qos_set; 5156 int i; 5157 5158 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5159 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5160 break; 5161 } 5162 } 5163 5164 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 5165 SPDK_ERRLOG("Invalid rate limits set.\n"); 5166 return; 5167 } 5168 5169 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5170 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5171 continue; 5172 } 5173 5174 if (bdev_qos_is_iops_rate_limit(i) == true) { 5175 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 5176 } else { 5177 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 5178 } 5179 5180 if (limits[i] == 0 || limits[i] % min_qos_set) { 5181 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 5182 limits[i], bdev->name, min_qos_set); 5183 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 5184 return; 5185 } 5186 } 5187 5188 if (!bdev->internal.qos) { 5189 
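/* First limit configured for this bdev - allocate the QoS tracking
 * structure lazily here.
 */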
bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 5190 if (!bdev->internal.qos) { 5191 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 5192 return; 5193 } 5194 } 5195 5196 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5197 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5198 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 5199 bdev->name, i, limits[i]); 5200 } 5201 5202 return; 5203 } 5204 5205 static void 5206 bdev_qos_config(struct spdk_bdev *bdev) 5207 { 5208 struct spdk_conf_section *sp = NULL; 5209 const char *val = NULL; 5210 int i = 0, j = 0; 5211 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 5212 bool config_qos = false; 5213 5214 sp = spdk_conf_find_section(NULL, "QoS"); 5215 if (!sp) { 5216 return; 5217 } 5218 5219 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 5220 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 5221 5222 i = 0; 5223 while (true) { 5224 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 5225 if (!val) { 5226 break; 5227 } 5228 5229 if (strcmp(bdev->name, val) != 0) { 5230 i++; 5231 continue; 5232 } 5233 5234 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 5235 if (val) { 5236 if (bdev_qos_is_iops_rate_limit(j) == true) { 5237 limits[j] = strtoull(val, NULL, 10); 5238 } else { 5239 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 5240 } 5241 config_qos = true; 5242 } 5243 5244 break; 5245 } 5246 5247 j++; 5248 } 5249 5250 if (config_qos == true) { 5251 bdev_qos_config_limit(bdev, limits); 5252 } 5253 5254 return; 5255 } 5256 5257 static int 5258 bdev_init(struct spdk_bdev *bdev) 5259 { 5260 char *bdev_name; 5261 5262 assert(bdev->module != NULL); 5263 5264 if (!bdev->name) { 5265 SPDK_ERRLOG("Bdev name is NULL\n"); 5266 return -EINVAL; 5267 } 5268 5269 if (!strlen(bdev->name)) { 5270 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 5271 return -EINVAL; 5272 } 5273 5274 if (spdk_bdev_get_by_name(bdev->name)) { 5275 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 5276 return -EEXIST; 5277 } 5278 5279 /* Users often register their own I/O devices using the bdev name. In 5280 * order to avoid conflicts, prepend bdev_. */ 5281 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 5282 if (!bdev_name) { 5283 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 5284 return -ENOMEM; 5285 } 5286 5287 bdev->internal.status = SPDK_BDEV_STATUS_READY; 5288 bdev->internal.measured_queue_depth = UINT64_MAX; 5289 bdev->internal.claim_module = NULL; 5290 bdev->internal.qd_poller = NULL; 5291 bdev->internal.qos = NULL; 5292 5293 /* If the user didn't specify a uuid, generate one. */ 5294 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 5295 spdk_uuid_generate(&bdev->uuid); 5296 } 5297 5298 if (spdk_bdev_get_buf_align(bdev) > 1) { 5299 if (bdev->split_on_optimal_io_boundary) { 5300 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 5301 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 5302 } else { 5303 bdev->split_on_optimal_io_boundary = true; 5304 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 5305 } 5306 } 5307 5308 /* If the user didn't specify a write unit size, set it to one. 
*/ 5309 if (bdev->write_unit_size == 0) { 5310 bdev->write_unit_size = 1; 5311 } 5312 5313 /* Set ACWU value to 1 if bdev module did not set it (does not support it natively) */ 5314 if (bdev->acwu == 0) { 5315 bdev->acwu = 1; 5316 } 5317 5318 TAILQ_INIT(&bdev->internal.open_descs); 5319 TAILQ_INIT(&bdev->internal.locked_ranges); 5320 TAILQ_INIT(&bdev->internal.pending_locked_ranges); 5321 5322 TAILQ_INIT(&bdev->aliases); 5323 5324 bdev->internal.reset_in_progress = NULL; 5325 5326 bdev_qos_config(bdev); 5327 5328 spdk_io_device_register(__bdev_to_io_dev(bdev), 5329 bdev_channel_create, bdev_channel_destroy, 5330 sizeof(struct spdk_bdev_channel), 5331 bdev_name); 5332 5333 free(bdev_name); 5334 5335 pthread_mutex_init(&bdev->internal.mutex, NULL); 5336 return 0; 5337 } 5338 5339 static void 5340 bdev_destroy_cb(void *io_device) 5341 { 5342 int rc; 5343 struct spdk_bdev *bdev; 5344 spdk_bdev_unregister_cb cb_fn; 5345 void *cb_arg; 5346 5347 bdev = __bdev_from_io_dev(io_device); 5348 cb_fn = bdev->internal.unregister_cb; 5349 cb_arg = bdev->internal.unregister_ctx; 5350 5351 rc = bdev->fn_table->destruct(bdev->ctxt); 5352 if (rc < 0) { 5353 SPDK_ERRLOG("destruct failed\n"); 5354 } 5355 if (rc <= 0 && cb_fn != NULL) { 5356 cb_fn(cb_arg, rc); 5357 } 5358 } 5359 5360 5361 static void 5362 bdev_fini(struct spdk_bdev *bdev) 5363 { 5364 pthread_mutex_destroy(&bdev->internal.mutex); 5365 5366 free(bdev->internal.qos); 5367 5368 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 5369 } 5370 5371 static void 5372 bdev_start(struct spdk_bdev *bdev) 5373 { 5374 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 5375 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 5376 5377 /* Examine configuration before initializing I/O */ 5378 bdev_examine(bdev); 5379 } 5380 5381 int 5382 spdk_bdev_register(struct spdk_bdev *bdev) 5383 { 5384 int rc = bdev_init(bdev); 5385 5386 if (rc == 0) { 5387 bdev_start(bdev); 5388 } 5389 5390 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 5391 return rc; 5392 } 5393 5394 int 5395 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 5396 { 5397 SPDK_ERRLOG("This function is deprecated. Use spdk_bdev_register() instead.\n"); 5398 return spdk_bdev_register(vbdev); 5399 } 5400 5401 void 5402 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 5403 { 5404 if (bdev->internal.unregister_cb != NULL) { 5405 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 5406 } 5407 } 5408 5409 static void 5410 _remove_notify(void *arg) 5411 { 5412 struct spdk_bdev_desc *desc = arg; 5413 5414 pthread_mutex_lock(&desc->mutex); 5415 desc->refs--; 5416 5417 if (!desc->closed) { 5418 pthread_mutex_unlock(&desc->mutex); 5419 if (desc->callback.open_with_ext) { 5420 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 5421 } else { 5422 desc->callback.remove_fn(desc->callback.ctx); 5423 } 5424 return; 5425 } else if (0 == desc->refs) { 5426 /* This descriptor was closed after this remove_notify message was sent. 5427 * spdk_bdev_close() could not free the descriptor since this message was 5428 * in flight, so we free it now using bdev_desc_free(). 5429 */ 5430 pthread_mutex_unlock(&desc->mutex); 5431 bdev_desc_free(desc); 5432 return; 5433 } 5434 pthread_mutex_unlock(&desc->mutex); 5435 } 5436 5437 /* Must be called while holding bdev->internal.mutex. 5438 * returns: 0 - bdev removed and ready to be destructed. 
5439 * -EBUSY - bdev can't be destructed yet. */ 5440 static int 5441 bdev_unregister_unsafe(struct spdk_bdev *bdev) 5442 { 5443 struct spdk_bdev_desc *desc, *tmp; 5444 int rc = 0; 5445 5446 /* Notify each descriptor about hotremoval */ 5447 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 5448 rc = -EBUSY; 5449 pthread_mutex_lock(&desc->mutex); 5450 /* 5451 * Defer invocation of the event_cb to a separate message that will 5452 * run later on its thread. This ensures this context unwinds and 5453 * we don't recursively unregister this bdev again if the event_cb 5454 * immediately closes its descriptor. 5455 */ 5456 desc->refs++; 5457 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 5458 pthread_mutex_unlock(&desc->mutex); 5459 } 5460 5461 /* If there are no descriptors, proceed removing the bdev */ 5462 if (rc == 0) { 5463 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 5464 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 5465 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 5466 } 5467 5468 return rc; 5469 } 5470 5471 void 5472 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 5473 { 5474 struct spdk_thread *thread; 5475 int rc; 5476 5477 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 5478 5479 thread = spdk_get_thread(); 5480 if (!thread) { 5481 /* The user called this from a non-SPDK thread. */ 5482 if (cb_fn != NULL) { 5483 cb_fn(cb_arg, -ENOTSUP); 5484 } 5485 return; 5486 } 5487 5488 pthread_mutex_lock(&g_bdev_mgr.mutex); 5489 pthread_mutex_lock(&bdev->internal.mutex); 5490 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5491 pthread_mutex_unlock(&bdev->internal.mutex); 5492 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5493 if (cb_fn) { 5494 cb_fn(cb_arg, -EBUSY); 5495 } 5496 return; 5497 } 5498 5499 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 5500 bdev->internal.unregister_cb = cb_fn; 5501 bdev->internal.unregister_ctx = cb_arg; 5502 5503 /* Call under lock. 
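* bdev_unregister_unsafe() walks the open descriptor list and must see it
* in a consistent state.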
*/ 5504 rc = bdev_unregister_unsafe(bdev); 5505 pthread_mutex_unlock(&bdev->internal.mutex); 5506 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5507 5508 if (rc == 0) { 5509 bdev_fini(bdev); 5510 } 5511 } 5512 5513 static void 5514 bdev_dummy_event_cb(void *remove_ctx) 5515 { 5516 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev remove event received with no remove callback specified"); 5517 } 5518 5519 static int 5520 bdev_start_qos(struct spdk_bdev *bdev) 5521 { 5522 struct set_qos_limit_ctx *ctx; 5523 5524 /* Enable QoS */ 5525 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 5526 ctx = calloc(1, sizeof(*ctx)); 5527 if (ctx == NULL) { 5528 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 5529 return -ENOMEM; 5530 } 5531 ctx->bdev = bdev; 5532 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5533 bdev_enable_qos_msg, ctx, 5534 bdev_enable_qos_done); 5535 } 5536 5537 return 0; 5538 } 5539 5540 static int 5541 bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 5542 { 5543 struct spdk_thread *thread; 5544 int rc = 0; 5545 5546 thread = spdk_get_thread(); 5547 if (!thread) { 5548 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 5549 return -ENOTSUP; 5550 } 5551 5552 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5553 spdk_get_thread()); 5554 5555 desc->bdev = bdev; 5556 desc->thread = thread; 5557 desc->write = write; 5558 5559 pthread_mutex_lock(&bdev->internal.mutex); 5560 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5561 pthread_mutex_unlock(&bdev->internal.mutex); 5562 return -ENODEV; 5563 } 5564 5565 if (write && bdev->internal.claim_module) { 5566 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 5567 bdev->name, bdev->internal.claim_module->name); 5568 pthread_mutex_unlock(&bdev->internal.mutex); 5569 return -EPERM; 5570 } 5571 5572 rc = bdev_start_qos(bdev); 5573 if (rc != 0) { 5574 SPDK_ERRLOG("Failed to start QoS on bdev %s\n", bdev->name); 5575 pthread_mutex_unlock(&bdev->internal.mutex); 5576 return rc; 5577 } 5578 5579 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 5580 5581 pthread_mutex_unlock(&bdev->internal.mutex); 5582 5583 return 0; 5584 } 5585 5586 int 5587 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 5588 void *remove_ctx, struct spdk_bdev_desc **_desc) 5589 { 5590 struct spdk_bdev_desc *desc; 5591 int rc; 5592 5593 desc = calloc(1, sizeof(*desc)); 5594 if (desc == NULL) { 5595 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5596 return -ENOMEM; 5597 } 5598 5599 if (remove_cb == NULL) { 5600 remove_cb = bdev_dummy_event_cb; 5601 } 5602 5603 TAILQ_INIT(&desc->pending_media_events); 5604 TAILQ_INIT(&desc->free_media_events); 5605 5606 desc->callback.open_with_ext = false; 5607 desc->callback.remove_fn = remove_cb; 5608 desc->callback.ctx = remove_ctx; 5609 pthread_mutex_init(&desc->mutex, NULL); 5610 5611 pthread_mutex_lock(&g_bdev_mgr.mutex); 5612 5613 rc = bdev_open(bdev, write, desc); 5614 if (rc != 0) { 5615 bdev_desc_free(desc); 5616 desc = NULL; 5617 } 5618 5619 *_desc = desc; 5620 5621 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5622 5623 return rc; 5624 } 5625 5626 int 5627 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 5628 void *event_ctx, struct spdk_bdev_desc **_desc) 5629 { 5630 struct spdk_bdev_desc *desc; 5631 struct spdk_bdev *bdev; 5632 unsigned int event_id; 5633 int rc; 5634 5635 if (event_cb == NULL) { 5636 SPDK_ERRLOG("Missing event 
callback function\n"); 5637 return -EINVAL; 5638 } 5639 5640 pthread_mutex_lock(&g_bdev_mgr.mutex); 5641 5642 bdev = spdk_bdev_get_by_name(bdev_name); 5643 5644 if (bdev == NULL) { 5645 SPDK_ERRLOG("Failed to find bdev with name: %s\n", bdev_name); 5646 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5647 return -EINVAL; 5648 } 5649 5650 desc = calloc(1, sizeof(*desc)); 5651 if (desc == NULL) { 5652 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5653 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5654 return -ENOMEM; 5655 } 5656 5657 TAILQ_INIT(&desc->pending_media_events); 5658 TAILQ_INIT(&desc->free_media_events); 5659 5660 desc->callback.open_with_ext = true; 5661 desc->callback.event_fn = event_cb; 5662 desc->callback.ctx = event_ctx; 5663 pthread_mutex_init(&desc->mutex, NULL); 5664 5665 if (bdev->media_events) { 5666 desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE, 5667 sizeof(*desc->media_events_buffer)); 5668 if (desc->media_events_buffer == NULL) { 5669 SPDK_ERRLOG("Failed to initialize media event pool\n"); 5670 bdev_desc_free(desc); 5671 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5672 return -ENOMEM; 5673 } 5674 5675 for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) { 5676 TAILQ_INSERT_TAIL(&desc->free_media_events, 5677 &desc->media_events_buffer[event_id], tailq); 5678 } 5679 } 5680 5681 rc = bdev_open(bdev, write, desc); 5682 if (rc != 0) { 5683 bdev_desc_free(desc); 5684 desc = NULL; 5685 } 5686 5687 *_desc = desc; 5688 5689 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5690 5691 return rc; 5692 } 5693 5694 void 5695 spdk_bdev_close(struct spdk_bdev_desc *desc) 5696 { 5697 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 5698 int rc; 5699 5700 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5701 spdk_get_thread()); 5702 5703 assert(desc->thread == spdk_get_thread()); 5704 5705 spdk_poller_unregister(&desc->io_timeout_poller); 5706 5707 pthread_mutex_lock(&bdev->internal.mutex); 5708 pthread_mutex_lock(&desc->mutex); 5709 5710 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 5711 5712 desc->closed = true; 5713 5714 if (0 == desc->refs) { 5715 pthread_mutex_unlock(&desc->mutex); 5716 bdev_desc_free(desc); 5717 } else { 5718 pthread_mutex_unlock(&desc->mutex); 5719 } 5720 5721 /* If no more descriptors, kill QoS channel */ 5722 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5723 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 5724 bdev->name, spdk_get_thread()); 5725 5726 if (bdev_qos_destroy(bdev)) { 5727 /* There isn't anything we can do to recover here. Just let the 5728 * old QoS poller keep running. The QoS handling won't change 5729 * cores when the user allocates a new channel, but it won't break. */ 5730 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 5731 } 5732 } 5733 5734 spdk_bdev_set_qd_sampling_period(bdev, 0); 5735 5736 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5737 rc = bdev_unregister_unsafe(bdev); 5738 pthread_mutex_unlock(&bdev->internal.mutex); 5739 5740 if (rc == 0) { 5741 bdev_fini(bdev); 5742 } 5743 } else { 5744 pthread_mutex_unlock(&bdev->internal.mutex); 5745 } 5746 } 5747 5748 int 5749 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 5750 struct spdk_bdev_module *module) 5751 { 5752 if (bdev->internal.claim_module != NULL) { 5753 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 5754 bdev->internal.claim_module->name); 5755 return -EPERM; 5756 } 5757 5758 if (desc && !desc->write) { 5759 desc->write = true; 5760 } 5761 5762 bdev->internal.claim_module = module; 5763 return 0; 5764 } 5765 5766 void 5767 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 5768 { 5769 assert(bdev->internal.claim_module != NULL); 5770 bdev->internal.claim_module = NULL; 5771 } 5772 5773 struct spdk_bdev * 5774 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 5775 { 5776 assert(desc != NULL); 5777 return desc->bdev; 5778 } 5779 5780 void 5781 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 5782 { 5783 struct iovec *iovs; 5784 int iovcnt; 5785 5786 if (bdev_io == NULL) { 5787 return; 5788 } 5789 5790 switch (bdev_io->type) { 5791 case SPDK_BDEV_IO_TYPE_READ: 5792 case SPDK_BDEV_IO_TYPE_WRITE: 5793 case SPDK_BDEV_IO_TYPE_ZCOPY: 5794 iovs = bdev_io->u.bdev.iovs; 5795 iovcnt = bdev_io->u.bdev.iovcnt; 5796 break; 5797 default: 5798 iovs = NULL; 5799 iovcnt = 0; 5800 break; 5801 } 5802 5803 if (iovp) { 5804 *iovp = iovs; 5805 } 5806 if (iovcntp) { 5807 *iovcntp = iovcnt; 5808 } 5809 } 5810 5811 void * 5812 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 5813 { 5814 if (bdev_io == NULL) { 5815 return NULL; 5816 } 5817 5818 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 5819 return NULL; 5820 } 5821 5822 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 5823 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 5824 return bdev_io->u.bdev.md_buf; 5825 } 5826 5827 return NULL; 5828 } 5829 5830 void * 5831 spdk_bdev_io_get_cb_arg(struct spdk_bdev_io *bdev_io) 5832 { 5833 if (bdev_io == NULL) { 5834 assert(false); 5835 return NULL; 5836 } 5837 5838 return bdev_io->internal.caller_ctx; 5839 } 5840 5841 void 5842 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 5843 { 5844 5845 if (spdk_bdev_module_list_find(bdev_module->name)) { 5846 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 5847 assert(false); 5848 } 5849 5850 /* 5851 * Modules with examine callbacks must be initialized first, so they are 5852 * ready to handle examine callbacks from later modules that will 5853 * register physical bdevs. 
5854 */ 5855 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 5856 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5857 } else { 5858 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5859 } 5860 } 5861 5862 struct spdk_bdev_module * 5863 spdk_bdev_module_list_find(const char *name) 5864 { 5865 struct spdk_bdev_module *bdev_module; 5866 5867 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5868 if (strcmp(name, bdev_module->name) == 0) { 5869 break; 5870 } 5871 } 5872 5873 return bdev_module; 5874 } 5875 5876 static void 5877 bdev_write_zero_buffer_next(void *_bdev_io) 5878 { 5879 struct spdk_bdev_io *bdev_io = _bdev_io; 5880 uint64_t num_bytes, num_blocks; 5881 void *md_buf = NULL; 5882 int rc; 5883 5884 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 5885 bdev_io->u.bdev.split_remaining_num_blocks, 5886 ZERO_BUFFER_SIZE); 5887 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 5888 5889 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 5890 md_buf = (char *)g_bdev_mgr.zero_buffer + 5891 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 5892 } 5893 5894 rc = bdev_write_blocks_with_md(bdev_io->internal.desc, 5895 spdk_io_channel_from_ctx(bdev_io->internal.ch), 5896 g_bdev_mgr.zero_buffer, md_buf, 5897 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 5898 bdev_write_zero_buffer_done, bdev_io); 5899 if (rc == 0) { 5900 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 5901 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 5902 } else if (rc == -ENOMEM) { 5903 bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next); 5904 } else { 5905 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5906 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 5907 } 5908 } 5909 5910 static void 5911 bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 5912 { 5913 struct spdk_bdev_io *parent_io = cb_arg; 5914 5915 spdk_bdev_free_io(bdev_io); 5916 5917 if (!success) { 5918 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5919 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 5920 return; 5921 } 5922 5923 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 5924 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5925 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 5926 return; 5927 } 5928 5929 bdev_write_zero_buffer_next(parent_io); 5930 } 5931 5932 static void 5933 bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 5934 { 5935 pthread_mutex_lock(&ctx->bdev->internal.mutex); 5936 ctx->bdev->internal.qos_mod_in_progress = false; 5937 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 5938 5939 if (ctx->cb_fn) { 5940 ctx->cb_fn(ctx->cb_arg, status); 5941 } 5942 free(ctx); 5943 } 5944 5945 static void 5946 bdev_disable_qos_done(void *cb_arg) 5947 { 5948 struct set_qos_limit_ctx *ctx = cb_arg; 5949 struct spdk_bdev *bdev = ctx->bdev; 5950 struct spdk_bdev_io *bdev_io; 5951 struct spdk_bdev_qos *qos; 5952 5953 pthread_mutex_lock(&bdev->internal.mutex); 5954 qos = bdev->internal.qos; 5955 bdev->internal.qos = NULL; 5956 pthread_mutex_unlock(&bdev->internal.mutex); 5957 5958 while (!TAILQ_EMPTY(&qos->queued)) { 5959 /* Send queued I/O back to their original thread for resubmission. 
*/ 5960 bdev_io = TAILQ_FIRST(&qos->queued); 5961 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 5962 5963 if (bdev_io->internal.io_submit_ch) { 5964 /* 5965 * Channel was changed when sending it to the QoS thread - change it back 5966 * before sending it back to the original thread. 5967 */ 5968 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 5969 bdev_io->internal.io_submit_ch = NULL; 5970 } 5971 5972 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 5973 _bdev_io_submit, bdev_io); 5974 } 5975 5976 if (qos->thread != NULL) { 5977 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 5978 spdk_poller_unregister(&qos->poller); 5979 } 5980 5981 free(qos); 5982 5983 bdev_set_qos_limit_done(ctx, 0); 5984 } 5985 5986 static void 5987 bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 5988 { 5989 void *io_device = spdk_io_channel_iter_get_io_device(i); 5990 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5991 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5992 struct spdk_thread *thread; 5993 5994 pthread_mutex_lock(&bdev->internal.mutex); 5995 thread = bdev->internal.qos->thread; 5996 pthread_mutex_unlock(&bdev->internal.mutex); 5997 5998 if (thread != NULL) { 5999 spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx); 6000 } else { 6001 bdev_disable_qos_done(ctx); 6002 } 6003 } 6004 6005 static void 6006 bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 6007 { 6008 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 6009 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 6010 6011 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 6012 6013 spdk_for_each_channel_continue(i, 0); 6014 } 6015 6016 static void 6017 bdev_update_qos_rate_limit_msg(void *cb_arg) 6018 { 6019 struct set_qos_limit_ctx *ctx = cb_arg; 6020 struct spdk_bdev *bdev = ctx->bdev; 6021 6022 pthread_mutex_lock(&bdev->internal.mutex); 6023 bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 6024 pthread_mutex_unlock(&bdev->internal.mutex); 6025 6026 bdev_set_qos_limit_done(ctx, 0); 6027 } 6028 6029 static void 6030 bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 6031 { 6032 void *io_device = spdk_io_channel_iter_get_io_device(i); 6033 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 6034 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 6035 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 6036 6037 pthread_mutex_lock(&bdev->internal.mutex); 6038 bdev_enable_qos(bdev, bdev_ch); 6039 pthread_mutex_unlock(&bdev->internal.mutex); 6040 spdk_for_each_channel_continue(i, 0); 6041 } 6042 6043 static void 6044 bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 6045 { 6046 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6047 6048 bdev_set_qos_limit_done(ctx, status); 6049 } 6050 6051 static void 6052 bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 6053 { 6054 int i; 6055 6056 assert(bdev->internal.qos != NULL); 6057 6058 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6059 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6060 bdev->internal.qos->rate_limits[i].limit = limits[i]; 6061 6062 if (limits[i] == 0) { 6063 bdev->internal.qos->rate_limits[i].limit = 6064 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 6065 } 6066 } 6067 } 6068 } 6069 6070 void 6071 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 6072 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 6073 { 6074 struct set_qos_limit_ctx *ctx; 6075 uint32_t 
limit_set_complement; 6076 uint64_t min_limit_per_sec; 6077 int i; 6078 bool disable_rate_limit = true; 6079 6080 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6081 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6082 continue; 6083 } 6084 6085 if (limits[i] > 0) { 6086 disable_rate_limit = false; 6087 } 6088 6089 if (bdev_qos_is_iops_rate_limit(i) == true) { 6090 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 6091 } else { 6092 /* Change from megabyte to byte rate limit */ 6093 limits[i] = limits[i] * 1024 * 1024; 6094 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 6095 } 6096 6097 limit_set_complement = limits[i] % min_limit_per_sec; 6098 if (limit_set_complement) { 6099 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 6100 limits[i], min_limit_per_sec); 6101 limits[i] += min_limit_per_sec - limit_set_complement; 6102 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 6103 } 6104 } 6105 6106 ctx = calloc(1, sizeof(*ctx)); 6107 if (ctx == NULL) { 6108 cb_fn(cb_arg, -ENOMEM); 6109 return; 6110 } 6111 6112 ctx->cb_fn = cb_fn; 6113 ctx->cb_arg = cb_arg; 6114 ctx->bdev = bdev; 6115 6116 pthread_mutex_lock(&bdev->internal.mutex); 6117 if (bdev->internal.qos_mod_in_progress) { 6118 pthread_mutex_unlock(&bdev->internal.mutex); 6119 free(ctx); 6120 cb_fn(cb_arg, -EAGAIN); 6121 return; 6122 } 6123 bdev->internal.qos_mod_in_progress = true; 6124 6125 if (disable_rate_limit == true && bdev->internal.qos) { 6126 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6127 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 6128 (bdev->internal.qos->rate_limits[i].limit > 0 && 6129 bdev->internal.qos->rate_limits[i].limit != 6130 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 6131 disable_rate_limit = false; 6132 break; 6133 } 6134 } 6135 } 6136 6137 if (disable_rate_limit == false) { 6138 if (bdev->internal.qos == NULL) { 6139 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 6140 if (!bdev->internal.qos) { 6141 pthread_mutex_unlock(&bdev->internal.mutex); 6142 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 6143 bdev_set_qos_limit_done(ctx, -ENOMEM); 6144 return; 6145 } 6146 } 6147 6148 if (bdev->internal.qos->thread == NULL) { 6149 /* Enabling */ 6150 bdev_set_qos_rate_limits(bdev, limits); 6151 6152 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6153 bdev_enable_qos_msg, ctx, 6154 bdev_enable_qos_done); 6155 } else { 6156 /* Updating */ 6157 bdev_set_qos_rate_limits(bdev, limits); 6158 6159 spdk_thread_send_msg(bdev->internal.qos->thread, 6160 bdev_update_qos_rate_limit_msg, ctx); 6161 } 6162 } else { 6163 if (bdev->internal.qos != NULL) { 6164 bdev_set_qos_rate_limits(bdev, limits); 6165 6166 /* Disabling */ 6167 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6168 bdev_disable_qos_msg, ctx, 6169 bdev_disable_qos_msg_done); 6170 } else { 6171 pthread_mutex_unlock(&bdev->internal.mutex); 6172 bdev_set_qos_limit_done(ctx, 0); 6173 return; 6174 } 6175 } 6176 6177 pthread_mutex_unlock(&bdev->internal.mutex); 6178 } 6179 6180 struct spdk_bdev_histogram_ctx { 6181 spdk_bdev_histogram_status_cb cb_fn; 6182 void *cb_arg; 6183 struct spdk_bdev *bdev; 6184 int status; 6185 }; 6186 6187 static void 6188 bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 6189 { 6190 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6191 6192 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6193 ctx->bdev->internal.histogram_in_progress = false; 6194 
pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6195 ctx->cb_fn(ctx->cb_arg, ctx->status); 6196 free(ctx); 6197 } 6198 6199 static void 6200 bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 6201 { 6202 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6203 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6204 6205 if (ch->histogram != NULL) { 6206 spdk_histogram_data_free(ch->histogram); 6207 ch->histogram = NULL; 6208 } 6209 spdk_for_each_channel_continue(i, 0); 6210 } 6211 6212 static void 6213 bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 6214 { 6215 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6216 6217 if (status != 0) { 6218 ctx->status = status; 6219 ctx->bdev->internal.histogram_enabled = false; 6220 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx, 6221 bdev_histogram_disable_channel_cb); 6222 } else { 6223 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6224 ctx->bdev->internal.histogram_in_progress = false; 6225 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6226 ctx->cb_fn(ctx->cb_arg, ctx->status); 6227 free(ctx); 6228 } 6229 } 6230 6231 static void 6232 bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 6233 { 6234 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6235 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6236 int status = 0; 6237 6238 if (ch->histogram == NULL) { 6239 ch->histogram = spdk_histogram_data_alloc(); 6240 if (ch->histogram == NULL) { 6241 status = -ENOMEM; 6242 } 6243 } 6244 6245 spdk_for_each_channel_continue(i, status); 6246 } 6247 6248 void 6249 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 6250 void *cb_arg, bool enable) 6251 { 6252 struct spdk_bdev_histogram_ctx *ctx; 6253 6254 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 6255 if (ctx == NULL) { 6256 cb_fn(cb_arg, -ENOMEM); 6257 return; 6258 } 6259 6260 ctx->bdev = bdev; 6261 ctx->status = 0; 6262 ctx->cb_fn = cb_fn; 6263 ctx->cb_arg = cb_arg; 6264 6265 pthread_mutex_lock(&bdev->internal.mutex); 6266 if (bdev->internal.histogram_in_progress) { 6267 pthread_mutex_unlock(&bdev->internal.mutex); 6268 free(ctx); 6269 cb_fn(cb_arg, -EAGAIN); 6270 return; 6271 } 6272 6273 bdev->internal.histogram_in_progress = true; 6274 pthread_mutex_unlock(&bdev->internal.mutex); 6275 6276 bdev->internal.histogram_enabled = enable; 6277 6278 if (enable) { 6279 /* Allocate histogram for each channel */ 6280 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx, 6281 bdev_histogram_enable_channel_cb); 6282 } else { 6283 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx, 6284 bdev_histogram_disable_channel_cb); 6285 } 6286 } 6287 6288 struct spdk_bdev_histogram_data_ctx { 6289 spdk_bdev_histogram_data_cb cb_fn; 6290 void *cb_arg; 6291 struct spdk_bdev *bdev; 6292 /** merged histogram data from all channels */ 6293 struct spdk_histogram_data *histogram; 6294 }; 6295 6296 static void 6297 bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 6298 { 6299 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6300 6301 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 6302 free(ctx); 6303 } 6304 6305 static void 6306 bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 6307 { 6308 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6309 struct spdk_bdev_channel *ch = 
spdk_io_channel_get_ctx(_ch); 6310 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6311 int status = 0; 6312 6313 if (ch->histogram == NULL) { 6314 status = -EFAULT; 6315 } else { 6316 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 6317 } 6318 6319 spdk_for_each_channel_continue(i, status); 6320 } 6321 6322 void 6323 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 6324 spdk_bdev_histogram_data_cb cb_fn, 6325 void *cb_arg) 6326 { 6327 struct spdk_bdev_histogram_data_ctx *ctx; 6328 6329 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 6330 if (ctx == NULL) { 6331 cb_fn(cb_arg, -ENOMEM, NULL); 6332 return; 6333 } 6334 6335 ctx->bdev = bdev; 6336 ctx->cb_fn = cb_fn; 6337 ctx->cb_arg = cb_arg; 6338 6339 ctx->histogram = histogram; 6340 6341 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx, 6342 bdev_histogram_get_channel_cb); 6343 } 6344 6345 size_t 6346 spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events, 6347 size_t max_events) 6348 { 6349 struct media_event_entry *entry; 6350 size_t num_events = 0; 6351 6352 for (; num_events < max_events; ++num_events) { 6353 entry = TAILQ_FIRST(&desc->pending_media_events); 6354 if (entry == NULL) { 6355 break; 6356 } 6357 6358 events[num_events] = entry->event; 6359 TAILQ_REMOVE(&desc->pending_media_events, entry, tailq); 6360 TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq); 6361 } 6362 6363 return num_events; 6364 } 6365 6366 int 6367 spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events, 6368 size_t num_events) 6369 { 6370 struct spdk_bdev_desc *desc; 6371 struct media_event_entry *entry; 6372 size_t event_id; 6373 int rc = 0; 6374 6375 assert(bdev->media_events); 6376 6377 pthread_mutex_lock(&bdev->internal.mutex); 6378 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6379 if (desc->write) { 6380 break; 6381 } 6382 } 6383 6384 if (desc == NULL || desc->media_events_buffer == NULL) { 6385 rc = -ENODEV; 6386 goto out; 6387 } 6388 6389 for (event_id = 0; event_id < num_events; ++event_id) { 6390 entry = TAILQ_FIRST(&desc->free_media_events); 6391 if (entry == NULL) { 6392 break; 6393 } 6394 6395 TAILQ_REMOVE(&desc->free_media_events, entry, tailq); 6396 TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq); 6397 entry->event = events[event_id]; 6398 } 6399 6400 rc = event_id; 6401 out: 6402 pthread_mutex_unlock(&bdev->internal.mutex); 6403 return rc; 6404 } 6405 6406 void 6407 spdk_bdev_notify_media_management(struct spdk_bdev *bdev) 6408 { 6409 struct spdk_bdev_desc *desc; 6410 6411 pthread_mutex_lock(&bdev->internal.mutex); 6412 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6413 if (!TAILQ_EMPTY(&desc->pending_media_events)) { 6414 desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev, 6415 desc->callback.ctx); 6416 } 6417 } 6418 pthread_mutex_unlock(&bdev->internal.mutex); 6419 } 6420 6421 struct locked_lba_range_ctx { 6422 struct lba_range range; 6423 struct spdk_bdev *bdev; 6424 struct lba_range *current_range; 6425 struct lba_range *owner_range; 6426 struct spdk_poller *poller; 6427 lock_range_cb cb_fn; 6428 void *cb_arg; 6429 }; 6430 6431 static void 6432 bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status) 6433 { 6434 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6435 6436 ctx->cb_fn(ctx->cb_arg, -ENOMEM); 6437 free(ctx); 6438 } 6439 6440 static void 6441 
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i); 6442 6443 static void 6444 bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6445 { 6446 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6447 struct spdk_bdev *bdev = ctx->bdev; 6448 6449 if (status == -ENOMEM) { 6450 /* One of the channels could not allocate a range object. 6451 * So we have to go back and clean up any ranges that were 6452 * allocated successfully before we return error status to 6453 * the caller. We can reuse the unlock function to do that 6454 * clean up. 6455 */ 6456 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6457 bdev_unlock_lba_range_get_channel, ctx, 6458 bdev_lock_error_cleanup_cb); 6459 return; 6460 } 6461 6462 /* All channels have locked this range and no I/O overlapping the range 6463 * are outstanding! Set the owner_ch for the range object for the 6464 * locking channel, so that this channel will know that it is allowed 6465 * to write to this range. 6466 */ 6467 ctx->owner_range->owner_ch = ctx->range.owner_ch; 6468 ctx->cb_fn(ctx->cb_arg, status); 6469 6470 /* Don't free the ctx here. Its range is in the bdev's global list of 6471 * locked ranges still, and will be removed and freed when this range 6472 * is later unlocked. 6473 */ 6474 } 6475 6476 static int 6477 bdev_lock_lba_range_check_io(void *_i) 6478 { 6479 struct spdk_io_channel_iter *i = _i; 6480 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6481 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6482 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6483 struct lba_range *range = ctx->current_range; 6484 struct spdk_bdev_io *bdev_io; 6485 6486 spdk_poller_unregister(&ctx->poller); 6487 6488 /* The range is now in the locked_ranges, so no new IO can be submitted to this 6489 * range. But we need to wait until any outstanding IO overlapping with this range 6490 * are completed. 6491 */ 6492 TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) { 6493 if (bdev_io_range_is_locked(bdev_io, range)) { 6494 ctx->poller = SPDK_POLLER_REGISTER(bdev_lock_lba_range_check_io, i, 100); 6495 return SPDK_POLLER_BUSY; 6496 } 6497 } 6498 6499 spdk_for_each_channel_continue(i, 0); 6500 return SPDK_POLLER_BUSY; 6501 } 6502 6503 static void 6504 bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6505 { 6506 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6507 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6508 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6509 struct lba_range *range; 6510 6511 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6512 if (range->length == ctx->range.length && 6513 range->offset == ctx->range.offset && 6514 range->locked_ctx == ctx->range.locked_ctx) { 6515 /* This range already exists on this channel, so don't add 6516 * it again. This can happen when a new channel is created 6517 * while the for_each_channel operation is in progress. 6518 * Do not check for outstanding I/O in that case, since the 6519 * range was locked before any I/O could be submitted to the 6520 * new channel. 
6521 */ 6522 spdk_for_each_channel_continue(i, 0); 6523 return; 6524 } 6525 } 6526 6527 range = calloc(1, sizeof(*range)); 6528 if (range == NULL) { 6529 spdk_for_each_channel_continue(i, -ENOMEM); 6530 return; 6531 } 6532 6533 range->length = ctx->range.length; 6534 range->offset = ctx->range.offset; 6535 range->locked_ctx = ctx->range.locked_ctx; 6536 ctx->current_range = range; 6537 if (ctx->range.owner_ch == ch) { 6538 /* This is the range object for the channel that will hold 6539 * the lock. Store it in the ctx object so that we can easily 6540 * set its owner_ch after the lock is finally acquired. 6541 */ 6542 ctx->owner_range = range; 6543 } 6544 TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq); 6545 bdev_lock_lba_range_check_io(i); 6546 } 6547 6548 static void 6549 bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx) 6550 { 6551 assert(spdk_get_thread() == ctx->range.owner_ch->channel->thread); 6552 6553 /* We will add a copy of this range to each channel now. */ 6554 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx, 6555 bdev_lock_lba_range_cb); 6556 } 6557 6558 static bool 6559 bdev_lba_range_overlaps_tailq(struct lba_range *range, lba_range_tailq_t *tailq) 6560 { 6561 struct lba_range *r; 6562 6563 TAILQ_FOREACH(r, tailq, tailq) { 6564 if (bdev_lba_range_overlapped(range, r)) { 6565 return true; 6566 } 6567 } 6568 return false; 6569 } 6570 6571 static int 6572 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6573 uint64_t offset, uint64_t length, 6574 lock_range_cb cb_fn, void *cb_arg) 6575 { 6576 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6577 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6578 struct locked_lba_range_ctx *ctx; 6579 6580 if (cb_arg == NULL) { 6581 SPDK_ERRLOG("cb_arg must not be NULL\n"); 6582 return -EINVAL; 6583 } 6584 6585 ctx = calloc(1, sizeof(*ctx)); 6586 if (ctx == NULL) { 6587 return -ENOMEM; 6588 } 6589 6590 ctx->range.offset = offset; 6591 ctx->range.length = length; 6592 ctx->range.owner_ch = ch; 6593 ctx->range.locked_ctx = cb_arg; 6594 ctx->bdev = bdev; 6595 ctx->cb_fn = cb_fn; 6596 ctx->cb_arg = cb_arg; 6597 6598 pthread_mutex_lock(&bdev->internal.mutex); 6599 if (bdev_lba_range_overlaps_tailq(&ctx->range, &bdev->internal.locked_ranges)) { 6600 /* There is an active lock overlapping with this range. 6601 * Put it on the pending list until this range no 6602 * longer overlaps with another. 6603 */ 6604 TAILQ_INSERT_TAIL(&bdev->internal.pending_locked_ranges, &ctx->range, tailq); 6605 } else { 6606 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, &ctx->range, tailq); 6607 bdev_lock_lba_range_ctx(bdev, ctx); 6608 } 6609 pthread_mutex_unlock(&bdev->internal.mutex); 6610 return 0; 6611 } 6612 6613 static void 6614 bdev_lock_lba_range_ctx_msg(void *_ctx) 6615 { 6616 struct locked_lba_range_ctx *ctx = _ctx; 6617 6618 bdev_lock_lba_range_ctx(ctx->bdev, ctx); 6619 } 6620 6621 static void 6622 bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6623 { 6624 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6625 struct locked_lba_range_ctx *pending_ctx; 6626 struct spdk_bdev_channel *ch = ctx->range.owner_ch; 6627 struct spdk_bdev *bdev = ch->bdev; 6628 struct lba_range *range, *tmp; 6629 6630 pthread_mutex_lock(&bdev->internal.mutex); 6631 /* Check if there are any pending locked ranges that overlap with this range 6632 * that was just unlocked. 
If there are, check that it doesn't overlap with any 6633 * other locked ranges before calling bdev_lock_lba_range_ctx which will start 6634 * the lock process. 6635 */ 6636 TAILQ_FOREACH_SAFE(range, &bdev->internal.pending_locked_ranges, tailq, tmp) { 6637 if (bdev_lba_range_overlapped(range, &ctx->range) && 6638 !bdev_lba_range_overlaps_tailq(range, &bdev->internal.locked_ranges)) { 6639 TAILQ_REMOVE(&bdev->internal.pending_locked_ranges, range, tailq); 6640 pending_ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6641 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, range, tailq); 6642 spdk_thread_send_msg(pending_ctx->range.owner_ch->channel->thread, 6643 bdev_lock_lba_range_ctx_msg, pending_ctx); 6644 } 6645 } 6646 pthread_mutex_unlock(&bdev->internal.mutex); 6647 6648 ctx->cb_fn(ctx->cb_arg, status); 6649 free(ctx); 6650 } 6651 6652 static void 6653 bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6654 { 6655 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6656 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6657 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6658 TAILQ_HEAD(, spdk_bdev_io) io_locked; 6659 struct spdk_bdev_io *bdev_io; 6660 struct lba_range *range; 6661 6662 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6663 if (ctx->range.offset == range->offset && 6664 ctx->range.length == range->length && 6665 ctx->range.locked_ctx == range->locked_ctx) { 6666 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 6667 free(range); 6668 break; 6669 } 6670 } 6671 6672 /* Note: we should almost always be able to assert that the range specified 6673 * was found. But there are some very rare corner cases where a new channel 6674 * gets created simultaneously with a range unlock, where this function 6675 * would execute on that new channel and wouldn't have the range. 6676 * We also use this to clean up range allocations when a later allocation 6677 * fails in the locking path. 6678 * So we can't actually assert() here. 6679 */ 6680 6681 /* Swap the locked IO into a temporary list, and then try to submit them again. 6682 * We could hyper-optimize this to only resubmit locked I/O that overlap 6683 * with the range that was just unlocked, but this isn't a performance path so 6684 * we go for simplicity here. 6685 */ 6686 TAILQ_INIT(&io_locked); 6687 TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link); 6688 while (!TAILQ_EMPTY(&io_locked)) { 6689 bdev_io = TAILQ_FIRST(&io_locked); 6690 TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link); 6691 bdev_io_submit(bdev_io); 6692 } 6693 6694 spdk_for_each_channel_continue(i, 0); 6695 } 6696 6697 static int 6698 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6699 uint64_t offset, uint64_t length, 6700 lock_range_cb cb_fn, void *cb_arg) 6701 { 6702 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6703 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6704 struct locked_lba_range_ctx *ctx; 6705 struct lba_range *range; 6706 bool range_found = false; 6707 6708 /* Let's make sure the specified channel actually has a lock on 6709 * the specified range. Note that the range must match exactly. 
6710 */ 6711 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6712 if (range->offset == offset && range->length == length && 6713 range->owner_ch == ch && range->locked_ctx == cb_arg) { 6714 range_found = true; 6715 break; 6716 } 6717 } 6718 6719 if (!range_found) { 6720 return -EINVAL; 6721 } 6722 6723 pthread_mutex_lock(&bdev->internal.mutex); 6724 /* We confirmed that this channel has locked the specified range. To 6725 * start the unlock process, we find the range in the bdev's locked_ranges 6726 * and remove it. This ensures new channels don't inherit the locked range. 6727 * Then we will send a message to each channel (including the one specified 6728 * here) to remove the range from its per-channel list. 6729 */ 6730 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 6731 if (range->offset == offset && range->length == length && 6732 range->locked_ctx == cb_arg) { 6733 break; 6734 } 6735 } 6736 if (range == NULL) { 6737 assert(false); 6738 pthread_mutex_unlock(&bdev->internal.mutex); 6739 return -EINVAL; 6740 } 6741 TAILQ_REMOVE(&bdev->internal.locked_ranges, range, tailq); 6742 ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6743 pthread_mutex_unlock(&bdev->internal.mutex); 6744 6745 ctx->cb_fn = cb_fn; 6746 ctx->cb_arg = cb_arg; 6747 6748 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx, 6749 bdev_unlock_lba_range_cb); 6750 return 0; 6751 } 6752 6753 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 6754 6755 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 6756 { 6757 spdk_trace_register_owner(OWNER_BDEV, 'b'); 6758 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 6759 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 6760 OBJECT_BDEV_IO, 1, 0, "type: "); 6761 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 6762 OBJECT_BDEV_IO, 0, 0, ""); 6763 } 6764
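
/*
 * The examples below are illustrative sketches only; they are not part of the
 * bdev library, and the example_* names, the "Malloc0" bdev name, and the
 * g_example_* globals are hypothetical. This first sketch shows how a consumer
 * might use spdk_bdev_open_ext()/spdk_bdev_close() above: the event callback
 * runs on the thread that opened the descriptor, which is also the only thread
 * allowed to close it.
 */
static struct spdk_bdev_desc *g_example_desc;

static void
example_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	if (type == SPDK_BDEV_EVENT_REMOVE && g_example_desc != NULL) {
		/* Close the descriptor so a pending unregister can complete. */
		spdk_bdev_close(g_example_desc);
		g_example_desc = NULL;
	}
}

static int
example_open_bdev(void)
{
	int rc;

	rc = spdk_bdev_open_ext("Malloc0", true, example_bdev_event_cb, NULL, &g_example_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Could not open Malloc0: %s\n", spdk_strerror(-rc));
	}

	return rc;
}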
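
/*
 * Illustrative sketch (hypothetical helper, not part of the library): hot
 * removing a bdev. bdev_unregister_unsafe() above notifies every open
 * descriptor first, so the completion callback only fires once the last
 * descriptor has been closed (or immediately if none were open).
 */
static void
example_unregister_done(void *cb_arg, int rc)
{
	if (rc != 0) {
		SPDK_ERRLOG("bdev unregister failed: %s\n", spdk_strerror(-rc));
	}
}

static void
example_unregister_bdev(struct spdk_bdev *bdev)
{
	/* Must run on an SPDK thread; otherwise the callback gets -ENOTSUP. */
	spdk_bdev_unregister(bdev, example_unregister_done, NULL);
}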
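
/*
 * Illustrative sketch: a completion callback of the shape taken by
 * spdk_bdev_read()/spdk_bdev_readv() that inspects the transferred iovec via
 * spdk_bdev_io_get_iovec() above. example_read_done is hypothetical.
 */
static void
example_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct iovec *iovs;
	int i, iovcnt;
	uint64_t total = 0;

	if (success) {
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		for (i = 0; i < iovcnt; i++) {
			total += iovs[i].iov_len;
		}
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "read completed with %d iovecs, %" PRIu64 " bytes\n",
			      iovcnt, total);
	}

	/* The caller owns the spdk_bdev_io once the callback runs and must free it. */
	spdk_bdev_free_io(bdev_io);
}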
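
/*
 * Illustrative sketch: capping a bdev at 10k read/write IOPS and 100 MB/s of
 * total bandwidth with spdk_bdev_set_qos_rate_limits() above. A value of
 * SPDK_BDEV_QOS_LIMIT_NOT_DEFINED leaves that limit untouched, 0 removes it,
 * and bandwidth limits are given in MB/s (they are converted to bytes above).
 * The example_* helpers are hypothetical.
 */
static void
example_qos_done(void *cb_arg, int status)
{
	if (status != 0) {
		SPDK_ERRLOG("Setting QoS limits failed: %s\n", spdk_strerror(-status));
	}
}

static void
example_set_qos(struct spdk_bdev *bdev)
{
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {
		[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT]	= 10000,
		[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT]	= 100,
		[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT]	= SPDK_BDEV_QOS_LIMIT_NOT_DEFINED,
		[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT]	= SPDK_BDEV_QOS_LIMIT_NOT_DEFINED,
	};

	spdk_bdev_set_qos_rate_limits(bdev, limits, example_qos_done, NULL);
}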
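
/*
 * Illustrative sketch: enabling the per-channel latency histogram and, once
 * every channel has allocated one, collecting the merged data. The histogram
 * passed to spdk_bdev_histogram_get() must stay valid until the data callback
 * runs; here it is freed there. The example_* helpers are hypothetical.
 */
static void
example_histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
{
	if (status == 0) {
		/* histogram now holds the data merged from every channel. */
	}
	spdk_histogram_data_free(histogram);
}

static void
example_histogram_enabled_cb(void *cb_arg, int status)
{
	struct spdk_bdev *bdev = cb_arg;
	struct spdk_histogram_data *histogram;

	if (status != 0) {
		return;
	}

	histogram = spdk_histogram_data_alloc();
	if (histogram == NULL) {
		return;
	}

	spdk_bdev_histogram_get(bdev, histogram, example_histogram_data_cb, NULL);
}

static void
example_enable_histogram(struct spdk_bdev *bdev)
{
	spdk_bdev_histogram_enable(bdev, example_histogram_enabled_cb, bdev, true);
}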
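
/*
 * Illustrative sketch: draining queued media management events from the event
 * callback registered with spdk_bdev_open_ext(). spdk_bdev_notify_media_management()
 * above only signals descriptors with pending events, so the callback can simply
 * poll until the queue is empty. The descriptor is assumed to be passed as
 * event_ctx by the (hypothetical) opener.
 */
static void
example_media_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct spdk_bdev_desc *desc = event_ctx;
	struct spdk_bdev_media_event events[32];
	size_t i, num_events;

	if (type != SPDK_BDEV_EVENT_MEDIA_MANAGEMENT) {
		return;
	}

	do {
		num_events = spdk_bdev_get_media_events(desc, events, SPDK_COUNTOF(events));
		for (i = 0; i < num_events; i++) {
			SPDK_NOTICELOG("media event at offset %" PRIu64 ", %" PRIu64 " blocks\n",
				       events[i].offset, events[i].num_blocks);
		}
	} while (num_events == SPDK_COUNTOF(events));
}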
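
/*
 * Illustrative sketch of the internal LBA range lock helpers above (they are
 * static, so this only works inside this translation unit). The caller must be
 * on the thread that owns the channel, cb_arg must be non-NULL, and the unlock
 * must use the exact same offset/length/cb_arg tuple. All names and the
 * offset/length values are hypothetical.
 */
struct example_lock_ctx {
	struct spdk_bdev_desc *desc;
	struct spdk_io_channel *ch;
};

static void
example_range_unlocked(void *cb_arg, int status)
{
	free(cb_arg);
}

static void
example_range_locked(void *cb_arg, int status)
{
	struct example_lock_ctx *ctx = cb_arg;

	if (status == 0) {
		/* Blocks [128, 128 + 64) are now exclusively writable through ctx->ch. */
		bdev_unlock_lba_range(ctx->desc, ctx->ch, 128, 64, example_range_unlocked, ctx);
	} else {
		free(ctx);
	}
}

static int
example_lock_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch)
{
	struct example_lock_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}
	ctx->desc = desc;
	ctx->ch = ch;

	return bdev_lock_lba_range(desc, ch, 128, 64, example_range_locked, ctx);
}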