/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/notify.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#include "bdev_internal.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024 - 1)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define SPDK_BDEV_AUTO_EXAMINE			true
#define BUF_SMALL_POOL_SIZE			8191
#define BUF_LARGE_POOL_SIZE			1023
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		1000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX
#define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC	1000

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_conf_type[] = {"Limit_IOPS",
				      "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS"
				     };
static const char *qos_rpc_type[] = {"rw_ios_per_sec",
				     "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
				    };

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
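/*
 * Note (summary added for clarity, not an upstream comment): qos_conf_type and
 * qos_rpc_type above appear to be parallel string tables indexed by
 * enum spdk_bdev_qos_rate_limit_type, i.e. entry 0 names the combined IOPS
 * limit and entries 1-3 the combined/read/write bandwidth limits.  The RPC
 * names are the parameter keys written out for the bdev_set_qos_limit RPC
 * further below.
 */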
struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

	pthread_mutex_t mutex;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain	*domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
	.mutex = PTHREAD_MUTEX_INITIALIZER,
};

typedef void (*lock_range_cb)(void *ctx, int status);

struct lba_range {
	uint64_t			offset;
	uint64_t			length;
	void				*locked_ctx;
	struct spdk_bdev_channel	*owner_ch;
	TAILQ_ENTRY(lba_range)		tailq;
};

static struct spdk_bdev_opts	g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
	.bdev_auto_examine = SPDK_BDEV_AUTO_EXAMINE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;

	/** Function to check whether to queue the IO. */
	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);

	/** Function to update for the submitted IO. */
	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
};

struct spdk_bdev_qos {
	/** Rate limits, one entry per type in spdk_bdev_qos_rate_limit_type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};
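/*
 * Illustrative arithmetic (added note, not from the original source): assuming
 * the per-timeslice quota is the per-second limit scaled by
 * SPDK_BDEV_QOS_TIMESLICE_IN_USEC, a 10,000 IOPS limit with the default 1 ms
 * timeslice allows roughly 10 I/O per timeslice, and a 100 MB/s limit allows
 * on the order of 100 KB per timeslice.  SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE
 * and SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE keep very small limits from
 * rounding the per-timeslice quota down to zero.
 */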
struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data.  Multiple bdevs built on the same io_device
 * will queue their I/O awaiting retry here.  This makes it possible to retry
 * sending I/O to one bdev after I/O from another bdev on the same io_device
 * completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t		nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel	*shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t		ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)
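/*
 * Note (summary added for clarity): io_outstanding is tracked both on the
 * per-thread bdev channel below and on the shared resource above.  The NOMEM
 * retry queue lives on the shared resource, so when a module reports
 * SPDK_BDEV_IO_STATUS_NOMEM the parked I/O appears to be resubmitted only
 * after the shared io_outstanding drains down to nomem_threshold, giving every
 * bdev built on the same io_device a common back-pressure point.
 */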
struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to the underlying dev module through this channel
	 * and waiting for completion.
	 */
	uint64_t		io_outstanding;

	/*
	 * List of all submitted I/Os including I/O that are generated via splitting.
	 */
	bdev_io_tailq_t		io_submitted;

	/*
	 * List of spdk_bdev_io that are currently queued because they write to a locked
	 * LBA range.
	 */
	bdev_io_tailq_t		io_locked;

	uint32_t		flags;

	struct spdk_histogram_data *histogram;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

	bdev_io_tailq_t		queued_resets;

	lba_range_tailq_t	locked_ranges;
};

struct media_event_entry {
	struct spdk_bdev_media_event	event;
	TAILQ_ENTRY(media_event_entry)	tailq;
};

#define MEDIA_EVENT_POOL_SIZE 64

struct spdk_bdev_desc {
	struct spdk_bdev		*bdev;
	struct spdk_thread		*thread;
	struct {
		bool open_with_ext;
		union {
			spdk_bdev_remove_cb_t remove_fn;
			spdk_bdev_event_cb_t event_fn;
		};
		void *ctx;
	}				callback;
	bool				closed;
	bool				write;
	pthread_mutex_t			mutex;
	uint32_t			refs;
	TAILQ_HEAD(, media_event_entry)	pending_media_events;
	TAILQ_HEAD(, media_event_entry)	free_media_events;
	struct media_event_entry	*media_events_buffer;
	TAILQ_ENTRY(spdk_bdev_desc)	link;

	uint64_t		timeout_in_sec;
	spdk_bdev_io_timeout_cb	cb_fn;
	void			*cb_arg;
	struct spdk_poller	*io_timeout_poller;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
static void bdev_write_zero_buffer_next(void *_bdev_io);

static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
static void bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);

static int
bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			  struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
			  uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg);
static int
bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   struct iovec *iov, int iovcnt, void *md_buf,
			   uint64_t offset_blocks, uint64_t num_blocks,
			   spdk_bdev_io_completion_cb cb, void *cb_arg);

static int
bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		    uint64_t offset, uint64_t length,
		    lock_range_cb cb_fn, void *cb_arg);

static int
bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		      uint64_t offset, uint64_t length,
		      lock_range_cb cb_fn, void *cb_arg);

static inline void bdev_io_complete(void *ctx);

static bool bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort);
static bool bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}
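/*
 * Usage sketch (illustrative, values are hypothetical): callers are expected
 * to read the current options, modify them, and write them back before the
 * bdev subsystem is initialized, e.g.:
 *
 *	struct spdk_bdev_opts opts;
 *
 *	spdk_bdev_get_opts(&opts);
 *	opts.bdev_io_pool_size = 128 * 1024 - 1;
 *	opts.bdev_io_cache_size = 512;
 *	if (spdk_bdev_set_opts(&opts) != 0) {
 *		...pool size is too small for the cache size and thread count...
 *	}
 */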
int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization.  A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

struct spdk_bdev_examine_item {
	char *name;
	TAILQ_ENTRY(spdk_bdev_examine_item) link;
};

TAILQ_HEAD(spdk_bdev_examine_allowlist, spdk_bdev_examine_item);

struct spdk_bdev_examine_allowlist g_bdev_examine_allowlist = TAILQ_HEAD_INITIALIZER(
			g_bdev_examine_allowlist);

static inline bool
bdev_examine_allowlist_check(const char *name)
{
	struct spdk_bdev_examine_item *item;
	TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) {
		if (strcmp(name, item->name) == 0) {
			return true;
		}
	}
	return false;
}

static inline void
bdev_examine_allowlist_free(void)
{
	struct spdk_bdev_examine_item *item;
	while (!TAILQ_EMPTY(&g_bdev_examine_allowlist)) {
		item = TAILQ_FIRST(&g_bdev_examine_allowlist);
		TAILQ_REMOVE(&g_bdev_examine_allowlist, item, link);
		free(item->name);
		free(item);
	}
}

static inline bool
bdev_in_examine_allowlist(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *tmp;
	if (bdev_examine_allowlist_check(bdev->name)) {
		return true;
	}
	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (bdev_examine_allowlist_check(tmp->alias)) {
			return true;
		}
	}
	return false;
}

static inline bool
bdev_ok_to_examine(struct spdk_bdev *bdev)
{
	if (g_bdev_opts.bdev_auto_examine) {
		return true;
	} else {
		return bdev_in_examine_allowlist(bdev);
	}
}

static void
bdev_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module *module;
	uint32_t action;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine_config && bdev_ok_to_examine(bdev)) {
			action = module->internal.action_in_progress;
			module->internal.action_in_progress++;
			module->examine_config(bdev);
			if (action != module->internal.action_in_progress) {
				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
					    module->name);
			}
		}
	}

	if (bdev->internal.claim_module && bdev_ok_to_examine(bdev)) {
		if (bdev->internal.claim_module->examine_disk) {
			bdev->internal.claim_module->internal.action_in_progress++;
			bdev->internal.claim_module->examine_disk(bdev);
		}
		return;
	}

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine_disk && bdev_ok_to_examine(bdev)) {
			module->internal.action_in_progress++;
			module->examine_disk(bdev);
		}
	}
}
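/*
 * Note (summary added for clarity): spdk_bdev_examine() is the manual
 * counterpart to automatic examine.  It is only permitted when
 * bdev_auto_examine is disabled; the name is remembered on the allowlist so
 * that a bdev registered later under that name (or one of its aliases) still
 * gets examined, and bdev_examine() runs immediately if the bdev already
 * exists.
 */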
int
spdk_bdev_examine(const char *name)
{
	struct spdk_bdev *bdev;
	struct spdk_bdev_examine_item *item;

	if (g_bdev_opts.bdev_auto_examine) {
		SPDK_ERRLOG("Manual examine is not allowed if auto examine is enabled");
		return -EINVAL;
	}

	if (bdev_examine_allowlist_check(name)) {
		SPDK_ERRLOG("Duplicate bdev name for manual examine: %s\n", name);
		return -EEXIST;
	}

	item = calloc(1, sizeof(*item));
	if (!item) {
		return -ENOMEM;
	}
	item->name = strdup(name);
	if (!item->name) {
		free(item);
		return -ENOMEM;
	}
	TAILQ_INSERT_TAIL(&g_bdev_examine_allowlist, item, link);

	bdev = spdk_bdev_get_by_name(name);
	if (bdev) {
		bdev_examine(bdev);
	}
	return 0;
}

static inline void
bdev_examine_allowlist_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_examine_item *item;
	TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_examine");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "name", item->name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return iovs[0].iov_base != NULL;
}
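/*
 * Note (assumption): the bitmask test below relies on the buffer alignment
 * reported by spdk_bdev_get_buf_align() being a power of two; an alignment of
 * 1 means the bdev has no alignment requirement and the per-iovec check is
 * skipped entirely.
 */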
static bool
_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
{
	int i;
	uintptr_t iov_base;

	if (spdk_likely(alignment == 1)) {
		return true;
	}

	for (i = 0; i < iovcnt; i++) {
		iov_base = (uintptr_t)iovs[i].iov_base;
		if ((iov_base & (alignment - 1)) != 0) {
			return false;
		}
	}

	return true;
}

static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is write path, copy data from original buffer to bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
_bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	/* save original md_buf */
	bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf;
	/* set bounce md_buf */
	bdev_io->u.bdev.md_buf = md_buf;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		memcpy(md_buf, bdev_io->internal.orig_md_buf, len);
	}
}

static void
bdev_io_get_buf_complete(struct spdk_bdev_io *bdev_io, void *buf, bool status)
{
	struct spdk_io_channel *ch = spdk_bdev_io_get_io_channel(bdev_io);

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io->internal.get_aux_buf_cb(ch, bdev_io, buf);
		bdev_io->internal.get_aux_buf_cb = NULL;
	} else {
		assert(bdev_io->internal.get_buf_cb != NULL);
		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(ch, bdev_io, status);
		bdev_io->internal.get_buf_cb = NULL;
	}
}
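/*
 * Note (summary added for clarity): when the caller's buffer is missing or not
 * sufficiently aligned, the pooled buffer obtained below is aligned up and
 * either handed to the I/O directly or used as a bounce buffer.  On the write
 * path the payload is copied into the bounce buffer before submission
 * (_bdev_io_set_bounce_buf); on the read path it is copied back to the
 * original iovecs at completion time (_bdev_io_unset_bounce_buf).
 */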
static void
_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	bool buf_allocated;
	uint64_t md_len, alignment;
	void *aligned_buf;

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io_get_buf_complete(bdev_io, buf, true);
		return;
	}

	alignment = spdk_bdev_get_buf_align(bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
	aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

	if (buf_allocated) {
		_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
	} else {
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
	}

	if (spdk_bdev_is_md_separate(bdev)) {
		aligned_buf = (char *)aligned_buf + len;
		md_len = bdev_io->u.bdev.num_blocks * bdev->md_len;

		assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);

		if (bdev_io->u.bdev.md_buf != NULL) {
			_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
		} else {
			spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
		}
	}
	bdev_io_get_buf_complete(bdev_io, buf, true);
}

static void
_bdev_io_put_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t buf_len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t md_len, alignment;

	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
	alignment = spdk_bdev_get_buf_align(bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		_bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
	}
}

static void
bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	assert(bdev_io->internal.buf != NULL);
	_bdev_io_put_buf(bdev_io, bdev_io->internal.buf, bdev_io->internal.buf_len);
	bdev_io->internal.buf = NULL;
}

void
spdk_bdev_io_put_aux_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(buf != NULL);
	_bdev_io_put_buf(bdev_io, buf, len);
}
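/*
 * Note (summary added for clarity): data buffers come from two global pools,
 * selected by comparing len + alignment + metadata size against the small
 * buffer threshold.  When a pool is exhausted, the bdev_io waits on the
 * per-thread need_buf_small/need_buf_large list, and _bdev_io_put_buf() hands
 * a returned buffer straight to the first waiter instead of releasing it back
 * to the mempool.
 */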
static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
		assert(bdev_io->internal.orig_md_buf == NULL);
		return;
	}

	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs,
				  bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base,
				  bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bouncing buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;

	/* do the same for metadata buffer */
	if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
		assert(spdk_bdev_is_md_separate(bdev_io->bdev));

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
		    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
			       bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
		}

		bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
		bdev_io->internal.orig_md_buf = NULL;
	}

	/* We want to free the bounce buffer here since we know we're done with it (as opposed
	 * to waiting for the conditional free of internal.buf in spdk_bdev_free_io()).
	 */
	bdev_io_put_buf(bdev_io);
}

static void
bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment, md_len;
	void *buf;

	alignment = spdk_bdev_get_buf_align(bdev);
	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;

	if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
			    len + alignment);
		bdev_io_get_buf_complete(bdev_io, NULL, false);
		return;
	}

	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;

	if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);
	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		_bdev_io_set_buf(bdev_io, buf, len);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	uint64_t alignment;

	assert(cb != NULL);
	bdev_io->internal.get_buf_cb = cb;

	alignment = spdk_bdev_get_buf_align(bdev);

	if (_is_buf_allocated(bdev_io->u.bdev.iovs) &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
		return;
	}

	bdev_io_get_buf(bdev_io, len);
}

void
spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(cb != NULL);
	assert(bdev_io->internal.get_aux_buf_cb == NULL);
	bdev_io->internal.get_aux_buf_cb = cb;
	bdev_io_get_buf(bdev_io, len);
}

static int
bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}
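/*
 * Example of the JSON object emitted below for a bdev with QoS limits
 * configured (the bdev name and values are hypothetical):
 *
 *	{
 *	  "method": "bdev_set_qos_limit",
 *	  "params": {
 *	    "name": "Nvme0n1",
 *	    "rw_ios_per_sec": 20000,
 *	    "rw_mbytes_per_sec": 100
 *	  }
 *	}
 */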
static void
bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_qos_limit");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_options");
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_named_bool(w, "bdev_auto_examine", g_bdev_opts.bdev_auto_examine);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	bdev_examine_allowlist_config_json(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	pthread_mutex_lock(&g_bdev_mgr.mutex);

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		bdev_qos_config_json(bdev, w);
	}

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	spdk_json_write_array_end(w);
}
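/*
 * Note (summary added for clarity): every thread's management channel
 * pre-fills its bdev_io cache from the global pool below, which is why
 * spdk_bdev_set_opts() requires bdev_io_pool_size to be at least
 * bdev_io_cache_size * (thread count + 1).
 */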
static int
bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}
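/*
 * Note (summary added for clarity): each module's internal.action_in_progress
 * counts its outstanding asynchronous init/examine operations.  Subsystem
 * initialization is declared complete only once module_init_complete is set
 * and every counter has dropped back to zero, which happens via
 * spdk_bdev_module_init_done()/spdk_bdev_module_examine_done().
 */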
static void
bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	bdev_init_complete(0);
}

static void
bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	bdev_init_complete(-1);
}

static int
bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to keep the other modules from completing
			 * bdev subsystem initialization, and send a message to defer
			 * application shutdown until resources are cleaned up.
			 */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}
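/*
 * Example of the legacy INI configuration section parsed below
 * (values are hypothetical):
 *
 *	[Bdev]
 *	  BdevIoPoolSize 65535
 *	  BdevIoCacheSize 256
 */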
void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_env_get_core_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create,
				bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		return;
	}

	bdev_module_action_complete();
}

static void
bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (g_bdev_mgr.bdev_io_pool) {
		if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
			SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
				    g_bdev_opts.bdev_io_pool_size);
		}

		spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	}

	if (g_bdev_mgr.buf_small_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
			SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
				    BUF_SMALL_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	}

	if (g_bdev_mgr.buf_large_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
			SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
				    BUF_LARGE_POOL_SIZE);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	}

	spdk_free(g_bdev_mgr.zero_buffer);

	bdev_examine_allowlist_free();

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
	pthread_mutex_destroy(&g_bdev_mgr.mutex);
}
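/*
 * Note (summary added for clarity): module finish walks the module list in
 * reverse registration order.  Modules with async_fini set must call
 * spdk_bdev_module_finish_done(), which resumes the iteration below from
 * g_resume_bdev_module on the fini thread.
 */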
static void
bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* FIXME: Handling initialization failures is broken now,
	 * so we won't even try cleaning up after successfully
	 * initialized modules.  If module_init_complete is false,
	 * just call bdev_mgr_unregister_cb().
	 */
	if (!g_bdev_mgr.module_init_complete) {
		bdev_mgr_unregister_cb(NULL);
		return;
	}

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL);
	} else {
		bdev_module_finish_iter(NULL);
	}
}
static void
bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in the reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim underlying bdev properly, we may face the
	 * case of bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister last bdev on the list then.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	bdev_finish_unregister_bdevs_iter(NULL, 0);
}

struct spdk_bdev_io *
bdev_channel_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}
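/*
 * Usage sketch (illustrative): when bdev_channel_get_io() returns NULL, or a
 * submit call fails with -ENOMEM, callers typically register a wait entry and
 * retry from its callback; the callback fires from spdk_bdev_free_io() below
 * once a bdev_io is returned to this thread's cache.
 *
 *	struct spdk_bdev_io_wait_entry entry;
 *
 *	entry.bdev = bdev;
 *	entry.cb_fn = retry_submit_fn;	(hypothetical caller-provided callback)
 *	entry.cb_arg = ctx;
 *	spdk_bdev_queue_io_wait(bdev, io_ch, &entry);
 */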
void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf != NULL) {
		bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static bool
bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		if (bdev_io->u.bdev.zcopy.start) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static bool
bdev_is_read_io(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* Bit 1 (0x2) set for read operation */
		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
			return true;
		} else {
			return false;
		}
	case SPDK_BDEV_IO_TYPE_READ:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Populate to read from disk */
		if (bdev_io->u.bdev.zcopy.populate) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static uint64_t
bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev	*bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Track the data in the start phase only */
		if (bdev_io->u.bdev.zcopy.start) {
			return bdev_io->u.bdev.num_blocks * bdev->blocklen;
		} else {
			return 0;
		}
	default:
		return 0;
	}
}

static bool
bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
		return true;
	} else {
		return false;
	}
}

static bool
bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static bool
bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static void
bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice--;
}

static void
bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io);
}
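/*
 * Illustrative arithmetic (added note, not from the original source): because
 * remaining_this_timeslice is signed, a single large I/O may drive a
 * byte-based limit negative.  For example, with roughly 100 KB allowed per
 * timeslice, a 1 MB write is still submitted once, leaves the quota at about
 * -900 KB, and subsequent I/O are queued until enough timeslices have
 * replenished the deficit.
 */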
static void
bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_set_ops(struct spdk_bdev_qos *qos)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].queue_io = NULL;
			qos->rate_limits[i].update_quota = NULL;
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_r_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_w_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota;
			break;
		default:
			break;
		}
	}
}

static void
_bdev_io_complete_in_submit(struct spdk_bdev_channel *bdev_ch,
			    struct spdk_bdev_io *bdev_io,
			    enum spdk_bdev_io_status status)
{
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	bdev_io->internal.in_submit_request = true;
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	spdk_bdev_io_complete(bdev_io, status);
	bdev_io->internal.in_submit_request = false;
}

static inline void
bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT)) {
		struct spdk_bdev_mgmt_channel *mgmt_channel = shared_resource->mgmt_ch;
		struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort;

		if (bdev_abort_queued_io(&shared_resource->nomem_io, bio_to_abort) ||
		    bdev_abort_buf_io(&mgmt_channel->need_buf_small, bio_to_abort) ||
		    bdev_abort_buf_io(&mgmt_channel->need_buf_large, bio_to_abort)) {
			_bdev_io_complete_in_submit(bdev_ch, bdev_io,
						    SPDK_BDEV_IO_STATUS_SUCCESS);
			return;
		}
	}

	if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
		bdev_ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev_io->internal.in_submit_request = true;
		bdev->fn_table->submit_request(ch, bdev_io);
		bdev_io->internal.in_submit_request = false;
	} else {
		TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
	}
}
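/*
 * Note (summary added for clarity): the QoS poller drains qos->queued in
 * order.  For each I/O that counts against the limits, every configured
 * queue_io callback is consulted first; if any reports that this timeslice's
 * quota is exhausted the function returns early, otherwise the quotas are
 * charged and the I/O is submitted.  Stopping at the first I/O that must wait
 * keeps submission in order.
 */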
static int
bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io	*bdev_io = NULL, *tmp = NULL;
	int			i, submitted_ios = 0;

	TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
		if (bdev_qos_io_to_limit(bdev_io) == true) {
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].queue_io) {
					continue;
				}

				if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
								 bdev_io) == true) {
					return submitted_ios;
				}
			}
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].update_quota) {
					continue;
				}

				qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
			}
		}

		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		bdev_io_do_submit(ch, bdev_io);
		submitted_ios++;
	}

	return submitted_ios;
}

static void
bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

static bool
bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O.  Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}
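/*
 * Worked example (illustrative): with optimal_io_boundary = 8 blocks, a read
 * of 4 blocks at offset 6 spans stripes 0 and 1 (blocks 6-7 and 8-9), so
 * bdev_io_should_split() returns true and _bdev_io_split() issues one child
 * I/O of 2 blocks at offset 6 followed by one of 2 blocks at offset 8.
 */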
*/ 1919 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1920 start_stripe >>= spdk_u32log2(io_boundary); 1921 end_stripe >>= spdk_u32log2(io_boundary); 1922 } else { 1923 start_stripe /= io_boundary; 1924 end_stripe /= io_boundary; 1925 } 1926 return (start_stripe != end_stripe); 1927 } 1928 1929 static uint32_t 1930 _to_next_boundary(uint64_t offset, uint32_t boundary) 1931 { 1932 return (boundary - (offset % boundary)); 1933 } 1934 1935 static void 1936 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1937 1938 static void 1939 _bdev_io_split(void *_bdev_io) 1940 { 1941 struct spdk_bdev_io *bdev_io = _bdev_io; 1942 uint64_t current_offset, remaining; 1943 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes; 1944 struct iovec *parent_iov, *iov; 1945 uint64_t parent_iov_offset, iov_len; 1946 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1947 void *md_buf = NULL; 1948 int rc; 1949 1950 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1951 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1952 blocklen = bdev_io->bdev->blocklen; 1953 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1954 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1955 1956 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1957 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1958 if (parent_iov_offset < parent_iov->iov_len) { 1959 break; 1960 } 1961 parent_iov_offset -= parent_iov->iov_len; 1962 } 1963 1964 child_iovcnt = 0; 1965 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1966 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1967 to_next_boundary = spdk_min(remaining, to_next_boundary); 1968 to_next_boundary_bytes = to_next_boundary * blocklen; 1969 iov = &bdev_io->child_iov[child_iovcnt]; 1970 iovcnt = 0; 1971 1972 if (bdev_io->u.bdev.md_buf) { 1973 assert((parent_iov_offset % blocklen) > 0); 1974 md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) * 1975 spdk_bdev_get_md_size(bdev_io->bdev); 1976 } 1977 1978 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1979 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1980 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1981 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1982 to_next_boundary_bytes -= iov_len; 1983 1984 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1985 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1986 1987 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1988 parent_iov_offset += iov_len; 1989 } else { 1990 parent_iovpos++; 1991 parent_iov_offset = 0; 1992 } 1993 child_iovcnt++; 1994 iovcnt++; 1995 } 1996 1997 if (to_next_boundary_bytes > 0) { 1998 /* We had to stop this child I/O early because we ran out of 1999 * child_iov space. Ensure the iovs to be aligned with block 2000 * size and then adjust to_next_boundary before starting the 2001 * child I/O. 
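	 * The last child iovs are trimmed back to a block boundary below; the trimmed bytes stay in
	 * the parent I/O and are picked up by the next round of splitting.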
2002 */ 2003 assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV); 2004 to_last_block_bytes = to_next_boundary_bytes % blocklen; 2005 if (to_last_block_bytes != 0) { 2006 uint32_t child_iovpos = child_iovcnt - 1; 2007 /* don't decrease child_iovcnt so the loop will naturally end */ 2008 2009 to_last_block_bytes = blocklen - to_last_block_bytes; 2010 to_next_boundary_bytes += to_last_block_bytes; 2011 while (to_last_block_bytes > 0 && iovcnt > 0) { 2012 iov_len = spdk_min(to_last_block_bytes, 2013 bdev_io->child_iov[child_iovpos].iov_len); 2014 bdev_io->child_iov[child_iovpos].iov_len -= iov_len; 2015 if (bdev_io->child_iov[child_iovpos].iov_len == 0) { 2016 child_iovpos--; 2017 if (--iovcnt == 0) { 2018 return; 2019 } 2020 } 2021 to_last_block_bytes -= iov_len; 2022 } 2023 2024 assert(to_last_block_bytes == 0); 2025 } 2026 to_next_boundary -= to_next_boundary_bytes / blocklen; 2027 } 2028 2029 bdev_io->u.bdev.split_outstanding++; 2030 2031 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 2032 rc = bdev_readv_blocks_with_md(bdev_io->internal.desc, 2033 spdk_io_channel_from_ctx(bdev_io->internal.ch), 2034 iov, iovcnt, md_buf, current_offset, 2035 to_next_boundary, 2036 bdev_io_split_done, bdev_io); 2037 } else { 2038 rc = bdev_writev_blocks_with_md(bdev_io->internal.desc, 2039 spdk_io_channel_from_ctx(bdev_io->internal.ch), 2040 iov, iovcnt, md_buf, current_offset, 2041 to_next_boundary, 2042 bdev_io_split_done, bdev_io); 2043 } 2044 2045 if (rc == 0) { 2046 current_offset += to_next_boundary; 2047 remaining -= to_next_boundary; 2048 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 2049 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 2050 } else { 2051 bdev_io->u.bdev.split_outstanding--; 2052 if (rc == -ENOMEM) { 2053 if (bdev_io->u.bdev.split_outstanding == 0) { 2054 /* No I/O is outstanding. Hence we should wait here. */ 2055 bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split); 2056 } 2057 } else { 2058 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2059 if (bdev_io->u.bdev.split_outstanding == 0) { 2060 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 2061 (uintptr_t)bdev_io, 0); 2062 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 2063 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 2064 } 2065 } 2066 2067 return; 2068 } 2069 } 2070 } 2071 2072 static void 2073 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 2074 { 2075 struct spdk_bdev_io *parent_io = cb_arg; 2076 2077 spdk_bdev_free_io(bdev_io); 2078 2079 if (!success) { 2080 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2081 /* If any child I/O failed, stop further splitting process. */ 2082 parent_io->u.bdev.split_current_offset_blocks += parent_io->u.bdev.split_remaining_num_blocks; 2083 parent_io->u.bdev.split_remaining_num_blocks = 0; 2084 } 2085 parent_io->u.bdev.split_outstanding--; 2086 if (parent_io->u.bdev.split_outstanding != 0) { 2087 return; 2088 } 2089 2090 /* 2091 * Parent I/O finishes when all blocks are consumed. 
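	 * (On a child failure split_remaining_num_blocks is forced to zero above, so a failed parent
	 * also takes this path once its outstanding children have drained.)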
2092 */ 2093 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 2094 assert(parent_io->internal.cb != bdev_io_split_done); 2095 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 2096 (uintptr_t)parent_io, 0); 2097 TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link); 2098 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2099 parent_io->internal.caller_ctx); 2100 return; 2101 } 2102 2103 /* 2104 * Continue with the splitting process. This function will complete the parent I/O if the 2105 * splitting is done. 2106 */ 2107 _bdev_io_split(parent_io); 2108 } 2109 2110 static void 2111 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success); 2112 2113 static void 2114 bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 2115 { 2116 assert(bdev_io_type_can_split(bdev_io->type)); 2117 2118 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 2119 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 2120 bdev_io->u.bdev.split_outstanding = 0; 2121 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2122 2123 if (_is_buf_allocated(bdev_io->u.bdev.iovs)) { 2124 _bdev_io_split(bdev_io); 2125 } else { 2126 assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ); 2127 spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb, 2128 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 2129 } 2130 } 2131 2132 static void 2133 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 2134 { 2135 if (!success) { 2136 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2137 return; 2138 } 2139 2140 bdev_io_split(ch, bdev_io); 2141 } 2142 2143 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't 2144 * be inlined, at least on some compilers. 
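	 * It is also called directly on the fast path from bdev_io_submit(), which is where the
	 * inline hint actually helps.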
2145 */ 2146 static inline void 2147 _bdev_io_submit(void *ctx) 2148 { 2149 struct spdk_bdev_io *bdev_io = ctx; 2150 struct spdk_bdev *bdev = bdev_io->bdev; 2151 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2152 uint64_t tsc; 2153 2154 tsc = spdk_get_ticks(); 2155 bdev_io->internal.submit_tsc = tsc; 2156 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 2157 2158 if (spdk_likely(bdev_ch->flags == 0)) { 2159 bdev_io_do_submit(bdev_ch, bdev_io); 2160 return; 2161 } 2162 2163 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 2164 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2165 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 2166 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) && 2167 bdev_abort_queued_io(&bdev->internal.qos->queued, bdev_io->u.abort.bio_to_abort)) { 2168 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 2169 } else { 2170 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 2171 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 2172 } 2173 } else { 2174 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 2175 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2176 } 2177 } 2178 2179 bool 2180 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2); 2181 2182 bool 2183 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2) 2184 { 2185 if (range1->length == 0 || range2->length == 0) { 2186 return false; 2187 } 2188 2189 if (range1->offset + range1->length <= range2->offset) { 2190 return false; 2191 } 2192 2193 if (range2->offset + range2->length <= range1->offset) { 2194 return false; 2195 } 2196 2197 return true; 2198 } 2199 2200 static bool 2201 bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range) 2202 { 2203 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2204 struct lba_range r; 2205 2206 switch (bdev_io->type) { 2207 case SPDK_BDEV_IO_TYPE_NVME_IO: 2208 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 2209 /* Don't try to decode the NVMe command - just assume worst-case and that 2210 * it overlaps a locked range. 2211 */ 2212 return true; 2213 case SPDK_BDEV_IO_TYPE_WRITE: 2214 case SPDK_BDEV_IO_TYPE_UNMAP: 2215 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2216 case SPDK_BDEV_IO_TYPE_ZCOPY: 2217 r.offset = bdev_io->u.bdev.offset_blocks; 2218 r.length = bdev_io->u.bdev.num_blocks; 2219 if (!bdev_lba_range_overlapped(range, &r)) { 2220 /* This I/O doesn't overlap the specified LBA range. */ 2221 return false; 2222 } else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) { 2223 /* This I/O overlaps, but the I/O is on the same channel that locked this 2224 * range, and the caller_ctx is the same as the locked_ctx. This means 2225 * that this I/O is associated with the lock, and is allowed to execute. 
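	 * (This is how, for example, the compare-and-write sequence can issue its read and write
	 * against a range that it locked itself.)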
2226 */ 2227 return false; 2228 } else { 2229 return true; 2230 } 2231 default: 2232 return false; 2233 } 2234 } 2235 2236 void 2237 bdev_io_submit(struct spdk_bdev_io *bdev_io) 2238 { 2239 struct spdk_bdev *bdev = bdev_io->bdev; 2240 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 2241 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2242 2243 assert(thread != NULL); 2244 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2245 2246 if (!TAILQ_EMPTY(&ch->locked_ranges)) { 2247 struct lba_range *range; 2248 2249 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 2250 if (bdev_io_range_is_locked(bdev_io, range)) { 2251 TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link); 2252 return; 2253 } 2254 } 2255 } 2256 2257 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 2258 2259 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bdev_io)) { 2260 bdev_io->internal.submit_tsc = spdk_get_ticks(); 2261 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 2262 (uintptr_t)bdev_io, bdev_io->type); 2263 bdev_io_split(NULL, bdev_io); 2264 return; 2265 } 2266 2267 if (ch->flags & BDEV_CH_QOS_ENABLED) { 2268 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 2269 _bdev_io_submit(bdev_io); 2270 } else { 2271 bdev_io->internal.io_submit_ch = ch; 2272 bdev_io->internal.ch = bdev->internal.qos->ch; 2273 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 2274 } 2275 } else { 2276 _bdev_io_submit(bdev_io); 2277 } 2278 } 2279 2280 static void 2281 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 2282 { 2283 struct spdk_bdev *bdev = bdev_io->bdev; 2284 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2285 struct spdk_io_channel *ch = bdev_ch->channel; 2286 2287 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2288 2289 bdev_io->internal.in_submit_request = true; 2290 bdev->fn_table->submit_request(ch, bdev_io); 2291 bdev_io->internal.in_submit_request = false; 2292 } 2293 2294 void 2295 bdev_io_init(struct spdk_bdev_io *bdev_io, 2296 struct spdk_bdev *bdev, void *cb_arg, 2297 spdk_bdev_io_completion_cb cb) 2298 { 2299 bdev_io->bdev = bdev; 2300 bdev_io->internal.caller_ctx = cb_arg; 2301 bdev_io->internal.cb = cb; 2302 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2303 bdev_io->internal.in_submit_request = false; 2304 bdev_io->internal.buf = NULL; 2305 bdev_io->internal.io_submit_ch = NULL; 2306 bdev_io->internal.orig_iovs = NULL; 2307 bdev_io->internal.orig_iovcnt = 0; 2308 bdev_io->internal.orig_md_buf = NULL; 2309 bdev_io->internal.error.nvme.cdw0 = 0; 2310 bdev_io->num_retries = 0; 2311 bdev_io->internal.get_buf_cb = NULL; 2312 bdev_io->internal.get_aux_buf_cb = NULL; 2313 } 2314 2315 static bool 2316 bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2317 { 2318 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 2319 } 2320 2321 bool 2322 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2323 { 2324 bool supported; 2325 2326 supported = bdev_io_type_supported(bdev, io_type); 2327 2328 if (!supported) { 2329 switch (io_type) { 2330 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2331 /* The bdev layer will emulate write zeroes as long as write is supported. 
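	 * (The emulation is done by the generic bdev code, which rewrites the request as regular
	 * writes of an internal zero buffer.)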
*/ 2332 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2333 break; 2334 case SPDK_BDEV_IO_TYPE_ZCOPY: 2335 /* Zero copy can be emulated with regular read and write */ 2336 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 2337 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2338 break; 2339 default: 2340 break; 2341 } 2342 } 2343 2344 return supported; 2345 } 2346 2347 int 2348 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 2349 { 2350 if (bdev->fn_table->dump_info_json) { 2351 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 2352 } 2353 2354 return 0; 2355 } 2356 2357 static void 2358 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2359 { 2360 uint32_t max_per_timeslice = 0; 2361 int i; 2362 2363 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2364 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2365 qos->rate_limits[i].max_per_timeslice = 0; 2366 continue; 2367 } 2368 2369 max_per_timeslice = qos->rate_limits[i].limit * 2370 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2371 2372 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2373 qos->rate_limits[i].min_per_timeslice); 2374 2375 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2376 } 2377 2378 bdev_qos_set_ops(qos); 2379 } 2380 2381 static int 2382 bdev_channel_poll_qos(void *arg) 2383 { 2384 struct spdk_bdev_qos *qos = arg; 2385 uint64_t now = spdk_get_ticks(); 2386 int i; 2387 2388 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2389 /* We received our callback earlier than expected - return 2390 * immediately and wait to do accounting until at least one 2391 * timeslice has actually expired. This should never happen 2392 * with a well-behaved timer implementation. 2393 */ 2394 return SPDK_POLLER_IDLE; 2395 } 2396 2397 /* Reset for next round of rate limiting */ 2398 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2399 /* We may have allowed the IOs or bytes to slightly overrun in the last 2400 * timeslice. remaining_this_timeslice is signed, so if it's negative 2401 * here, we'll account for the overrun so that the next timeslice will 2402 * be appropriately reduced. 
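	 * For example, if 4096 bytes remained and a 65536-byte I/O was let through, the value is now
	 * -61440, so after one elapsed timeslice the budget becomes max_per_timeslice - 61440.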
2403 */ 2404 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2405 qos->rate_limits[i].remaining_this_timeslice = 0; 2406 } 2407 } 2408 2409 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2410 qos->last_timeslice += qos->timeslice_size; 2411 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2412 qos->rate_limits[i].remaining_this_timeslice += 2413 qos->rate_limits[i].max_per_timeslice; 2414 } 2415 } 2416 2417 return bdev_qos_io_submit(qos->ch, qos); 2418 } 2419 2420 static void 2421 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2422 { 2423 struct spdk_bdev_shared_resource *shared_resource; 2424 struct lba_range *range; 2425 2426 while (!TAILQ_EMPTY(&ch->locked_ranges)) { 2427 range = TAILQ_FIRST(&ch->locked_ranges); 2428 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 2429 free(range); 2430 } 2431 2432 spdk_put_io_channel(ch->channel); 2433 2434 shared_resource = ch->shared_resource; 2435 2436 assert(TAILQ_EMPTY(&ch->io_locked)); 2437 assert(TAILQ_EMPTY(&ch->io_submitted)); 2438 assert(ch->io_outstanding == 0); 2439 assert(shared_resource->ref > 0); 2440 shared_resource->ref--; 2441 if (shared_resource->ref == 0) { 2442 assert(shared_resource->io_outstanding == 0); 2443 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2444 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2445 free(shared_resource); 2446 } 2447 } 2448 2449 /* Caller must hold bdev->internal.mutex. */ 2450 static void 2451 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2452 { 2453 struct spdk_bdev_qos *qos = bdev->internal.qos; 2454 int i; 2455 2456 /* Rate limiting on this bdev enabled */ 2457 if (qos) { 2458 if (qos->ch == NULL) { 2459 struct spdk_io_channel *io_ch; 2460 2461 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2462 bdev->name, spdk_get_thread()); 2463 2464 /* No qos channel has been selected, so set one up */ 2465 2466 /* Take another reference to ch */ 2467 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2468 assert(io_ch != NULL); 2469 qos->ch = ch; 2470 2471 qos->thread = spdk_io_channel_get_thread(io_ch); 2472 2473 TAILQ_INIT(&qos->queued); 2474 2475 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2476 if (bdev_qos_is_iops_rate_limit(i) == true) { 2477 qos->rate_limits[i].min_per_timeslice = 2478 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2479 } else { 2480 qos->rate_limits[i].min_per_timeslice = 2481 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2482 } 2483 2484 if (qos->rate_limits[i].limit == 0) { 2485 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2486 } 2487 } 2488 bdev_qos_update_max_quota_per_timeslice(qos); 2489 qos->timeslice_size = 2490 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2491 qos->last_timeslice = spdk_get_ticks(); 2492 qos->poller = SPDK_POLLER_REGISTER(bdev_channel_poll_qos, 2493 qos, 2494 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2495 } 2496 2497 ch->flags |= BDEV_CH_QOS_ENABLED; 2498 } 2499 } 2500 2501 struct poll_timeout_ctx { 2502 struct spdk_bdev_desc *desc; 2503 uint64_t timeout_in_sec; 2504 spdk_bdev_io_timeout_cb cb_fn; 2505 void *cb_arg; 2506 }; 2507 2508 static void 2509 bdev_desc_free(struct spdk_bdev_desc *desc) 2510 { 2511 pthread_mutex_destroy(&desc->mutex); 2512 free(desc->media_events_buffer); 2513 free(desc); 2514 } 2515 2516 static void 2517 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2518 { 2519 struct poll_timeout_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 2520 struct spdk_bdev_desc *desc = ctx->desc; 2521 2522 free(ctx); 2523 2524 pthread_mutex_lock(&desc->mutex); 2525 desc->refs--; 2526 if (desc->closed == true && desc->refs == 0) { 2527 pthread_mutex_unlock(&desc->mutex); 2528 bdev_desc_free(desc); 2529 return; 2530 } 2531 pthread_mutex_unlock(&desc->mutex); 2532 } 2533 2534 static void 2535 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2536 { 2537 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2538 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2539 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2540 struct spdk_bdev_desc *desc = ctx->desc; 2541 struct spdk_bdev_io *bdev_io; 2542 uint64_t now; 2543 2544 pthread_mutex_lock(&desc->mutex); 2545 if (desc->closed == true) { 2546 pthread_mutex_unlock(&desc->mutex); 2547 spdk_for_each_channel_continue(i, -1); 2548 return; 2549 } 2550 pthread_mutex_unlock(&desc->mutex); 2551 2552 now = spdk_get_ticks(); 2553 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2554 /* Exclude any I/O that are generated via splitting. */ 2555 if (bdev_io->internal.cb == bdev_io_split_done) { 2556 continue; 2557 } 2558 2559 /* Once we find an I/O that has not timed out, we can immediately 2560 * exit the loop. 2561 */ 2562 if (now < (bdev_io->internal.submit_tsc + 2563 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2564 goto end; 2565 } 2566 2567 if (bdev_io->internal.desc == desc) { 2568 ctx->cb_fn(ctx->cb_arg, bdev_io); 2569 } 2570 } 2571 2572 end: 2573 spdk_for_each_channel_continue(i, 0); 2574 } 2575 2576 static int 2577 bdev_poll_timeout_io(void *arg) 2578 { 2579 struct spdk_bdev_desc *desc = arg; 2580 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2581 struct poll_timeout_ctx *ctx; 2582 2583 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2584 if (!ctx) { 2585 SPDK_ERRLOG("failed to allocate memory\n"); 2586 return SPDK_POLLER_BUSY; 2587 } 2588 ctx->desc = desc; 2589 ctx->cb_arg = desc->cb_arg; 2590 ctx->cb_fn = desc->cb_fn; 2591 ctx->timeout_in_sec = desc->timeout_in_sec; 2592 2593 /* Take a ref on the descriptor in case it gets closed while we are checking 2594 * all of the channels. 
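	 * The ref is dropped in bdev_channel_poll_timeout_io_done(), which also frees the descriptor
	 * if it was closed while the iteration was in flight.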
2595 */ 2596 pthread_mutex_lock(&desc->mutex); 2597 desc->refs++; 2598 pthread_mutex_unlock(&desc->mutex); 2599 2600 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2601 bdev_channel_poll_timeout_io, 2602 ctx, 2603 bdev_channel_poll_timeout_io_done); 2604 2605 return SPDK_POLLER_BUSY; 2606 } 2607 2608 int 2609 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2610 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2611 { 2612 assert(desc->thread == spdk_get_thread()); 2613 2614 spdk_poller_unregister(&desc->io_timeout_poller); 2615 2616 if (timeout_in_sec) { 2617 assert(cb_fn != NULL); 2618 desc->io_timeout_poller = SPDK_POLLER_REGISTER(bdev_poll_timeout_io, 2619 desc, 2620 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2621 1000); 2622 if (desc->io_timeout_poller == NULL) { 2623 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2624 return -1; 2625 } 2626 } 2627 2628 desc->cb_fn = cb_fn; 2629 desc->cb_arg = cb_arg; 2630 desc->timeout_in_sec = timeout_in_sec; 2631 2632 return 0; 2633 } 2634 2635 static int 2636 bdev_channel_create(void *io_device, void *ctx_buf) 2637 { 2638 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2639 struct spdk_bdev_channel *ch = ctx_buf; 2640 struct spdk_io_channel *mgmt_io_ch; 2641 struct spdk_bdev_mgmt_channel *mgmt_ch; 2642 struct spdk_bdev_shared_resource *shared_resource; 2643 struct lba_range *range; 2644 2645 ch->bdev = bdev; 2646 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2647 if (!ch->channel) { 2648 return -1; 2649 } 2650 2651 assert(ch->histogram == NULL); 2652 if (bdev->internal.histogram_enabled) { 2653 ch->histogram = spdk_histogram_data_alloc(); 2654 if (ch->histogram == NULL) { 2655 SPDK_ERRLOG("Could not allocate histogram\n"); 2656 } 2657 } 2658 2659 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2660 if (!mgmt_io_ch) { 2661 spdk_put_io_channel(ch->channel); 2662 return -1; 2663 } 2664 2665 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2666 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2667 if (shared_resource->shared_ch == ch->channel) { 2668 spdk_put_io_channel(mgmt_io_ch); 2669 shared_resource->ref++; 2670 break; 2671 } 2672 } 2673 2674 if (shared_resource == NULL) { 2675 shared_resource = calloc(1, sizeof(*shared_resource)); 2676 if (shared_resource == NULL) { 2677 spdk_put_io_channel(ch->channel); 2678 spdk_put_io_channel(mgmt_io_ch); 2679 return -1; 2680 } 2681 2682 shared_resource->mgmt_ch = mgmt_ch; 2683 shared_resource->io_outstanding = 0; 2684 TAILQ_INIT(&shared_resource->nomem_io); 2685 shared_resource->nomem_threshold = 0; 2686 shared_resource->shared_ch = ch->channel; 2687 shared_resource->ref = 1; 2688 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2689 } 2690 2691 memset(&ch->stat, 0, sizeof(ch->stat)); 2692 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2693 ch->io_outstanding = 0; 2694 TAILQ_INIT(&ch->queued_resets); 2695 TAILQ_INIT(&ch->locked_ranges); 2696 ch->flags = 0; 2697 ch->shared_resource = shared_resource; 2698 2699 TAILQ_INIT(&ch->io_submitted); 2700 TAILQ_INIT(&ch->io_locked); 2701 2702 #ifdef SPDK_CONFIG_VTUNE 2703 { 2704 char *name; 2705 __itt_init_ittlib(NULL, 0); 2706 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2707 if (!name) { 2708 bdev_channel_destroy_resource(ch); 2709 return -1; 2710 } 2711 ch->handle = __itt_string_handle_create(name); 2712 free(name); 2713 ch->start_tsc = spdk_get_ticks(); 2714 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2715 memset(&ch->prev_stat, 0, 
sizeof(ch->prev_stat)); 2716 } 2717 #endif 2718 2719 pthread_mutex_lock(&bdev->internal.mutex); 2720 bdev_enable_qos(bdev, ch); 2721 2722 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 2723 struct lba_range *new_range; 2724 2725 new_range = calloc(1, sizeof(*new_range)); 2726 if (new_range == NULL) { 2727 pthread_mutex_unlock(&bdev->internal.mutex); 2728 bdev_channel_destroy_resource(ch); 2729 return -1; 2730 } 2731 new_range->length = range->length; 2732 new_range->offset = range->offset; 2733 new_range->locked_ctx = range->locked_ctx; 2734 TAILQ_INSERT_TAIL(&ch->locked_ranges, new_range, tailq); 2735 } 2736 2737 pthread_mutex_unlock(&bdev->internal.mutex); 2738 2739 return 0; 2740 } 2741 2742 /* 2743 * Abort I/O that are waiting on a data buffer. These types of I/O are 2744 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2745 */ 2746 static void 2747 bdev_abort_all_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2748 { 2749 bdev_io_stailq_t tmp; 2750 struct spdk_bdev_io *bdev_io; 2751 2752 STAILQ_INIT(&tmp); 2753 2754 while (!STAILQ_EMPTY(queue)) { 2755 bdev_io = STAILQ_FIRST(queue); 2756 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2757 if (bdev_io->internal.ch == ch) { 2758 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2759 } else { 2760 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2761 } 2762 } 2763 2764 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2765 } 2766 2767 /* 2768 * Abort I/O that are queued waiting for submission. These types of I/O are 2769 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2770 */ 2771 static void 2772 bdev_abort_all_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2773 { 2774 struct spdk_bdev_io *bdev_io, *tmp; 2775 2776 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2777 if (bdev_io->internal.ch == ch) { 2778 TAILQ_REMOVE(queue, bdev_io, internal.link); 2779 /* 2780 * spdk_bdev_io_complete() assumes that the completed I/O had 2781 * been submitted to the bdev module. Since in this case it 2782 * hadn't, bump io_outstanding to account for the decrement 2783 * that spdk_bdev_io_complete() will do. 
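	 * Resets are skipped below because they are never counted in io_outstanding in the first
	 * place.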
2784 */ 2785 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2786 ch->io_outstanding++; 2787 ch->shared_resource->io_outstanding++; 2788 } 2789 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2790 } 2791 } 2792 } 2793 2794 static bool 2795 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2796 { 2797 struct spdk_bdev_io *bdev_io; 2798 2799 TAILQ_FOREACH(bdev_io, queue, internal.link) { 2800 if (bdev_io == bio_to_abort) { 2801 TAILQ_REMOVE(queue, bio_to_abort, internal.link); 2802 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2803 return true; 2804 } 2805 } 2806 2807 return false; 2808 } 2809 2810 static bool 2811 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2812 { 2813 struct spdk_bdev_io *bdev_io; 2814 2815 STAILQ_FOREACH(bdev_io, queue, internal.buf_link) { 2816 if (bdev_io == bio_to_abort) { 2817 STAILQ_REMOVE(queue, bio_to_abort, spdk_bdev_io, internal.buf_link); 2818 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2819 return true; 2820 } 2821 } 2822 2823 return false; 2824 } 2825 2826 static void 2827 bdev_qos_channel_destroy(void *cb_arg) 2828 { 2829 struct spdk_bdev_qos *qos = cb_arg; 2830 2831 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2832 spdk_poller_unregister(&qos->poller); 2833 2834 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2835 2836 free(qos); 2837 } 2838 2839 static int 2840 bdev_qos_destroy(struct spdk_bdev *bdev) 2841 { 2842 int i; 2843 2844 /* 2845 * Cleanly shutting down the QoS poller is tricky, because 2846 * during the asynchronous operation the user could open 2847 * a new descriptor and create a new channel, spawning 2848 * a new QoS poller. 2849 * 2850 * The strategy is to create a new QoS structure here and swap it 2851 * in. The shutdown path then continues to refer to the old one 2852 * until it completes and then releases it. 2853 */ 2854 struct spdk_bdev_qos *new_qos, *old_qos; 2855 2856 old_qos = bdev->internal.qos; 2857 2858 new_qos = calloc(1, sizeof(*new_qos)); 2859 if (!new_qos) { 2860 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2861 return -ENOMEM; 2862 } 2863 2864 /* Copy the old QoS data into the newly allocated structure */ 2865 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2866 2867 /* Zero out the key parts of the QoS structure */ 2868 new_qos->ch = NULL; 2869 new_qos->thread = NULL; 2870 new_qos->poller = NULL; 2871 TAILQ_INIT(&new_qos->queued); 2872 /* 2873 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2874 * It will be used later for the new QoS structure. 2875 */ 2876 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2877 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2878 new_qos->rate_limits[i].min_per_timeslice = 0; 2879 new_qos->rate_limits[i].max_per_timeslice = 0; 2880 } 2881 2882 bdev->internal.qos = new_qos; 2883 2884 if (old_qos->thread == NULL) { 2885 free(old_qos); 2886 } else { 2887 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 2888 } 2889 2890 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2891 * been destroyed yet. The destruction path will end up waiting for the final 2892 * channel to be put before it releases resources. 
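	 * bdev_qos_channel_destroy() runs on the old QoS thread and drops that thread's extra
	 * reference on the bdev channel, which is what eventually lets that wait finish.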
 */

	return 0;
}

static void
bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
{
	total->bytes_read += add->bytes_read;
	total->num_read_ops += add->num_read_ops;
	total->bytes_written += add->bytes_written;
	total->num_write_ops += add->num_write_ops;
	total->bytes_unmapped += add->bytes_unmapped;
	total->num_unmap_ops += add->num_unmap_ops;
	total->read_latency_ticks += add->read_latency_ticks;
	total->write_latency_ticks += add->write_latency_ticks;
	total->unmap_latency_ticks += add->unmap_latency_ticks;
}

static void
bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel *ch = ctx_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
		      spdk_get_thread());

	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
	pthread_mutex_lock(&ch->bdev->internal.mutex);
	bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
	pthread_mutex_unlock(&ch->bdev->internal.mutex);

	mgmt_ch = shared_resource->mgmt_ch;

	bdev_abort_all_queued_io(&ch->queued_resets, ch);
	bdev_abort_all_queued_io(&shared_resource->nomem_io, ch);
	bdev_abort_all_buf_io(&mgmt_ch->need_buf_small, ch);
	bdev_abort_all_buf_io(&mgmt_ch->need_buf_large, ch);

	if (ch->histogram) {
		spdk_histogram_data_free(ch->histogram);
	}

	bdev_channel_destroy_resource(ch);
}

int
spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	if (alias == NULL) {
		SPDK_ERRLOG("Empty alias passed\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(alias)) {
		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
		return -EEXIST;
	}

	tmp = calloc(1, sizeof(*tmp));
	if (tmp == NULL) {
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	tmp->alias = strdup(alias);
	if (tmp->alias == NULL) {
		free(tmp);
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);

	return 0;
}

int
spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (strcmp(alias, tmp->alias) == 0) {
			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
			free(tmp->alias);
			free(tmp);
			return 0;
		}
	}

	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);

	return -ENOENT;
}

void
spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *p, *tmp;

	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
		TAILQ_REMOVE(&bdev->aliases, p, tailq);
		free(p->alias);
		free(p);
	}
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc)));
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev
*bdev) 3018 { 3019 return bdev->product_name; 3020 } 3021 3022 const struct spdk_bdev_aliases_list * 3023 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 3024 { 3025 return &bdev->aliases; 3026 } 3027 3028 uint32_t 3029 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 3030 { 3031 return bdev->blocklen; 3032 } 3033 3034 uint32_t 3035 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 3036 { 3037 return bdev->write_unit_size; 3038 } 3039 3040 uint64_t 3041 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 3042 { 3043 return bdev->blockcnt; 3044 } 3045 3046 const char * 3047 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 3048 { 3049 return qos_rpc_type[type]; 3050 } 3051 3052 void 3053 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 3054 { 3055 int i; 3056 3057 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 3058 3059 pthread_mutex_lock(&bdev->internal.mutex); 3060 if (bdev->internal.qos) { 3061 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3062 if (bdev->internal.qos->rate_limits[i].limit != 3063 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3064 limits[i] = bdev->internal.qos->rate_limits[i].limit; 3065 if (bdev_qos_is_iops_rate_limit(i) == false) { 3066 /* Change from Byte to Megabyte which is user visible. */ 3067 limits[i] = limits[i] / 1024 / 1024; 3068 } 3069 } 3070 } 3071 } 3072 pthread_mutex_unlock(&bdev->internal.mutex); 3073 } 3074 3075 size_t 3076 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 3077 { 3078 return 1 << bdev->required_alignment; 3079 } 3080 3081 uint32_t 3082 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 3083 { 3084 return bdev->optimal_io_boundary; 3085 } 3086 3087 bool 3088 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 3089 { 3090 return bdev->write_cache; 3091 } 3092 3093 const struct spdk_uuid * 3094 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 3095 { 3096 return &bdev->uuid; 3097 } 3098 3099 uint16_t 3100 spdk_bdev_get_acwu(const struct spdk_bdev *bdev) 3101 { 3102 return bdev->acwu; 3103 } 3104 3105 uint32_t 3106 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 3107 { 3108 return bdev->md_len; 3109 } 3110 3111 bool 3112 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 3113 { 3114 return (bdev->md_len != 0) && bdev->md_interleave; 3115 } 3116 3117 bool 3118 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 3119 { 3120 return (bdev->md_len != 0) && !bdev->md_interleave; 3121 } 3122 3123 bool 3124 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 3125 { 3126 return bdev->zoned; 3127 } 3128 3129 uint32_t 3130 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 3131 { 3132 if (spdk_bdev_is_md_interleaved(bdev)) { 3133 return bdev->blocklen - bdev->md_len; 3134 } else { 3135 return bdev->blocklen; 3136 } 3137 } 3138 3139 static uint32_t 3140 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 3141 { 3142 if (!spdk_bdev_is_md_interleaved(bdev)) { 3143 return bdev->blocklen + bdev->md_len; 3144 } else { 3145 return bdev->blocklen; 3146 } 3147 } 3148 3149 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 3150 { 3151 if (bdev->md_len != 0) { 3152 return bdev->dif_type; 3153 } else { 3154 return SPDK_DIF_DISABLE; 3155 } 3156 } 3157 3158 bool 3159 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 3160 { 3161 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 3162 return bdev->dif_is_head_of_md; 3163 } else { 3164 return false; 3165 } 3166 } 3167 3168 bool 3169 
spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 3170 enum spdk_dif_check_type check_type) 3171 { 3172 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 3173 return false; 3174 } 3175 3176 switch (check_type) { 3177 case SPDK_DIF_CHECK_TYPE_REFTAG: 3178 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 3179 case SPDK_DIF_CHECK_TYPE_APPTAG: 3180 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 3181 case SPDK_DIF_CHECK_TYPE_GUARD: 3182 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 3183 default: 3184 return false; 3185 } 3186 } 3187 3188 uint64_t 3189 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 3190 { 3191 return bdev->internal.measured_queue_depth; 3192 } 3193 3194 uint64_t 3195 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 3196 { 3197 return bdev->internal.period; 3198 } 3199 3200 uint64_t 3201 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 3202 { 3203 return bdev->internal.weighted_io_time; 3204 } 3205 3206 uint64_t 3207 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 3208 { 3209 return bdev->internal.io_time; 3210 } 3211 3212 static void 3213 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 3214 { 3215 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3216 3217 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 3218 3219 if (bdev->internal.measured_queue_depth) { 3220 bdev->internal.io_time += bdev->internal.period; 3221 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 3222 } 3223 } 3224 3225 static void 3226 _calculate_measured_qd(struct spdk_io_channel_iter *i) 3227 { 3228 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3229 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 3230 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 3231 3232 bdev->internal.temporary_queue_depth += ch->io_outstanding; 3233 spdk_for_each_channel_continue(i, 0); 3234 } 3235 3236 static int 3237 bdev_calculate_measured_queue_depth(void *ctx) 3238 { 3239 struct spdk_bdev *bdev = ctx; 3240 bdev->internal.temporary_queue_depth = 0; 3241 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 3242 _calculate_measured_qd_cpl); 3243 return SPDK_POLLER_BUSY; 3244 } 3245 3246 void 3247 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 3248 { 3249 bdev->internal.period = period; 3250 3251 if (bdev->internal.qd_poller != NULL) { 3252 spdk_poller_unregister(&bdev->internal.qd_poller); 3253 bdev->internal.measured_queue_depth = UINT64_MAX; 3254 } 3255 3256 if (period != 0) { 3257 bdev->internal.qd_poller = SPDK_POLLER_REGISTER(bdev_calculate_measured_queue_depth, bdev, 3258 period); 3259 } 3260 } 3261 3262 static void 3263 _resize_notify(void *arg) 3264 { 3265 struct spdk_bdev_desc *desc = arg; 3266 3267 pthread_mutex_lock(&desc->mutex); 3268 desc->refs--; 3269 if (!desc->closed) { 3270 pthread_mutex_unlock(&desc->mutex); 3271 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 3272 desc->bdev, 3273 desc->callback.ctx); 3274 return; 3275 } else if (0 == desc->refs) { 3276 /* This descriptor was closed after this resize_notify message was sent. 3277 * spdk_bdev_close() could not free the descriptor since this message was 3278 * in flight, so we free it now using bdev_desc_free(). 
3279 */ 3280 pthread_mutex_unlock(&desc->mutex); 3281 bdev_desc_free(desc); 3282 return; 3283 } 3284 pthread_mutex_unlock(&desc->mutex); 3285 } 3286 3287 int 3288 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 3289 { 3290 struct spdk_bdev_desc *desc; 3291 int ret; 3292 3293 pthread_mutex_lock(&bdev->internal.mutex); 3294 3295 /* bdev has open descriptors */ 3296 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 3297 bdev->blockcnt > size) { 3298 ret = -EBUSY; 3299 } else { 3300 bdev->blockcnt = size; 3301 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 3302 pthread_mutex_lock(&desc->mutex); 3303 if (desc->callback.open_with_ext && !desc->closed) { 3304 desc->refs++; 3305 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 3306 } 3307 pthread_mutex_unlock(&desc->mutex); 3308 } 3309 ret = 0; 3310 } 3311 3312 pthread_mutex_unlock(&bdev->internal.mutex); 3313 3314 return ret; 3315 } 3316 3317 /* 3318 * Convert I/O offset and length from bytes to blocks. 3319 * 3320 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 3321 */ 3322 static uint64_t 3323 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 3324 uint64_t num_bytes, uint64_t *num_blocks) 3325 { 3326 uint32_t block_size = bdev->blocklen; 3327 uint8_t shift_cnt; 3328 3329 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 3330 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 3331 shift_cnt = spdk_u32log2(block_size); 3332 *offset_blocks = offset_bytes >> shift_cnt; 3333 *num_blocks = num_bytes >> shift_cnt; 3334 return (offset_bytes - (*offset_blocks << shift_cnt)) | 3335 (num_bytes - (*num_blocks << shift_cnt)); 3336 } else { 3337 *offset_blocks = offset_bytes / block_size; 3338 *num_blocks = num_bytes / block_size; 3339 return (offset_bytes % block_size) | (num_bytes % block_size); 3340 } 3341 } 3342 3343 static bool 3344 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 3345 { 3346 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 3347 * has been an overflow and hence the offset has been wrapped around */ 3348 if (offset_blocks + num_blocks < offset_blocks) { 3349 return false; 3350 } 3351 3352 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 3353 if (offset_blocks + num_blocks > bdev->blockcnt) { 3354 return false; 3355 } 3356 3357 return true; 3358 } 3359 3360 static bool 3361 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 3362 { 3363 return _is_buf_allocated(iovs) == (md_buf != NULL); 3364 } 3365 3366 static int 3367 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 3368 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3369 spdk_bdev_io_completion_cb cb, void *cb_arg) 3370 { 3371 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3372 struct spdk_bdev_io *bdev_io; 3373 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3374 3375 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3376 return -EINVAL; 3377 } 3378 3379 bdev_io = bdev_channel_get_io(channel); 3380 if (!bdev_io) { 3381 return -ENOMEM; 3382 } 3383 3384 bdev_io->internal.ch = channel; 3385 bdev_io->internal.desc = desc; 3386 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3387 bdev_io->u.bdev.iovs = &bdev_io->iov; 3388 bdev_io->u.bdev.iovs[0].iov_base = buf; 3389 bdev_io->u.bdev.iovs[0].iov_len = num_blocks 
* bdev->blocklen; 3390 bdev_io->u.bdev.iovcnt = 1; 3391 bdev_io->u.bdev.md_buf = md_buf; 3392 bdev_io->u.bdev.num_blocks = num_blocks; 3393 bdev_io->u.bdev.offset_blocks = offset_blocks; 3394 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3395 3396 bdev_io_submit(bdev_io); 3397 return 0; 3398 } 3399 3400 int 3401 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3402 void *buf, uint64_t offset, uint64_t nbytes, 3403 spdk_bdev_io_completion_cb cb, void *cb_arg) 3404 { 3405 uint64_t offset_blocks, num_blocks; 3406 3407 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3408 nbytes, &num_blocks) != 0) { 3409 return -EINVAL; 3410 } 3411 3412 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3413 } 3414 3415 int 3416 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3417 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3418 spdk_bdev_io_completion_cb cb, void *cb_arg) 3419 { 3420 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 3421 } 3422 3423 int 3424 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3425 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3426 spdk_bdev_io_completion_cb cb, void *cb_arg) 3427 { 3428 struct iovec iov = { 3429 .iov_base = buf, 3430 }; 3431 3432 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3433 return -EINVAL; 3434 } 3435 3436 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3437 return -EINVAL; 3438 } 3439 3440 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3441 cb, cb_arg); 3442 } 3443 3444 int 3445 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3446 struct iovec *iov, int iovcnt, 3447 uint64_t offset, uint64_t nbytes, 3448 spdk_bdev_io_completion_cb cb, void *cb_arg) 3449 { 3450 uint64_t offset_blocks, num_blocks; 3451 3452 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3453 nbytes, &num_blocks) != 0) { 3454 return -EINVAL; 3455 } 3456 3457 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3458 } 3459 3460 static int 3461 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3462 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3463 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3464 { 3465 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3466 struct spdk_bdev_io *bdev_io; 3467 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3468 3469 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3470 return -EINVAL; 3471 } 3472 3473 bdev_io = bdev_channel_get_io(channel); 3474 if (!bdev_io) { 3475 return -ENOMEM; 3476 } 3477 3478 bdev_io->internal.ch = channel; 3479 bdev_io->internal.desc = desc; 3480 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3481 bdev_io->u.bdev.iovs = iov; 3482 bdev_io->u.bdev.iovcnt = iovcnt; 3483 bdev_io->u.bdev.md_buf = md_buf; 3484 bdev_io->u.bdev.num_blocks = num_blocks; 3485 bdev_io->u.bdev.offset_blocks = offset_blocks; 3486 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3487 3488 bdev_io_submit(bdev_io); 3489 return 0; 3490 } 3491 3492 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3493 struct iovec *iov, int iovcnt, 3494 uint64_t offset_blocks, uint64_t num_blocks, 3495 spdk_bdev_io_completion_cb cb, void *cb_arg) 3496 { 3497 return 
bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3498 num_blocks, cb, cb_arg); 3499 } 3500 3501 int 3502 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3503 struct iovec *iov, int iovcnt, void *md_buf, 3504 uint64_t offset_blocks, uint64_t num_blocks, 3505 spdk_bdev_io_completion_cb cb, void *cb_arg) 3506 { 3507 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3508 return -EINVAL; 3509 } 3510 3511 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3512 return -EINVAL; 3513 } 3514 3515 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3516 num_blocks, cb, cb_arg); 3517 } 3518 3519 static int 3520 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3521 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3522 spdk_bdev_io_completion_cb cb, void *cb_arg) 3523 { 3524 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3525 struct spdk_bdev_io *bdev_io; 3526 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3527 3528 if (!desc->write) { 3529 return -EBADF; 3530 } 3531 3532 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3533 return -EINVAL; 3534 } 3535 3536 bdev_io = bdev_channel_get_io(channel); 3537 if (!bdev_io) { 3538 return -ENOMEM; 3539 } 3540 3541 bdev_io->internal.ch = channel; 3542 bdev_io->internal.desc = desc; 3543 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3544 bdev_io->u.bdev.iovs = &bdev_io->iov; 3545 bdev_io->u.bdev.iovs[0].iov_base = buf; 3546 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3547 bdev_io->u.bdev.iovcnt = 1; 3548 bdev_io->u.bdev.md_buf = md_buf; 3549 bdev_io->u.bdev.num_blocks = num_blocks; 3550 bdev_io->u.bdev.offset_blocks = offset_blocks; 3551 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3552 3553 bdev_io_submit(bdev_io); 3554 return 0; 3555 } 3556 3557 int 3558 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3559 void *buf, uint64_t offset, uint64_t nbytes, 3560 spdk_bdev_io_completion_cb cb, void *cb_arg) 3561 { 3562 uint64_t offset_blocks, num_blocks; 3563 3564 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3565 nbytes, &num_blocks) != 0) { 3566 return -EINVAL; 3567 } 3568 3569 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3570 } 3571 3572 int 3573 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3574 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3575 spdk_bdev_io_completion_cb cb, void *cb_arg) 3576 { 3577 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3578 cb, cb_arg); 3579 } 3580 3581 int 3582 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3583 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3584 spdk_bdev_io_completion_cb cb, void *cb_arg) 3585 { 3586 struct iovec iov = { 3587 .iov_base = buf, 3588 }; 3589 3590 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3591 return -EINVAL; 3592 } 3593 3594 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3595 return -EINVAL; 3596 } 3597 3598 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3599 cb, cb_arg); 3600 } 3601 3602 static int 3603 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3604 struct iovec *iov, int iovcnt, void *md_buf, 3605 uint64_t offset_blocks, uint64_t num_blocks, 3606 spdk_bdev_io_completion_cb cb, 
void *cb_arg) 3607 { 3608 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3609 struct spdk_bdev_io *bdev_io; 3610 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3611 3612 if (!desc->write) { 3613 return -EBADF; 3614 } 3615 3616 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3617 return -EINVAL; 3618 } 3619 3620 bdev_io = bdev_channel_get_io(channel); 3621 if (!bdev_io) { 3622 return -ENOMEM; 3623 } 3624 3625 bdev_io->internal.ch = channel; 3626 bdev_io->internal.desc = desc; 3627 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3628 bdev_io->u.bdev.iovs = iov; 3629 bdev_io->u.bdev.iovcnt = iovcnt; 3630 bdev_io->u.bdev.md_buf = md_buf; 3631 bdev_io->u.bdev.num_blocks = num_blocks; 3632 bdev_io->u.bdev.offset_blocks = offset_blocks; 3633 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3634 3635 bdev_io_submit(bdev_io); 3636 return 0; 3637 } 3638 3639 int 3640 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3641 struct iovec *iov, int iovcnt, 3642 uint64_t offset, uint64_t len, 3643 spdk_bdev_io_completion_cb cb, void *cb_arg) 3644 { 3645 uint64_t offset_blocks, num_blocks; 3646 3647 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3648 len, &num_blocks) != 0) { 3649 return -EINVAL; 3650 } 3651 3652 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3653 } 3654 3655 int 3656 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3657 struct iovec *iov, int iovcnt, 3658 uint64_t offset_blocks, uint64_t num_blocks, 3659 spdk_bdev_io_completion_cb cb, void *cb_arg) 3660 { 3661 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3662 num_blocks, cb, cb_arg); 3663 } 3664 3665 int 3666 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3667 struct iovec *iov, int iovcnt, void *md_buf, 3668 uint64_t offset_blocks, uint64_t num_blocks, 3669 spdk_bdev_io_completion_cb cb, void *cb_arg) 3670 { 3671 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3672 return -EINVAL; 3673 } 3674 3675 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3676 return -EINVAL; 3677 } 3678 3679 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3680 num_blocks, cb, cb_arg); 3681 } 3682 3683 static void 3684 bdev_compare_do_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3685 { 3686 struct spdk_bdev_io *parent_io = cb_arg; 3687 uint8_t *read_buf = bdev_io->u.bdev.iovs[0].iov_base; 3688 int i, rc = 0; 3689 3690 if (!success) { 3691 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3692 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3693 spdk_bdev_free_io(bdev_io); 3694 return; 3695 } 3696 3697 for (i = 0; i < parent_io->u.bdev.iovcnt; i++) { 3698 rc = memcmp(read_buf, 3699 parent_io->u.bdev.iovs[i].iov_base, 3700 parent_io->u.bdev.iovs[i].iov_len); 3701 if (rc) { 3702 break; 3703 } 3704 read_buf += parent_io->u.bdev.iovs[i].iov_len; 3705 } 3706 3707 spdk_bdev_free_io(bdev_io); 3708 3709 if (rc == 0) { 3710 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3711 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3712 } else { 3713 parent_io->internal.status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 3714 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3715 } 3716 } 3717 3718 static void 3719 bdev_compare_do_read(void *_bdev_io) 3720 { 3721 struct spdk_bdev_io *bdev_io = _bdev_io; 
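	/* COMPARE is being emulated here: read the on-disk data back (buf is passed as NULL, so a
	 * bounce buffer is expected to be supplied by the bdev layer's buffer pools) and memcmp()
	 * it against the caller's iovs in bdev_compare_do_read_done().
	 */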
3722 int rc; 3723 3724 rc = spdk_bdev_read_blocks(bdev_io->internal.desc, 3725 spdk_io_channel_from_ctx(bdev_io->internal.ch), NULL, 3726 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3727 bdev_compare_do_read_done, bdev_io); 3728 3729 if (rc == -ENOMEM) { 3730 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_do_read); 3731 } else if (rc != 0) { 3732 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3733 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3734 } 3735 } 3736 3737 static int 3738 bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3739 struct iovec *iov, int iovcnt, void *md_buf, 3740 uint64_t offset_blocks, uint64_t num_blocks, 3741 spdk_bdev_io_completion_cb cb, void *cb_arg) 3742 { 3743 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3744 struct spdk_bdev_io *bdev_io; 3745 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3746 3747 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3748 return -EINVAL; 3749 } 3750 3751 bdev_io = bdev_channel_get_io(channel); 3752 if (!bdev_io) { 3753 return -ENOMEM; 3754 } 3755 3756 bdev_io->internal.ch = channel; 3757 bdev_io->internal.desc = desc; 3758 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3759 bdev_io->u.bdev.iovs = iov; 3760 bdev_io->u.bdev.iovcnt = iovcnt; 3761 bdev_io->u.bdev.md_buf = md_buf; 3762 bdev_io->u.bdev.num_blocks = num_blocks; 3763 bdev_io->u.bdev.offset_blocks = offset_blocks; 3764 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3765 3766 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3767 bdev_io_submit(bdev_io); 3768 return 0; 3769 } 3770 3771 bdev_compare_do_read(bdev_io); 3772 3773 return 0; 3774 } 3775 3776 int 3777 spdk_bdev_comparev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3778 struct iovec *iov, int iovcnt, 3779 uint64_t offset_blocks, uint64_t num_blocks, 3780 spdk_bdev_io_completion_cb cb, void *cb_arg) 3781 { 3782 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3783 num_blocks, cb, cb_arg); 3784 } 3785 3786 int 3787 spdk_bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3788 struct iovec *iov, int iovcnt, void *md_buf, 3789 uint64_t offset_blocks, uint64_t num_blocks, 3790 spdk_bdev_io_completion_cb cb, void *cb_arg) 3791 { 3792 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3793 return -EINVAL; 3794 } 3795 3796 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3797 return -EINVAL; 3798 } 3799 3800 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3801 num_blocks, cb, cb_arg); 3802 } 3803 3804 static int 3805 bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3806 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3807 spdk_bdev_io_completion_cb cb, void *cb_arg) 3808 { 3809 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3810 struct spdk_bdev_io *bdev_io; 3811 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3812 3813 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3814 return -EINVAL; 3815 } 3816 3817 bdev_io = bdev_channel_get_io(channel); 3818 if (!bdev_io) { 3819 return -ENOMEM; 3820 } 3821 3822 bdev_io->internal.ch = channel; 3823 bdev_io->internal.desc = desc; 3824 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3825 bdev_io->u.bdev.iovs = &bdev_io->iov; 3826 bdev_io->u.bdev.iovs[0].iov_base = buf; 3827 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3828 
bdev_io->u.bdev.iovcnt = 1; 3829 bdev_io->u.bdev.md_buf = md_buf; 3830 bdev_io->u.bdev.num_blocks = num_blocks; 3831 bdev_io->u.bdev.offset_blocks = offset_blocks; 3832 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3833 3834 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3835 bdev_io_submit(bdev_io); 3836 return 0; 3837 } 3838 3839 bdev_compare_do_read(bdev_io); 3840 3841 return 0; 3842 } 3843 3844 int 3845 spdk_bdev_compare_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3846 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3847 spdk_bdev_io_completion_cb cb, void *cb_arg) 3848 { 3849 return bdev_compare_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3850 cb, cb_arg); 3851 } 3852 3853 int 3854 spdk_bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3855 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3856 spdk_bdev_io_completion_cb cb, void *cb_arg) 3857 { 3858 struct iovec iov = { 3859 .iov_base = buf, 3860 }; 3861 3862 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3863 return -EINVAL; 3864 } 3865 3866 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3867 return -EINVAL; 3868 } 3869 3870 return bdev_compare_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3871 cb, cb_arg); 3872 } 3873 3874 static void 3875 bdev_comparev_and_writev_blocks_unlocked(void *ctx, int unlock_status) 3876 { 3877 struct spdk_bdev_io *bdev_io = ctx; 3878 3879 if (unlock_status) { 3880 SPDK_ERRLOG("LBA range unlock failed\n"); 3881 } 3882 3883 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS ? true : 3884 false, bdev_io->internal.caller_ctx); 3885 } 3886 3887 static void 3888 bdev_comparev_and_writev_blocks_unlock(struct spdk_bdev_io *bdev_io, int status) 3889 { 3890 bdev_io->internal.status = status; 3891 3892 bdev_unlock_lba_range(bdev_io->internal.desc, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3893 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3894 bdev_comparev_and_writev_blocks_unlocked, bdev_io); 3895 } 3896 3897 static void 3898 bdev_compare_and_write_do_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3899 { 3900 struct spdk_bdev_io *parent_io = cb_arg; 3901 3902 if (!success) { 3903 SPDK_ERRLOG("Compare and write operation failed\n"); 3904 } 3905 3906 spdk_bdev_free_io(bdev_io); 3907 3908 bdev_comparev_and_writev_blocks_unlock(parent_io, 3909 success ? 
SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED); 3910 } 3911 3912 static void 3913 bdev_compare_and_write_do_write(void *_bdev_io) 3914 { 3915 struct spdk_bdev_io *bdev_io = _bdev_io; 3916 int rc; 3917 3918 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 3919 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3920 bdev_io->u.bdev.fused_iovs, bdev_io->u.bdev.fused_iovcnt, 3921 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3922 bdev_compare_and_write_do_write_done, bdev_io); 3923 3924 3925 if (rc == -ENOMEM) { 3926 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_write); 3927 } else if (rc != 0) { 3928 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 3929 } 3930 } 3931 3932 static void 3933 bdev_compare_and_write_do_compare_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3934 { 3935 struct spdk_bdev_io *parent_io = cb_arg; 3936 3937 spdk_bdev_free_io(bdev_io); 3938 3939 if (!success) { 3940 bdev_comparev_and_writev_blocks_unlock(parent_io, SPDK_BDEV_IO_STATUS_MISCOMPARE); 3941 return; 3942 } 3943 3944 bdev_compare_and_write_do_write(parent_io); 3945 } 3946 3947 static void 3948 bdev_compare_and_write_do_compare(void *_bdev_io) 3949 { 3950 struct spdk_bdev_io *bdev_io = _bdev_io; 3951 int rc; 3952 3953 rc = spdk_bdev_comparev_blocks(bdev_io->internal.desc, 3954 spdk_io_channel_from_ctx(bdev_io->internal.ch), bdev_io->u.bdev.iovs, 3955 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3956 bdev_compare_and_write_do_compare_done, bdev_io); 3957 3958 if (rc == -ENOMEM) { 3959 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_compare); 3960 } else if (rc != 0) { 3961 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED); 3962 } 3963 } 3964 3965 static void 3966 bdev_comparev_and_writev_blocks_locked(void *ctx, int status) 3967 { 3968 struct spdk_bdev_io *bdev_io = ctx; 3969 3970 if (status) { 3971 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED; 3972 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3973 return; 3974 } 3975 3976 bdev_compare_and_write_do_compare(bdev_io); 3977 } 3978 3979 int 3980 spdk_bdev_comparev_and_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3981 struct iovec *compare_iov, int compare_iovcnt, 3982 struct iovec *write_iov, int write_iovcnt, 3983 uint64_t offset_blocks, uint64_t num_blocks, 3984 spdk_bdev_io_completion_cb cb, void *cb_arg) 3985 { 3986 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3987 struct spdk_bdev_io *bdev_io; 3988 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3989 3990 if (!desc->write) { 3991 return -EBADF; 3992 } 3993 3994 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3995 return -EINVAL; 3996 } 3997 3998 if (num_blocks > bdev->acwu) { 3999 return -EINVAL; 4000 } 4001 4002 bdev_io = bdev_channel_get_io(channel); 4003 if (!bdev_io) { 4004 return -ENOMEM; 4005 } 4006 4007 bdev_io->internal.ch = channel; 4008 bdev_io->internal.desc = desc; 4009 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE; 4010 bdev_io->u.bdev.iovs = compare_iov; 4011 bdev_io->u.bdev.iovcnt = compare_iovcnt; 4012 bdev_io->u.bdev.fused_iovs = write_iov; 4013 bdev_io->u.bdev.fused_iovcnt = write_iovcnt; 4014 bdev_io->u.bdev.md_buf = NULL; 4015 bdev_io->u.bdev.num_blocks = num_blocks; 4016 bdev_io->u.bdev.offset_blocks = offset_blocks; 4017 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4018 4019 if (bdev_io_type_supported(bdev, 
SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE)) { 4020 bdev_io_submit(bdev_io); 4021 return 0; 4022 } 4023 4024 return bdev_lock_lba_range(desc, ch, offset_blocks, num_blocks, 4025 bdev_comparev_and_writev_blocks_locked, bdev_io); 4026 } 4027 4028 static void 4029 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 4030 { 4031 if (!success) { 4032 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4033 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4034 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 4035 return; 4036 } 4037 4038 if (bdev_io->u.bdev.zcopy.populate) { 4039 /* Read the real data into the buffer */ 4040 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 4041 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4042 bdev_io_submit(bdev_io); 4043 return; 4044 } 4045 4046 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4047 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4048 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 4049 } 4050 4051 int 4052 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4053 uint64_t offset_blocks, uint64_t num_blocks, 4054 bool populate, 4055 spdk_bdev_io_completion_cb cb, void *cb_arg) 4056 { 4057 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4058 struct spdk_bdev_io *bdev_io; 4059 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4060 4061 if (!desc->write) { 4062 return -EBADF; 4063 } 4064 4065 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4066 return -EINVAL; 4067 } 4068 4069 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4070 return -ENOTSUP; 4071 } 4072 4073 bdev_io = bdev_channel_get_io(channel); 4074 if (!bdev_io) { 4075 return -ENOMEM; 4076 } 4077 4078 bdev_io->internal.ch = channel; 4079 bdev_io->internal.desc = desc; 4080 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4081 bdev_io->u.bdev.num_blocks = num_blocks; 4082 bdev_io->u.bdev.offset_blocks = offset_blocks; 4083 bdev_io->u.bdev.iovs = NULL; 4084 bdev_io->u.bdev.iovcnt = 0; 4085 bdev_io->u.bdev.md_buf = NULL; 4086 bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0; 4087 bdev_io->u.bdev.zcopy.commit = 0; 4088 bdev_io->u.bdev.zcopy.start = 1; 4089 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4090 4091 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4092 bdev_io_submit(bdev_io); 4093 } else { 4094 /* Emulate zcopy by allocating a buffer */ 4095 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 4096 bdev_io->u.bdev.num_blocks * bdev->blocklen); 4097 } 4098 4099 return 0; 4100 } 4101 4102 int 4103 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 4104 spdk_bdev_io_completion_cb cb, void *cb_arg) 4105 { 4106 struct spdk_bdev *bdev = bdev_io->bdev; 4107 4108 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 4109 /* This can happen if the zcopy was emulated in start */ 4110 if (bdev_io->u.bdev.zcopy.start != 1) { 4111 return -EINVAL; 4112 } 4113 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4114 } 4115 4116 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 4117 return -EINVAL; 4118 } 4119 4120 bdev_io->u.bdev.zcopy.commit = commit ? 
1 : 0; 4121 bdev_io->u.bdev.zcopy.start = 0; 4122 bdev_io->internal.caller_ctx = cb_arg; 4123 bdev_io->internal.cb = cb; 4124 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4125 4126 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4127 bdev_io_submit(bdev_io); 4128 return 0; 4129 } 4130 4131 if (!bdev_io->u.bdev.zcopy.commit) { 4132 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4133 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4134 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 4135 return 0; 4136 } 4137 4138 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 4139 bdev_io_submit(bdev_io); 4140 4141 return 0; 4142 } 4143 4144 int 4145 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4146 uint64_t offset, uint64_t len, 4147 spdk_bdev_io_completion_cb cb, void *cb_arg) 4148 { 4149 uint64_t offset_blocks, num_blocks; 4150 4151 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4152 len, &num_blocks) != 0) { 4153 return -EINVAL; 4154 } 4155 4156 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4157 } 4158 4159 int 4160 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4161 uint64_t offset_blocks, uint64_t num_blocks, 4162 spdk_bdev_io_completion_cb cb, void *cb_arg) 4163 { 4164 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4165 struct spdk_bdev_io *bdev_io; 4166 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4167 4168 if (!desc->write) { 4169 return -EBADF; 4170 } 4171 4172 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4173 return -EINVAL; 4174 } 4175 4176 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 4177 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 4178 return -ENOTSUP; 4179 } 4180 4181 bdev_io = bdev_channel_get_io(channel); 4182 4183 if (!bdev_io) { 4184 return -ENOMEM; 4185 } 4186 4187 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 4188 bdev_io->internal.ch = channel; 4189 bdev_io->internal.desc = desc; 4190 bdev_io->u.bdev.offset_blocks = offset_blocks; 4191 bdev_io->u.bdev.num_blocks = num_blocks; 4192 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4193 4194 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 4195 bdev_io_submit(bdev_io); 4196 return 0; 4197 } 4198 4199 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 4200 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 4201 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 4202 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 4203 bdev_write_zero_buffer_next(bdev_io); 4204 4205 return 0; 4206 } 4207 4208 int 4209 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4210 uint64_t offset, uint64_t nbytes, 4211 spdk_bdev_io_completion_cb cb, void *cb_arg) 4212 { 4213 uint64_t offset_blocks, num_blocks; 4214 4215 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4216 nbytes, &num_blocks) != 0) { 4217 return -EINVAL; 4218 } 4219 4220 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4221 } 4222 4223 int 4224 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4225 uint64_t offset_blocks, uint64_t num_blocks, 4226 spdk_bdev_io_completion_cb cb, void *cb_arg) 4227 { 4228 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4229 struct spdk_bdev_io *bdev_io; 4230 struct 
spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4231 4232 if (!desc->write) { 4233 return -EBADF; 4234 } 4235 4236 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4237 return -EINVAL; 4238 } 4239 4240 if (num_blocks == 0) { 4241 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 4242 return -EINVAL; 4243 } 4244 4245 bdev_io = bdev_channel_get_io(channel); 4246 if (!bdev_io) { 4247 return -ENOMEM; 4248 } 4249 4250 bdev_io->internal.ch = channel; 4251 bdev_io->internal.desc = desc; 4252 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 4253 4254 bdev_io->u.bdev.iovs = &bdev_io->iov; 4255 bdev_io->u.bdev.iovs[0].iov_base = NULL; 4256 bdev_io->u.bdev.iovs[0].iov_len = 0; 4257 bdev_io->u.bdev.iovcnt = 1; 4258 4259 bdev_io->u.bdev.offset_blocks = offset_blocks; 4260 bdev_io->u.bdev.num_blocks = num_blocks; 4261 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4262 4263 bdev_io_submit(bdev_io); 4264 return 0; 4265 } 4266 4267 int 4268 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4269 uint64_t offset, uint64_t length, 4270 spdk_bdev_io_completion_cb cb, void *cb_arg) 4271 { 4272 uint64_t offset_blocks, num_blocks; 4273 4274 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4275 length, &num_blocks) != 0) { 4276 return -EINVAL; 4277 } 4278 4279 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4280 } 4281 4282 int 4283 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4284 uint64_t offset_blocks, uint64_t num_blocks, 4285 spdk_bdev_io_completion_cb cb, void *cb_arg) 4286 { 4287 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4288 struct spdk_bdev_io *bdev_io; 4289 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4290 4291 if (!desc->write) { 4292 return -EBADF; 4293 } 4294 4295 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4296 return -EINVAL; 4297 } 4298 4299 bdev_io = bdev_channel_get_io(channel); 4300 if (!bdev_io) { 4301 return -ENOMEM; 4302 } 4303 4304 bdev_io->internal.ch = channel; 4305 bdev_io->internal.desc = desc; 4306 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 4307 bdev_io->u.bdev.iovs = NULL; 4308 bdev_io->u.bdev.iovcnt = 0; 4309 bdev_io->u.bdev.offset_blocks = offset_blocks; 4310 bdev_io->u.bdev.num_blocks = num_blocks; 4311 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4312 4313 bdev_io_submit(bdev_io); 4314 return 0; 4315 } 4316 4317 static void 4318 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 4319 { 4320 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 4321 struct spdk_bdev_io *bdev_io; 4322 4323 bdev_io = TAILQ_FIRST(&ch->queued_resets); 4324 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 4325 bdev_io_submit_reset(bdev_io); 4326 } 4327 4328 static void 4329 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 4330 { 4331 struct spdk_io_channel *ch; 4332 struct spdk_bdev_channel *channel; 4333 struct spdk_bdev_mgmt_channel *mgmt_channel; 4334 struct spdk_bdev_shared_resource *shared_resource; 4335 bdev_io_tailq_t tmp_queued; 4336 4337 TAILQ_INIT(&tmp_queued); 4338 4339 ch = spdk_io_channel_iter_get_channel(i); 4340 channel = spdk_io_channel_get_ctx(ch); 4341 shared_resource = channel->shared_resource; 4342 mgmt_channel = shared_resource->mgmt_ch; 4343 4344 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 4345 4346 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 4347 /* The QoS object is always valid and readable while 4348 * the channel flag is set, so the lock here should not 4349 * be 
necessary. We're not in the fast path though, so 4350 * just take it anyway. */ 4351 pthread_mutex_lock(&channel->bdev->internal.mutex); 4352 if (channel->bdev->internal.qos->ch == channel) { 4353 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 4354 } 4355 pthread_mutex_unlock(&channel->bdev->internal.mutex); 4356 } 4357 4358 bdev_abort_all_queued_io(&shared_resource->nomem_io, channel); 4359 bdev_abort_all_buf_io(&mgmt_channel->need_buf_small, channel); 4360 bdev_abort_all_buf_io(&mgmt_channel->need_buf_large, channel); 4361 bdev_abort_all_queued_io(&tmp_queued, channel); 4362 4363 spdk_for_each_channel_continue(i, 0); 4364 } 4365 4366 static void 4367 bdev_start_reset(void *ctx) 4368 { 4369 struct spdk_bdev_channel *ch = ctx; 4370 4371 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 4372 ch, bdev_reset_dev); 4373 } 4374 4375 static void 4376 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 4377 { 4378 struct spdk_bdev *bdev = ch->bdev; 4379 4380 assert(!TAILQ_EMPTY(&ch->queued_resets)); 4381 4382 pthread_mutex_lock(&bdev->internal.mutex); 4383 if (bdev->internal.reset_in_progress == NULL) { 4384 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 4385 /* 4386 * Take a channel reference for the target bdev for the life of this 4387 * reset. This guards against the channel getting destroyed while 4388 * spdk_for_each_channel() calls related to this reset IO are in 4389 * progress. We will release the reference when this reset is 4390 * completed. 4391 */ 4392 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 4393 bdev_start_reset(ch); 4394 } 4395 pthread_mutex_unlock(&bdev->internal.mutex); 4396 } 4397 4398 int 4399 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4400 spdk_bdev_io_completion_cb cb, void *cb_arg) 4401 { 4402 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4403 struct spdk_bdev_io *bdev_io; 4404 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4405 4406 bdev_io = bdev_channel_get_io(channel); 4407 if (!bdev_io) { 4408 return -ENOMEM; 4409 } 4410 4411 bdev_io->internal.ch = channel; 4412 bdev_io->internal.desc = desc; 4413 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4414 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 4415 bdev_io->u.reset.ch_ref = NULL; 4416 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4417 4418 pthread_mutex_lock(&bdev->internal.mutex); 4419 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 4420 pthread_mutex_unlock(&bdev->internal.mutex); 4421 4422 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 4423 internal.ch_link); 4424 4425 bdev_channel_start_reset(channel); 4426 4427 return 0; 4428 } 4429 4430 void 4431 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4432 struct spdk_bdev_io_stat *stat) 4433 { 4434 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4435 4436 *stat = channel->stat; 4437 } 4438 4439 static void 4440 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 4441 { 4442 void *io_device = spdk_io_channel_iter_get_io_device(i); 4443 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4444 4445 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 4446 bdev_iostat_ctx->cb_arg, 0); 4447 free(bdev_iostat_ctx); 4448 } 4449 4450 static void 4451 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 4452 { 4453 struct 
spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4454 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4455 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4456 4457 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 4458 spdk_for_each_channel_continue(i, 0); 4459 } 4460 4461 void 4462 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 4463 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 4464 { 4465 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 4466 4467 assert(bdev != NULL); 4468 assert(stat != NULL); 4469 assert(cb != NULL); 4470 4471 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 4472 if (bdev_iostat_ctx == NULL) { 4473 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 4474 cb(bdev, stat, cb_arg, -ENOMEM); 4475 return; 4476 } 4477 4478 bdev_iostat_ctx->stat = stat; 4479 bdev_iostat_ctx->cb = cb; 4480 bdev_iostat_ctx->cb_arg = cb_arg; 4481 4482 /* Start with the statistics from previously deleted channels. */ 4483 pthread_mutex_lock(&bdev->internal.mutex); 4484 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 4485 pthread_mutex_unlock(&bdev->internal.mutex); 4486 4487 /* Then iterate and add the statistics from each existing channel. */ 4488 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4489 bdev_get_each_channel_stat, 4490 bdev_iostat_ctx, 4491 bdev_get_device_stat_done); 4492 } 4493 4494 int 4495 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4496 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4497 spdk_bdev_io_completion_cb cb, void *cb_arg) 4498 { 4499 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4500 struct spdk_bdev_io *bdev_io; 4501 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4502 4503 if (!desc->write) { 4504 return -EBADF; 4505 } 4506 4507 bdev_io = bdev_channel_get_io(channel); 4508 if (!bdev_io) { 4509 return -ENOMEM; 4510 } 4511 4512 bdev_io->internal.ch = channel; 4513 bdev_io->internal.desc = desc; 4514 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 4515 bdev_io->u.nvme_passthru.cmd = *cmd; 4516 bdev_io->u.nvme_passthru.buf = buf; 4517 bdev_io->u.nvme_passthru.nbytes = nbytes; 4518 bdev_io->u.nvme_passthru.md_buf = NULL; 4519 bdev_io->u.nvme_passthru.md_len = 0; 4520 4521 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4522 4523 bdev_io_submit(bdev_io); 4524 return 0; 4525 } 4526 4527 int 4528 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4529 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4530 spdk_bdev_io_completion_cb cb, void *cb_arg) 4531 { 4532 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4533 struct spdk_bdev_io *bdev_io; 4534 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4535 4536 if (!desc->write) { 4537 /* 4538 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4539 * to easily determine if the command is a read or write, but for now just 4540 * do not allow io_passthru with a read-only descriptor. 
4541 */ 4542 return -EBADF; 4543 } 4544 4545 bdev_io = bdev_channel_get_io(channel); 4546 if (!bdev_io) { 4547 return -ENOMEM; 4548 } 4549 4550 bdev_io->internal.ch = channel; 4551 bdev_io->internal.desc = desc; 4552 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 4553 bdev_io->u.nvme_passthru.cmd = *cmd; 4554 bdev_io->u.nvme_passthru.buf = buf; 4555 bdev_io->u.nvme_passthru.nbytes = nbytes; 4556 bdev_io->u.nvme_passthru.md_buf = NULL; 4557 bdev_io->u.nvme_passthru.md_len = 0; 4558 4559 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4560 4561 bdev_io_submit(bdev_io); 4562 return 0; 4563 } 4564 4565 int 4566 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4567 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 4568 spdk_bdev_io_completion_cb cb, void *cb_arg) 4569 { 4570 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4571 struct spdk_bdev_io *bdev_io; 4572 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4573 4574 if (!desc->write) { 4575 /* 4576 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4577 * to easily determine if the command is a read or write, but for now just 4578 * do not allow io_passthru with a read-only descriptor. 4579 */ 4580 return -EBADF; 4581 } 4582 4583 bdev_io = bdev_channel_get_io(channel); 4584 if (!bdev_io) { 4585 return -ENOMEM; 4586 } 4587 4588 bdev_io->internal.ch = channel; 4589 bdev_io->internal.desc = desc; 4590 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 4591 bdev_io->u.nvme_passthru.cmd = *cmd; 4592 bdev_io->u.nvme_passthru.buf = buf; 4593 bdev_io->u.nvme_passthru.nbytes = nbytes; 4594 bdev_io->u.nvme_passthru.md_buf = md_buf; 4595 bdev_io->u.nvme_passthru.md_len = md_len; 4596 4597 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4598 4599 bdev_io_submit(bdev_io); 4600 return 0; 4601 } 4602 4603 static void bdev_abort_retry(void *ctx); 4604 static void bdev_abort(struct spdk_bdev_io *parent_io); 4605 4606 static void 4607 bdev_abort_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4608 { 4609 struct spdk_bdev_channel *channel = bdev_io->internal.ch; 4610 struct spdk_bdev_io *parent_io = cb_arg; 4611 struct spdk_bdev_io *bio_to_abort, *tmp_io; 4612 4613 bio_to_abort = bdev_io->u.abort.bio_to_abort; 4614 4615 spdk_bdev_free_io(bdev_io); 4616 4617 if (!success) { 4618 /* Check if the target I/O completed in the meantime. */ 4619 TAILQ_FOREACH(tmp_io, &channel->io_submitted, internal.ch_link) { 4620 if (tmp_io == bio_to_abort) { 4621 break; 4622 } 4623 } 4624 4625 /* If the target I/O still exists, set the parent to failed. */ 4626 if (tmp_io != NULL) { 4627 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4628 } 4629 } 4630 4631 parent_io->u.bdev.split_outstanding--; 4632 if (parent_io->u.bdev.split_outstanding == 0) { 4633 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4634 bdev_abort_retry(parent_io); 4635 } else { 4636 bdev_io_complete(parent_io); 4637 } 4638 } 4639 } 4640 4641 static int 4642 bdev_abort_io(struct spdk_bdev_desc *desc, struct spdk_bdev_channel *channel, 4643 struct spdk_bdev_io *bio_to_abort, 4644 spdk_bdev_io_completion_cb cb, void *cb_arg) 4645 { 4646 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4647 struct spdk_bdev_io *bdev_io; 4648 4649 if (bio_to_abort->type == SPDK_BDEV_IO_TYPE_ABORT || 4650 bio_to_abort->type == SPDK_BDEV_IO_TYPE_RESET) { 4651 /* TODO: Abort reset or abort request. 
*/ 4652 return -ENOTSUP; 4653 } 4654 4655 bdev_io = bdev_channel_get_io(channel); 4656 if (bdev_io == NULL) { 4657 return -ENOMEM; 4658 } 4659 4660 bdev_io->internal.ch = channel; 4661 bdev_io->internal.desc = desc; 4662 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4663 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4664 4665 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bio_to_abort)) { 4666 bdev_io->u.bdev.abort.bio_cb_arg = bio_to_abort; 4667 4668 /* Parent abort request is not submitted directly, but to manage its 4669 * execution add it to the submitted list here. 4670 */ 4671 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4672 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4673 4674 bdev_abort(bdev_io); 4675 4676 return 0; 4677 } 4678 4679 bdev_io->u.abort.bio_to_abort = bio_to_abort; 4680 4681 /* Submit the abort request to the underlying bdev module. */ 4682 bdev_io_submit(bdev_io); 4683 4684 return 0; 4685 } 4686 4687 static uint32_t 4688 _bdev_abort(struct spdk_bdev_io *parent_io) 4689 { 4690 struct spdk_bdev_desc *desc = parent_io->internal.desc; 4691 struct spdk_bdev_channel *channel = parent_io->internal.ch; 4692 void *bio_cb_arg; 4693 struct spdk_bdev_io *bio_to_abort; 4694 uint32_t matched_ios; 4695 int rc; 4696 4697 bio_cb_arg = parent_io->u.bdev.abort.bio_cb_arg; 4698 4699 /* matched_ios is returned and will be kept by the caller. 4700 * 4701 * This function is used for two cases: 1) the same cb_arg is used for 4702 * multiple I/Os, and 2) a single large I/O is split into smaller ones. 4703 * Incrementing split_outstanding directly here may confuse readers, especially 4704 * for the 1st case. 4705 * 4706 * Completion of I/O abort is processed after stack unwinding. Hence this trick 4707 * works as expected. 4708 */ 4709 matched_ios = 0; 4710 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4711 4712 TAILQ_FOREACH(bio_to_abort, &channel->io_submitted, internal.ch_link) { 4713 if (bio_to_abort->internal.caller_ctx != bio_cb_arg) { 4714 continue; 4715 } 4716 4717 if (bio_to_abort->internal.submit_tsc > parent_io->internal.submit_tsc) { 4718 /* Any I/O which was submitted after this abort command should be excluded. */ 4719 continue; 4720 } 4721 4722 rc = bdev_abort_io(desc, channel, bio_to_abort, bdev_abort_io_done, parent_io); 4723 if (rc != 0) { 4724 if (rc == -ENOMEM) { 4725 parent_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4726 } else { 4727 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4728 } 4729 break; 4730 } 4731 matched_ios++; 4732 } 4733 4734 return matched_ios; 4735 } 4736 4737 static void 4738 bdev_abort_retry(void *ctx) 4739 { 4740 struct spdk_bdev_io *parent_io = ctx; 4741 uint32_t matched_ios; 4742 4743 matched_ios = _bdev_abort(parent_io); 4744 4745 if (matched_ios == 0) { 4746 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4747 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4748 } else { 4749 /* For retry, the case that no target I/O was found is success 4750 * because it means target I/Os completed in the meantime. 4751 */ 4752 bdev_io_complete(parent_io); 4753 } 4754 return; 4755 } 4756 4757 /* Use split_outstanding to manage the progress of aborting I/Os. 
*/ 4758 parent_io->u.bdev.split_outstanding = matched_ios; 4759 } 4760 4761 static void 4762 bdev_abort(struct spdk_bdev_io *parent_io) 4763 { 4764 uint32_t matched_ios; 4765 4766 matched_ios = _bdev_abort(parent_io); 4767 4768 if (matched_ios == 0) { 4769 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4770 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4771 } else { 4772 /* The case where no target I/O was found is a failure. */ 4773 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4774 bdev_io_complete(parent_io); 4775 } 4776 return; 4777 } 4778 4779 /* Use split_outstanding to manage the progress of aborting I/Os. */ 4780 parent_io->u.bdev.split_outstanding = matched_ios; 4781 } 4782 4783 int 4784 spdk_bdev_abort(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4785 void *bio_cb_arg, 4786 spdk_bdev_io_completion_cb cb, void *cb_arg) 4787 { 4788 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4789 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4790 struct spdk_bdev_io *bdev_io; 4791 4792 if (bio_cb_arg == NULL) { 4793 return -EINVAL; 4794 } 4795 4796 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ABORT)) { 4797 return -ENOTSUP; 4798 } 4799 4800 bdev_io = bdev_channel_get_io(channel); 4801 if (bdev_io == NULL) { 4802 return -ENOMEM; 4803 } 4804 4805 bdev_io->internal.ch = channel; 4806 bdev_io->internal.desc = desc; 4807 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4808 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4809 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4810 4811 bdev_io->u.bdev.abort.bio_cb_arg = bio_cb_arg; 4812 4813 /* Parent abort request is not submitted directly, but to manage its execution, 4814 * add it to the submitted list here. 4815 */ 4816 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4817 4818 bdev_abort(bdev_io); 4819 4820 return 0; 4821 } 4822 4823 int 4824 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4825 struct spdk_bdev_io_wait_entry *entry) 4826 { 4827 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4828 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 4829 4830 if (bdev != entry->bdev) { 4831 SPDK_ERRLOG("bdevs do not match\n"); 4832 return -EINVAL; 4833 } 4834 4835 if (mgmt_ch->per_thread_cache_count > 0) { 4836 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 4837 return -EINVAL; 4838 } 4839 4840 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 4841 return 0; 4842 } 4843 4844 static void 4845 bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 4846 { 4847 struct spdk_bdev *bdev = bdev_ch->bdev; 4848 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4849 struct spdk_bdev_io *bdev_io; 4850 4851 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 4852 /* 4853 * Allow some more I/O to complete before retrying the nomem_io queue. 4854 * Some drivers (such as nvme) cannot immediately take a new I/O in 4855 * the context of a completion, because the resources for the I/O are 4856 * not released until control returns to the bdev poller. Also, we 4857 * may require several small I/O to complete before a larger I/O 4858 * (that requires splitting) can be submitted. 
4859 */ 4860 return; 4861 } 4862 4863 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 4864 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 4865 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 4866 bdev_io->internal.ch->io_outstanding++; 4867 shared_resource->io_outstanding++; 4868 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4869 bdev_io->internal.error.nvme.cdw0 = 0; 4870 bdev_io->num_retries++; 4871 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 4872 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4873 break; 4874 } 4875 } 4876 } 4877 4878 static inline void 4879 bdev_io_complete(void *ctx) 4880 { 4881 struct spdk_bdev_io *bdev_io = ctx; 4882 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4883 uint64_t tsc, tsc_diff; 4884 4885 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 4886 /* 4887 * Send the completion to the thread that originally submitted the I/O, 4888 * which may not be the current thread in the case of QoS. 4889 */ 4890 if (bdev_io->internal.io_submit_ch) { 4891 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4892 bdev_io->internal.io_submit_ch = NULL; 4893 } 4894 4895 /* 4896 * Defer completion to avoid potential infinite recursion if the 4897 * user's completion callback issues a new I/O. 4898 */ 4899 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4900 bdev_io_complete, bdev_io); 4901 return; 4902 } 4903 4904 tsc = spdk_get_ticks(); 4905 tsc_diff = tsc - bdev_io->internal.submit_tsc; 4906 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 4907 4908 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link); 4909 4910 if (bdev_io->internal.ch->histogram) { 4911 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 4912 } 4913 4914 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4915 switch (bdev_io->type) { 4916 case SPDK_BDEV_IO_TYPE_READ: 4917 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4918 bdev_io->internal.ch->stat.num_read_ops++; 4919 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4920 break; 4921 case SPDK_BDEV_IO_TYPE_WRITE: 4922 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4923 bdev_io->internal.ch->stat.num_write_ops++; 4924 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4925 break; 4926 case SPDK_BDEV_IO_TYPE_UNMAP: 4927 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4928 bdev_io->internal.ch->stat.num_unmap_ops++; 4929 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 4930 break; 4931 case SPDK_BDEV_IO_TYPE_ZCOPY: 4932 /* Track the data in the start phase only */ 4933 if (bdev_io->u.bdev.zcopy.start) { 4934 if (bdev_io->u.bdev.zcopy.populate) { 4935 bdev_io->internal.ch->stat.bytes_read += 4936 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4937 bdev_io->internal.ch->stat.num_read_ops++; 4938 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 4939 } else { 4940 bdev_io->internal.ch->stat.bytes_written += 4941 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 4942 bdev_io->internal.ch->stat.num_write_ops++; 4943 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 4944 } 4945 } 4946 break; 4947 default: 4948 break; 4949 } 4950 } 4951 4952 #ifdef SPDK_CONFIG_VTUNE 4953 uint64_t now_tsc = spdk_get_ticks(); 4954 if (now_tsc > 
(bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 4955 uint64_t data[5]; 4956 4957 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 4958 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 4959 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 4960 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 4961 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 4962 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 4963 4964 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 4965 __itt_metadata_u64, 5, data); 4966 4967 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 4968 bdev_io->internal.ch->start_tsc = now_tsc; 4969 } 4970 #endif 4971 4972 assert(bdev_io->internal.cb != NULL); 4973 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 4974 4975 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 4976 bdev_io->internal.caller_ctx); 4977 } 4978 4979 static void 4980 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 4981 { 4982 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4983 4984 if (bdev_io->u.reset.ch_ref != NULL) { 4985 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 4986 bdev_io->u.reset.ch_ref = NULL; 4987 } 4988 4989 bdev_io_complete(bdev_io); 4990 } 4991 4992 static void 4993 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 4994 { 4995 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 4996 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4997 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4998 struct spdk_bdev_io *queued_reset; 4999 5000 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 5001 while (!TAILQ_EMPTY(&ch->queued_resets)) { 5002 queued_reset = TAILQ_FIRST(&ch->queued_resets); 5003 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 5004 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 5005 } 5006 5007 spdk_for_each_channel_continue(i, 0); 5008 } 5009 5010 void 5011 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 5012 { 5013 struct spdk_bdev *bdev = bdev_io->bdev; 5014 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 5015 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 5016 5017 bdev_io->internal.status = status; 5018 5019 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 5020 bool unlock_channels = false; 5021 5022 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 5023 SPDK_ERRLOG("NOMEM returned for reset\n"); 5024 } 5025 pthread_mutex_lock(&bdev->internal.mutex); 5026 if (bdev_io == bdev->internal.reset_in_progress) { 5027 bdev->internal.reset_in_progress = NULL; 5028 unlock_channels = true; 5029 } 5030 pthread_mutex_unlock(&bdev->internal.mutex); 5031 5032 if (unlock_channels) { 5033 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 5034 bdev_io, bdev_reset_complete); 5035 return; 5036 } 5037 } else { 5038 _bdev_io_unset_bounce_buf(bdev_io); 5039 5040 assert(bdev_ch->io_outstanding > 0); 5041 assert(shared_resource->io_outstanding > 0); 5042 bdev_ch->io_outstanding--; 5043 shared_resource->io_outstanding--; 5044 5045 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 5046 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 5047 
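/* Re-queue at the head of nomem_io so this I/O is the first one retried once enough outstanding I/O has completed. */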
/* 5048 * Wait for some of the outstanding I/O to complete before we 5049 * retry any of the nomem_io. Normally we will wait for 5050 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 5051 * depth channels we will instead wait for half to complete. 5052 */ 5053 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 5054 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 5055 return; 5056 } 5057 5058 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 5059 bdev_ch_retry_io(bdev_ch); 5060 } 5061 } 5062 5063 bdev_io_complete(bdev_io); 5064 } 5065 5066 void 5067 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 5068 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 5069 { 5070 if (sc == SPDK_SCSI_STATUS_GOOD) { 5071 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5072 } else { 5073 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 5074 bdev_io->internal.error.scsi.sc = sc; 5075 bdev_io->internal.error.scsi.sk = sk; 5076 bdev_io->internal.error.scsi.asc = asc; 5077 bdev_io->internal.error.scsi.ascq = ascq; 5078 } 5079 5080 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5081 } 5082 5083 void 5084 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 5085 int *sc, int *sk, int *asc, int *ascq) 5086 { 5087 assert(sc != NULL); 5088 assert(sk != NULL); 5089 assert(asc != NULL); 5090 assert(ascq != NULL); 5091 5092 switch (bdev_io->internal.status) { 5093 case SPDK_BDEV_IO_STATUS_SUCCESS: 5094 *sc = SPDK_SCSI_STATUS_GOOD; 5095 *sk = SPDK_SCSI_SENSE_NO_SENSE; 5096 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5097 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5098 break; 5099 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 5100 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 5101 break; 5102 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 5103 *sc = bdev_io->internal.error.scsi.sc; 5104 *sk = bdev_io->internal.error.scsi.sk; 5105 *asc = bdev_io->internal.error.scsi.asc; 5106 *ascq = bdev_io->internal.error.scsi.ascq; 5107 break; 5108 default: 5109 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 5110 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 5111 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5112 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5113 break; 5114 } 5115 } 5116 5117 void 5118 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 5119 { 5120 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 5121 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5122 } else { 5123 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 5124 } 5125 5126 bdev_io->internal.error.nvme.cdw0 = cdw0; 5127 bdev_io->internal.error.nvme.sct = sct; 5128 bdev_io->internal.error.nvme.sc = sc; 5129 5130 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5131 } 5132 5133 void 5134 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 5135 { 5136 assert(sct != NULL); 5137 assert(sc != NULL); 5138 assert(cdw0 != NULL); 5139 5140 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5141 *sct = bdev_io->internal.error.nvme.sct; 5142 *sc = bdev_io->internal.error.nvme.sc; 5143 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5144 *sct = SPDK_NVME_SCT_GENERIC; 5145 *sc = SPDK_NVME_SC_SUCCESS; 5146 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_ABORTED) { 5147 *sct = SPDK_NVME_SCT_GENERIC; 5148 *sc = SPDK_NVME_SC_ABORTED_BY_REQUEST; 5149 } else { 5150 *sct 
= SPDK_NVME_SCT_GENERIC; 5151 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5152 } 5153 5154 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5155 } 5156 5157 void 5158 spdk_bdev_io_get_nvme_fused_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, 5159 int *first_sct, int *first_sc, int *second_sct, int *second_sc) 5160 { 5161 assert(first_sct != NULL); 5162 assert(first_sc != NULL); 5163 assert(second_sct != NULL); 5164 assert(second_sc != NULL); 5165 assert(cdw0 != NULL); 5166 5167 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5168 if (bdev_io->internal.error.nvme.sct == SPDK_NVME_SCT_MEDIA_ERROR && 5169 bdev_io->internal.error.nvme.sc == SPDK_NVME_SC_COMPARE_FAILURE) { 5170 *first_sct = bdev_io->internal.error.nvme.sct; 5171 *first_sc = bdev_io->internal.error.nvme.sc; 5172 *second_sct = SPDK_NVME_SCT_GENERIC; 5173 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5174 } else { 5175 *first_sct = SPDK_NVME_SCT_GENERIC; 5176 *first_sc = SPDK_NVME_SC_SUCCESS; 5177 *second_sct = bdev_io->internal.error.nvme.sct; 5178 *second_sc = bdev_io->internal.error.nvme.sc; 5179 } 5180 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5181 *first_sct = SPDK_NVME_SCT_GENERIC; 5182 *first_sc = SPDK_NVME_SC_SUCCESS; 5183 *second_sct = SPDK_NVME_SCT_GENERIC; 5184 *second_sc = SPDK_NVME_SC_SUCCESS; 5185 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED) { 5186 *first_sct = SPDK_NVME_SCT_GENERIC; 5187 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5188 *second_sct = SPDK_NVME_SCT_GENERIC; 5189 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5190 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_MISCOMPARE) { 5191 *first_sct = SPDK_NVME_SCT_MEDIA_ERROR; 5192 *first_sc = SPDK_NVME_SC_COMPARE_FAILURE; 5193 *second_sct = SPDK_NVME_SCT_GENERIC; 5194 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5195 } else { 5196 *first_sct = SPDK_NVME_SCT_GENERIC; 5197 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5198 *second_sct = SPDK_NVME_SCT_GENERIC; 5199 *second_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5200 } 5201 5202 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5203 } 5204 5205 struct spdk_thread * 5206 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 5207 { 5208 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 5209 } 5210 5211 struct spdk_io_channel * 5212 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 5213 { 5214 return bdev_io->internal.ch->channel; 5215 } 5216 5217 static void 5218 bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 5219 { 5220 uint64_t min_qos_set; 5221 int i; 5222 5223 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5224 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5225 break; 5226 } 5227 } 5228 5229 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 5230 SPDK_ERRLOG("Invalid rate limits set.\n"); 5231 return; 5232 } 5233 5234 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5235 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5236 continue; 5237 } 5238 5239 if (bdev_qos_is_iops_rate_limit(i) == true) { 5240 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 5241 } else { 5242 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 5243 } 5244 5245 if (limits[i] == 0 || limits[i] % min_qos_set) { 5246 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 5247 limits[i], bdev->name, min_qos_set); 5248 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 5249 return; 5250 } 5251 } 5252 5253 if (!bdev->internal.qos) { 5254 
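/* First rate limit configured for this bdev - lazily allocate the QoS tracking structure. */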
bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 5255 if (!bdev->internal.qos) { 5256 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 5257 return; 5258 } 5259 } 5260 5261 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5262 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5263 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 5264 bdev->name, i, limits[i]); 5265 } 5266 5267 return; 5268 } 5269 5270 static void 5271 bdev_qos_config(struct spdk_bdev *bdev) 5272 { 5273 struct spdk_conf_section *sp = NULL; 5274 const char *val = NULL; 5275 int i = 0, j = 0; 5276 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 5277 bool config_qos = false; 5278 5279 sp = spdk_conf_find_section(NULL, "QoS"); 5280 if (!sp) { 5281 return; 5282 } 5283 5284 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 5285 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 5286 5287 i = 0; 5288 while (true) { 5289 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 5290 if (!val) { 5291 break; 5292 } 5293 5294 if (strcmp(bdev->name, val) != 0) { 5295 i++; 5296 continue; 5297 } 5298 5299 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 5300 if (val) { 5301 if (bdev_qos_is_iops_rate_limit(j) == true) { 5302 limits[j] = strtoull(val, NULL, 10); 5303 } else { 5304 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 5305 } 5306 config_qos = true; 5307 } 5308 5309 break; 5310 } 5311 5312 j++; 5313 } 5314 5315 if (config_qos == true) { 5316 bdev_qos_config_limit(bdev, limits); 5317 } 5318 5319 return; 5320 } 5321 5322 static int 5323 bdev_init(struct spdk_bdev *bdev) 5324 { 5325 char *bdev_name; 5326 5327 assert(bdev->module != NULL); 5328 5329 if (!bdev->name) { 5330 SPDK_ERRLOG("Bdev name is NULL\n"); 5331 return -EINVAL; 5332 } 5333 5334 if (!strlen(bdev->name)) { 5335 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 5336 return -EINVAL; 5337 } 5338 5339 if (spdk_bdev_get_by_name(bdev->name)) { 5340 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 5341 return -EEXIST; 5342 } 5343 5344 /* Users often register their own I/O devices using the bdev name. In 5345 * order to avoid conflicts, prepend bdev_. */ 5346 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 5347 if (!bdev_name) { 5348 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 5349 return -ENOMEM; 5350 } 5351 5352 bdev->internal.status = SPDK_BDEV_STATUS_READY; 5353 bdev->internal.measured_queue_depth = UINT64_MAX; 5354 bdev->internal.claim_module = NULL; 5355 bdev->internal.qd_poller = NULL; 5356 bdev->internal.qos = NULL; 5357 5358 /* If the user didn't specify a uuid, generate one. */ 5359 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 5360 spdk_uuid_generate(&bdev->uuid); 5361 } 5362 5363 if (spdk_bdev_get_buf_align(bdev) > 1) { 5364 if (bdev->split_on_optimal_io_boundary) { 5365 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 5366 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 5367 } else { 5368 bdev->split_on_optimal_io_boundary = true; 5369 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 5370 } 5371 } 5372 5373 /* If the user didn't specify a write unit size, set it to one. 
*/ 5374 if (bdev->write_unit_size == 0) { 5375 bdev->write_unit_size = 1; 5376 } 5377 5378 /* Set ACWU value to 1 if bdev module did not set it (does not support it natively) */ 5379 if (bdev->acwu == 0) { 5380 bdev->acwu = 1; 5381 } 5382 5383 TAILQ_INIT(&bdev->internal.open_descs); 5384 TAILQ_INIT(&bdev->internal.locked_ranges); 5385 TAILQ_INIT(&bdev->internal.pending_locked_ranges); 5386 5387 TAILQ_INIT(&bdev->aliases); 5388 5389 bdev->internal.reset_in_progress = NULL; 5390 5391 bdev_qos_config(bdev); 5392 5393 spdk_io_device_register(__bdev_to_io_dev(bdev), 5394 bdev_channel_create, bdev_channel_destroy, 5395 sizeof(struct spdk_bdev_channel), 5396 bdev_name); 5397 5398 free(bdev_name); 5399 5400 pthread_mutex_init(&bdev->internal.mutex, NULL); 5401 return 0; 5402 } 5403 5404 static void 5405 bdev_destroy_cb(void *io_device) 5406 { 5407 int rc; 5408 struct spdk_bdev *bdev; 5409 spdk_bdev_unregister_cb cb_fn; 5410 void *cb_arg; 5411 5412 bdev = __bdev_from_io_dev(io_device); 5413 cb_fn = bdev->internal.unregister_cb; 5414 cb_arg = bdev->internal.unregister_ctx; 5415 5416 rc = bdev->fn_table->destruct(bdev->ctxt); 5417 if (rc < 0) { 5418 SPDK_ERRLOG("destruct failed\n"); 5419 } 5420 if (rc <= 0 && cb_fn != NULL) { 5421 cb_fn(cb_arg, rc); 5422 } 5423 } 5424 5425 5426 static void 5427 bdev_fini(struct spdk_bdev *bdev) 5428 { 5429 pthread_mutex_destroy(&bdev->internal.mutex); 5430 5431 free(bdev->internal.qos); 5432 5433 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 5434 } 5435 5436 static void 5437 bdev_start(struct spdk_bdev *bdev) 5438 { 5439 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 5440 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 5441 5442 /* Examine configuration before initializing I/O */ 5443 bdev_examine(bdev); 5444 } 5445 5446 int 5447 spdk_bdev_register(struct spdk_bdev *bdev) 5448 { 5449 int rc = bdev_init(bdev); 5450 5451 if (rc == 0) { 5452 bdev_start(bdev); 5453 } 5454 5455 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 5456 return rc; 5457 } 5458 5459 int 5460 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 5461 { 5462 SPDK_ERRLOG("This function is deprecated. Use spdk_bdev_register() instead.\n"); 5463 return spdk_bdev_register(vbdev); 5464 } 5465 5466 void 5467 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 5468 { 5469 if (bdev->internal.unregister_cb != NULL) { 5470 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 5471 } 5472 } 5473 5474 static void 5475 _remove_notify(void *arg) 5476 { 5477 struct spdk_bdev_desc *desc = arg; 5478 5479 pthread_mutex_lock(&desc->mutex); 5480 desc->refs--; 5481 5482 if (!desc->closed) { 5483 pthread_mutex_unlock(&desc->mutex); 5484 if (desc->callback.open_with_ext) { 5485 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 5486 } else { 5487 desc->callback.remove_fn(desc->callback.ctx); 5488 } 5489 return; 5490 } else if (0 == desc->refs) { 5491 /* This descriptor was closed after this remove_notify message was sent. 5492 * spdk_bdev_close() could not free the descriptor since this message was 5493 * in flight, so we free it now using bdev_desc_free(). 5494 */ 5495 pthread_mutex_unlock(&desc->mutex); 5496 bdev_desc_free(desc); 5497 return; 5498 } 5499 pthread_mutex_unlock(&desc->mutex); 5500 } 5501 5502 /* Must be called while holding bdev->internal.mutex. 5503 * returns: 0 - bdev removed and ready to be destructed. 
5504 * -EBUSY - bdev can't be destructed yet. */ 5505 static int 5506 bdev_unregister_unsafe(struct spdk_bdev *bdev) 5507 { 5508 struct spdk_bdev_desc *desc, *tmp; 5509 int rc = 0; 5510 5511 /* Notify each descriptor about hotremoval */ 5512 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 5513 rc = -EBUSY; 5514 pthread_mutex_lock(&desc->mutex); 5515 /* 5516 * Defer invocation of the event_cb to a separate message that will 5517 * run later on its thread. This ensures this context unwinds and 5518 * we don't recursively unregister this bdev again if the event_cb 5519 * immediately closes its descriptor. 5520 */ 5521 desc->refs++; 5522 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 5523 pthread_mutex_unlock(&desc->mutex); 5524 } 5525 5526 /* If there are no descriptors, proceed removing the bdev */ 5527 if (rc == 0) { 5528 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 5529 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 5530 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 5531 } 5532 5533 return rc; 5534 } 5535 5536 void 5537 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 5538 { 5539 struct spdk_thread *thread; 5540 int rc; 5541 5542 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 5543 5544 thread = spdk_get_thread(); 5545 if (!thread) { 5546 /* The user called this from a non-SPDK thread. */ 5547 if (cb_fn != NULL) { 5548 cb_fn(cb_arg, -ENOTSUP); 5549 } 5550 return; 5551 } 5552 5553 pthread_mutex_lock(&g_bdev_mgr.mutex); 5554 pthread_mutex_lock(&bdev->internal.mutex); 5555 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5556 pthread_mutex_unlock(&bdev->internal.mutex); 5557 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5558 if (cb_fn) { 5559 cb_fn(cb_arg, -EBUSY); 5560 } 5561 return; 5562 } 5563 5564 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 5565 bdev->internal.unregister_cb = cb_fn; 5566 bdev->internal.unregister_ctx = cb_arg; 5567 5568 /* Call under lock. 
*/ 5569 rc = bdev_unregister_unsafe(bdev); 5570 pthread_mutex_unlock(&bdev->internal.mutex); 5571 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5572 5573 if (rc == 0) { 5574 bdev_fini(bdev); 5575 } 5576 } 5577 5578 static void 5579 bdev_dummy_event_cb(void *remove_ctx) 5580 { 5581 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev remove event received with no remove callback specified"); 5582 } 5583 5584 static int 5585 bdev_start_qos(struct spdk_bdev *bdev) 5586 { 5587 struct set_qos_limit_ctx *ctx; 5588 5589 /* Enable QoS */ 5590 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 5591 ctx = calloc(1, sizeof(*ctx)); 5592 if (ctx == NULL) { 5593 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 5594 return -ENOMEM; 5595 } 5596 ctx->bdev = bdev; 5597 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5598 bdev_enable_qos_msg, ctx, 5599 bdev_enable_qos_done); 5600 } 5601 5602 return 0; 5603 } 5604 5605 static int 5606 bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 5607 { 5608 struct spdk_thread *thread; 5609 int rc = 0; 5610 5611 thread = spdk_get_thread(); 5612 if (!thread) { 5613 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 5614 return -ENOTSUP; 5615 } 5616 5617 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5618 spdk_get_thread()); 5619 5620 desc->bdev = bdev; 5621 desc->thread = thread; 5622 desc->write = write; 5623 5624 pthread_mutex_lock(&bdev->internal.mutex); 5625 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 5626 pthread_mutex_unlock(&bdev->internal.mutex); 5627 return -ENODEV; 5628 } 5629 5630 if (write && bdev->internal.claim_module) { 5631 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 5632 bdev->name, bdev->internal.claim_module->name); 5633 pthread_mutex_unlock(&bdev->internal.mutex); 5634 return -EPERM; 5635 } 5636 5637 rc = bdev_start_qos(bdev); 5638 if (rc != 0) { 5639 SPDK_ERRLOG("Failed to start QoS on bdev %s\n", bdev->name); 5640 pthread_mutex_unlock(&bdev->internal.mutex); 5641 return rc; 5642 } 5643 5644 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 5645 5646 pthread_mutex_unlock(&bdev->internal.mutex); 5647 5648 return 0; 5649 } 5650 5651 int 5652 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 5653 void *remove_ctx, struct spdk_bdev_desc **_desc) 5654 { 5655 struct spdk_bdev_desc *desc; 5656 int rc; 5657 5658 desc = calloc(1, sizeof(*desc)); 5659 if (desc == NULL) { 5660 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5661 return -ENOMEM; 5662 } 5663 5664 if (remove_cb == NULL) { 5665 remove_cb = bdev_dummy_event_cb; 5666 } 5667 5668 TAILQ_INIT(&desc->pending_media_events); 5669 TAILQ_INIT(&desc->free_media_events); 5670 5671 desc->callback.open_with_ext = false; 5672 desc->callback.remove_fn = remove_cb; 5673 desc->callback.ctx = remove_ctx; 5674 pthread_mutex_init(&desc->mutex, NULL); 5675 5676 pthread_mutex_lock(&g_bdev_mgr.mutex); 5677 5678 rc = bdev_open(bdev, write, desc); 5679 if (rc != 0) { 5680 bdev_desc_free(desc); 5681 desc = NULL; 5682 } 5683 5684 *_desc = desc; 5685 5686 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5687 5688 return rc; 5689 } 5690 5691 int 5692 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 5693 void *event_ctx, struct spdk_bdev_desc **_desc) 5694 { 5695 struct spdk_bdev_desc *desc; 5696 struct spdk_bdev *bdev; 5697 unsigned int event_id; 5698 int rc; 5699 5700 if (event_cb == NULL) { 5701 SPDK_ERRLOG("Missing event 
callback function\n"); 5702 return -EINVAL; 5703 } 5704 5705 pthread_mutex_lock(&g_bdev_mgr.mutex); 5706 5707 bdev = spdk_bdev_get_by_name(bdev_name); 5708 5709 if (bdev == NULL) { 5710 SPDK_ERRLOG("Failed to find bdev with name: %s\n", bdev_name); 5711 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5712 return -EINVAL; 5713 } 5714 5715 desc = calloc(1, sizeof(*desc)); 5716 if (desc == NULL) { 5717 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 5718 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5719 return -ENOMEM; 5720 } 5721 5722 TAILQ_INIT(&desc->pending_media_events); 5723 TAILQ_INIT(&desc->free_media_events); 5724 5725 desc->callback.open_with_ext = true; 5726 desc->callback.event_fn = event_cb; 5727 desc->callback.ctx = event_ctx; 5728 pthread_mutex_init(&desc->mutex, NULL); 5729 5730 if (bdev->media_events) { 5731 desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE, 5732 sizeof(*desc->media_events_buffer)); 5733 if (desc->media_events_buffer == NULL) { 5734 SPDK_ERRLOG("Failed to initialize media event pool\n"); 5735 bdev_desc_free(desc); 5736 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5737 return -ENOMEM; 5738 } 5739 5740 for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) { 5741 TAILQ_INSERT_TAIL(&desc->free_media_events, 5742 &desc->media_events_buffer[event_id], tailq); 5743 } 5744 } 5745 5746 rc = bdev_open(bdev, write, desc); 5747 if (rc != 0) { 5748 bdev_desc_free(desc); 5749 desc = NULL; 5750 } 5751 5752 *_desc = desc; 5753 5754 pthread_mutex_unlock(&g_bdev_mgr.mutex); 5755 5756 return rc; 5757 } 5758 5759 void 5760 spdk_bdev_close(struct spdk_bdev_desc *desc) 5761 { 5762 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 5763 int rc; 5764 5765 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 5766 spdk_get_thread()); 5767 5768 assert(desc->thread == spdk_get_thread()); 5769 5770 spdk_poller_unregister(&desc->io_timeout_poller); 5771 5772 pthread_mutex_lock(&bdev->internal.mutex); 5773 pthread_mutex_lock(&desc->mutex); 5774 5775 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 5776 5777 desc->closed = true; 5778 5779 if (0 == desc->refs) { 5780 pthread_mutex_unlock(&desc->mutex); 5781 bdev_desc_free(desc); 5782 } else { 5783 pthread_mutex_unlock(&desc->mutex); 5784 } 5785 5786 /* If no more descriptors, kill QoS channel */ 5787 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5788 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 5789 bdev->name, spdk_get_thread()); 5790 5791 if (bdev_qos_destroy(bdev)) { 5792 /* There isn't anything we can do to recover here. Just let the 5793 * old QoS poller keep running. The QoS handling won't change 5794 * cores when the user allocates a new channel, but it won't break. */ 5795 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 5796 } 5797 } 5798 5799 spdk_bdev_set_qd_sampling_period(bdev, 0); 5800 5801 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 5802 rc = bdev_unregister_unsafe(bdev); 5803 pthread_mutex_unlock(&bdev->internal.mutex); 5804 5805 if (rc == 0) { 5806 bdev_fini(bdev); 5807 } 5808 } else { 5809 pthread_mutex_unlock(&bdev->internal.mutex); 5810 } 5811 } 5812 5813 int 5814 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 5815 struct spdk_bdev_module *module) 5816 { 5817 if (bdev->internal.claim_module != NULL) { 5818 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 5819 bdev->internal.claim_module->name); 5820 return -EPERM; 5821 } 5822 5823 if (desc && !desc->write) { 5824 desc->write = true; 5825 } 5826 5827 bdev->internal.claim_module = module; 5828 return 0; 5829 } 5830 5831 void 5832 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 5833 { 5834 assert(bdev->internal.claim_module != NULL); 5835 bdev->internal.claim_module = NULL; 5836 } 5837 5838 struct spdk_bdev * 5839 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 5840 { 5841 assert(desc != NULL); 5842 return desc->bdev; 5843 } 5844 5845 void 5846 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 5847 { 5848 struct iovec *iovs; 5849 int iovcnt; 5850 5851 if (bdev_io == NULL) { 5852 return; 5853 } 5854 5855 switch (bdev_io->type) { 5856 case SPDK_BDEV_IO_TYPE_READ: 5857 case SPDK_BDEV_IO_TYPE_WRITE: 5858 case SPDK_BDEV_IO_TYPE_ZCOPY: 5859 iovs = bdev_io->u.bdev.iovs; 5860 iovcnt = bdev_io->u.bdev.iovcnt; 5861 break; 5862 default: 5863 iovs = NULL; 5864 iovcnt = 0; 5865 break; 5866 } 5867 5868 if (iovp) { 5869 *iovp = iovs; 5870 } 5871 if (iovcntp) { 5872 *iovcntp = iovcnt; 5873 } 5874 } 5875 5876 void * 5877 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 5878 { 5879 if (bdev_io == NULL) { 5880 return NULL; 5881 } 5882 5883 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 5884 return NULL; 5885 } 5886 5887 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 5888 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 5889 return bdev_io->u.bdev.md_buf; 5890 } 5891 5892 return NULL; 5893 } 5894 5895 void * 5896 spdk_bdev_io_get_cb_arg(struct spdk_bdev_io *bdev_io) 5897 { 5898 if (bdev_io == NULL) { 5899 assert(false); 5900 return NULL; 5901 } 5902 5903 return bdev_io->internal.caller_ctx; 5904 } 5905 5906 void 5907 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 5908 { 5909 5910 if (spdk_bdev_module_list_find(bdev_module->name)) { 5911 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 5912 assert(false); 5913 } 5914 5915 /* 5916 * Modules with examine callbacks must be initialized first, so they are 5917 * ready to handle examine callbacks from later modules that will 5918 * register physical bdevs. 
5919 */ 5920 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 5921 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5922 } else { 5923 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 5924 } 5925 } 5926 5927 struct spdk_bdev_module * 5928 spdk_bdev_module_list_find(const char *name) 5929 { 5930 struct spdk_bdev_module *bdev_module; 5931 5932 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 5933 if (strcmp(name, bdev_module->name) == 0) { 5934 break; 5935 } 5936 } 5937 5938 return bdev_module; 5939 } 5940 5941 static void 5942 bdev_write_zero_buffer_next(void *_bdev_io) 5943 { 5944 struct spdk_bdev_io *bdev_io = _bdev_io; 5945 uint64_t num_bytes, num_blocks; 5946 void *md_buf = NULL; 5947 int rc; 5948 5949 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 5950 bdev_io->u.bdev.split_remaining_num_blocks, 5951 ZERO_BUFFER_SIZE); 5952 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 5953 5954 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 5955 md_buf = (char *)g_bdev_mgr.zero_buffer + 5956 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 5957 } 5958 5959 rc = bdev_write_blocks_with_md(bdev_io->internal.desc, 5960 spdk_io_channel_from_ctx(bdev_io->internal.ch), 5961 g_bdev_mgr.zero_buffer, md_buf, 5962 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 5963 bdev_write_zero_buffer_done, bdev_io); 5964 if (rc == 0) { 5965 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 5966 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 5967 } else if (rc == -ENOMEM) { 5968 bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next); 5969 } else { 5970 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5971 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 5972 } 5973 } 5974 5975 static void 5976 bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 5977 { 5978 struct spdk_bdev_io *parent_io = cb_arg; 5979 5980 spdk_bdev_free_io(bdev_io); 5981 5982 if (!success) { 5983 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 5984 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 5985 return; 5986 } 5987 5988 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 5989 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5990 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 5991 return; 5992 } 5993 5994 bdev_write_zero_buffer_next(parent_io); 5995 } 5996 5997 static void 5998 bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 5999 { 6000 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6001 ctx->bdev->internal.qos_mod_in_progress = false; 6002 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6003 6004 if (ctx->cb_fn) { 6005 ctx->cb_fn(ctx->cb_arg, status); 6006 } 6007 free(ctx); 6008 } 6009 6010 static void 6011 bdev_disable_qos_done(void *cb_arg) 6012 { 6013 struct set_qos_limit_ctx *ctx = cb_arg; 6014 struct spdk_bdev *bdev = ctx->bdev; 6015 struct spdk_bdev_io *bdev_io; 6016 struct spdk_bdev_qos *qos; 6017 6018 pthread_mutex_lock(&bdev->internal.mutex); 6019 qos = bdev->internal.qos; 6020 bdev->internal.qos = NULL; 6021 pthread_mutex_unlock(&bdev->internal.mutex); 6022 6023 while (!TAILQ_EMPTY(&qos->queued)) { 6024 /* Send queued I/O back to their original thread for resubmission. 
*/ 6025 bdev_io = TAILQ_FIRST(&qos->queued); 6026 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 6027 6028 if (bdev_io->internal.io_submit_ch) { 6029 /* 6030 * Channel was changed when sending it to the QoS thread - change it back 6031 * before sending it back to the original thread. 6032 */ 6033 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 6034 bdev_io->internal.io_submit_ch = NULL; 6035 } 6036 6037 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 6038 _bdev_io_submit, bdev_io); 6039 } 6040 6041 if (qos->thread != NULL) { 6042 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 6043 spdk_poller_unregister(&qos->poller); 6044 } 6045 6046 free(qos); 6047 6048 bdev_set_qos_limit_done(ctx, 0); 6049 } 6050 6051 static void 6052 bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 6053 { 6054 void *io_device = spdk_io_channel_iter_get_io_device(i); 6055 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 6056 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6057 struct spdk_thread *thread; 6058 6059 pthread_mutex_lock(&bdev->internal.mutex); 6060 thread = bdev->internal.qos->thread; 6061 pthread_mutex_unlock(&bdev->internal.mutex); 6062 6063 if (thread != NULL) { 6064 spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx); 6065 } else { 6066 bdev_disable_qos_done(ctx); 6067 } 6068 } 6069 6070 static void 6071 bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 6072 { 6073 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 6074 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 6075 6076 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 6077 6078 spdk_for_each_channel_continue(i, 0); 6079 } 6080 6081 static void 6082 bdev_update_qos_rate_limit_msg(void *cb_arg) 6083 { 6084 struct set_qos_limit_ctx *ctx = cb_arg; 6085 struct spdk_bdev *bdev = ctx->bdev; 6086 6087 pthread_mutex_lock(&bdev->internal.mutex); 6088 bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 6089 pthread_mutex_unlock(&bdev->internal.mutex); 6090 6091 bdev_set_qos_limit_done(ctx, 0); 6092 } 6093 6094 static void 6095 bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 6096 { 6097 void *io_device = spdk_io_channel_iter_get_io_device(i); 6098 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 6099 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 6100 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 6101 6102 pthread_mutex_lock(&bdev->internal.mutex); 6103 bdev_enable_qos(bdev, bdev_ch); 6104 pthread_mutex_unlock(&bdev->internal.mutex); 6105 spdk_for_each_channel_continue(i, 0); 6106 } 6107 6108 static void 6109 bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 6110 { 6111 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6112 6113 bdev_set_qos_limit_done(ctx, status); 6114 } 6115 6116 static void 6117 bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 6118 { 6119 int i; 6120 6121 assert(bdev->internal.qos != NULL); 6122 6123 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6124 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6125 bdev->internal.qos->rate_limits[i].limit = limits[i]; 6126 6127 if (limits[i] == 0) { 6128 bdev->internal.qos->rate_limits[i].limit = 6129 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 6130 } 6131 } 6132 } 6133 } 6134 6135 void 6136 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 6137 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 6138 { 6139 struct set_qos_limit_ctx *ctx; 6140 uint32_t 
limit_set_complement; 6141 uint64_t min_limit_per_sec; 6142 int i; 6143 bool disable_rate_limit = true; 6144 6145 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6146 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 6147 continue; 6148 } 6149 6150 if (limits[i] > 0) { 6151 disable_rate_limit = false; 6152 } 6153 6154 if (bdev_qos_is_iops_rate_limit(i) == true) { 6155 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 6156 } else { 6157 /* Change from megabyte to byte rate limit */ 6158 limits[i] = limits[i] * 1024 * 1024; 6159 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 6160 } 6161 6162 limit_set_complement = limits[i] % min_limit_per_sec; 6163 if (limit_set_complement) { 6164 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 6165 limits[i], min_limit_per_sec); 6166 limits[i] += min_limit_per_sec - limit_set_complement; 6167 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 6168 } 6169 } 6170 6171 ctx = calloc(1, sizeof(*ctx)); 6172 if (ctx == NULL) { 6173 cb_fn(cb_arg, -ENOMEM); 6174 return; 6175 } 6176 6177 ctx->cb_fn = cb_fn; 6178 ctx->cb_arg = cb_arg; 6179 ctx->bdev = bdev; 6180 6181 pthread_mutex_lock(&bdev->internal.mutex); 6182 if (bdev->internal.qos_mod_in_progress) { 6183 pthread_mutex_unlock(&bdev->internal.mutex); 6184 free(ctx); 6185 cb_fn(cb_arg, -EAGAIN); 6186 return; 6187 } 6188 bdev->internal.qos_mod_in_progress = true; 6189 6190 if (disable_rate_limit == true && bdev->internal.qos) { 6191 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 6192 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 6193 (bdev->internal.qos->rate_limits[i].limit > 0 && 6194 bdev->internal.qos->rate_limits[i].limit != 6195 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 6196 disable_rate_limit = false; 6197 break; 6198 } 6199 } 6200 } 6201 6202 if (disable_rate_limit == false) { 6203 if (bdev->internal.qos == NULL) { 6204 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 6205 if (!bdev->internal.qos) { 6206 pthread_mutex_unlock(&bdev->internal.mutex); 6207 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 6208 bdev_set_qos_limit_done(ctx, -ENOMEM); 6209 return; 6210 } 6211 } 6212 6213 if (bdev->internal.qos->thread == NULL) { 6214 /* Enabling */ 6215 bdev_set_qos_rate_limits(bdev, limits); 6216 6217 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6218 bdev_enable_qos_msg, ctx, 6219 bdev_enable_qos_done); 6220 } else { 6221 /* Updating */ 6222 bdev_set_qos_rate_limits(bdev, limits); 6223 6224 spdk_thread_send_msg(bdev->internal.qos->thread, 6225 bdev_update_qos_rate_limit_msg, ctx); 6226 } 6227 } else { 6228 if (bdev->internal.qos != NULL) { 6229 bdev_set_qos_rate_limits(bdev, limits); 6230 6231 /* Disabling */ 6232 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6233 bdev_disable_qos_msg, ctx, 6234 bdev_disable_qos_msg_done); 6235 } else { 6236 pthread_mutex_unlock(&bdev->internal.mutex); 6237 bdev_set_qos_limit_done(ctx, 0); 6238 return; 6239 } 6240 } 6241 6242 pthread_mutex_unlock(&bdev->internal.mutex); 6243 } 6244 6245 struct spdk_bdev_histogram_ctx { 6246 spdk_bdev_histogram_status_cb cb_fn; 6247 void *cb_arg; 6248 struct spdk_bdev *bdev; 6249 int status; 6250 }; 6251 6252 static void 6253 bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 6254 { 6255 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6256 6257 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6258 ctx->bdev->internal.histogram_in_progress = false; 6259 
pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6260 ctx->cb_fn(ctx->cb_arg, ctx->status); 6261 free(ctx); 6262 } 6263 6264 static void 6265 bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 6266 { 6267 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6268 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6269 6270 if (ch->histogram != NULL) { 6271 spdk_histogram_data_free(ch->histogram); 6272 ch->histogram = NULL; 6273 } 6274 spdk_for_each_channel_continue(i, 0); 6275 } 6276 6277 static void 6278 bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 6279 { 6280 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6281 6282 if (status != 0) { 6283 ctx->status = status; 6284 ctx->bdev->internal.histogram_enabled = false; 6285 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx, 6286 bdev_histogram_disable_channel_cb); 6287 } else { 6288 pthread_mutex_lock(&ctx->bdev->internal.mutex); 6289 ctx->bdev->internal.histogram_in_progress = false; 6290 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 6291 ctx->cb_fn(ctx->cb_arg, ctx->status); 6292 free(ctx); 6293 } 6294 } 6295 6296 static void 6297 bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 6298 { 6299 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6300 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6301 int status = 0; 6302 6303 if (ch->histogram == NULL) { 6304 ch->histogram = spdk_histogram_data_alloc(); 6305 if (ch->histogram == NULL) { 6306 status = -ENOMEM; 6307 } 6308 } 6309 6310 spdk_for_each_channel_continue(i, status); 6311 } 6312 6313 void 6314 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 6315 void *cb_arg, bool enable) 6316 { 6317 struct spdk_bdev_histogram_ctx *ctx; 6318 6319 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 6320 if (ctx == NULL) { 6321 cb_fn(cb_arg, -ENOMEM); 6322 return; 6323 } 6324 6325 ctx->bdev = bdev; 6326 ctx->status = 0; 6327 ctx->cb_fn = cb_fn; 6328 ctx->cb_arg = cb_arg; 6329 6330 pthread_mutex_lock(&bdev->internal.mutex); 6331 if (bdev->internal.histogram_in_progress) { 6332 pthread_mutex_unlock(&bdev->internal.mutex); 6333 free(ctx); 6334 cb_fn(cb_arg, -EAGAIN); 6335 return; 6336 } 6337 6338 bdev->internal.histogram_in_progress = true; 6339 pthread_mutex_unlock(&bdev->internal.mutex); 6340 6341 bdev->internal.histogram_enabled = enable; 6342 6343 if (enable) { 6344 /* Allocate histogram for each channel */ 6345 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx, 6346 bdev_histogram_enable_channel_cb); 6347 } else { 6348 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx, 6349 bdev_histogram_disable_channel_cb); 6350 } 6351 } 6352 6353 struct spdk_bdev_histogram_data_ctx { 6354 spdk_bdev_histogram_data_cb cb_fn; 6355 void *cb_arg; 6356 struct spdk_bdev *bdev; 6357 /** merged histogram data from all channels */ 6358 struct spdk_histogram_data *histogram; 6359 }; 6360 6361 static void 6362 bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 6363 { 6364 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6365 6366 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 6367 free(ctx); 6368 } 6369 6370 static void 6371 bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 6372 { 6373 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6374 struct spdk_bdev_channel *ch = 
spdk_io_channel_get_ctx(_ch); 6375 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6376 int status = 0; 6377 6378 if (ch->histogram == NULL) { 6379 status = -EFAULT; 6380 } else { 6381 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 6382 } 6383 6384 spdk_for_each_channel_continue(i, status); 6385 } 6386 6387 void 6388 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 6389 spdk_bdev_histogram_data_cb cb_fn, 6390 void *cb_arg) 6391 { 6392 struct spdk_bdev_histogram_data_ctx *ctx; 6393 6394 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 6395 if (ctx == NULL) { 6396 cb_fn(cb_arg, -ENOMEM, NULL); 6397 return; 6398 } 6399 6400 ctx->bdev = bdev; 6401 ctx->cb_fn = cb_fn; 6402 ctx->cb_arg = cb_arg; 6403 6404 ctx->histogram = histogram; 6405 6406 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx, 6407 bdev_histogram_get_channel_cb); 6408 } 6409 6410 size_t 6411 spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events, 6412 size_t max_events) 6413 { 6414 struct media_event_entry *entry; 6415 size_t num_events = 0; 6416 6417 for (; num_events < max_events; ++num_events) { 6418 entry = TAILQ_FIRST(&desc->pending_media_events); 6419 if (entry == NULL) { 6420 break; 6421 } 6422 6423 events[num_events] = entry->event; 6424 TAILQ_REMOVE(&desc->pending_media_events, entry, tailq); 6425 TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq); 6426 } 6427 6428 return num_events; 6429 } 6430 6431 int 6432 spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events, 6433 size_t num_events) 6434 { 6435 struct spdk_bdev_desc *desc; 6436 struct media_event_entry *entry; 6437 size_t event_id; 6438 int rc = 0; 6439 6440 assert(bdev->media_events); 6441 6442 pthread_mutex_lock(&bdev->internal.mutex); 6443 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6444 if (desc->write) { 6445 break; 6446 } 6447 } 6448 6449 if (desc == NULL || desc->media_events_buffer == NULL) { 6450 rc = -ENODEV; 6451 goto out; 6452 } 6453 6454 for (event_id = 0; event_id < num_events; ++event_id) { 6455 entry = TAILQ_FIRST(&desc->free_media_events); 6456 if (entry == NULL) { 6457 break; 6458 } 6459 6460 TAILQ_REMOVE(&desc->free_media_events, entry, tailq); 6461 TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq); 6462 entry->event = events[event_id]; 6463 } 6464 6465 rc = event_id; 6466 out: 6467 pthread_mutex_unlock(&bdev->internal.mutex); 6468 return rc; 6469 } 6470 6471 void 6472 spdk_bdev_notify_media_management(struct spdk_bdev *bdev) 6473 { 6474 struct spdk_bdev_desc *desc; 6475 6476 pthread_mutex_lock(&bdev->internal.mutex); 6477 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 6478 if (!TAILQ_EMPTY(&desc->pending_media_events)) { 6479 desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev, 6480 desc->callback.ctx); 6481 } 6482 } 6483 pthread_mutex_unlock(&bdev->internal.mutex); 6484 } 6485 6486 struct locked_lba_range_ctx { 6487 struct lba_range range; 6488 struct spdk_bdev *bdev; 6489 struct lba_range *current_range; 6490 struct lba_range *owner_range; 6491 struct spdk_poller *poller; 6492 lock_range_cb cb_fn; 6493 void *cb_arg; 6494 }; 6495 6496 static void 6497 bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status) 6498 { 6499 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6500 6501 ctx->cb_fn(ctx->cb_arg, -ENOMEM); 6502 free(ctx); 6503 } 6504 6505 static void 6506 
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i); 6507 6508 static void 6509 bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6510 { 6511 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6512 struct spdk_bdev *bdev = ctx->bdev; 6513 6514 if (status == -ENOMEM) { 6515 /* One of the channels could not allocate a range object. 6516 * So we have to go back and clean up any ranges that were 6517 * allocated successfully before we return error status to 6518 * the caller. We can reuse the unlock function to do that 6519 * clean up. 6520 */ 6521 spdk_for_each_channel(__bdev_to_io_dev(bdev), 6522 bdev_unlock_lba_range_get_channel, ctx, 6523 bdev_lock_error_cleanup_cb); 6524 return; 6525 } 6526 6527 /* All channels have locked this range and no I/O overlapping the range 6528 * are outstanding! Set the owner_ch for the range object for the 6529 * locking channel, so that this channel will know that it is allowed 6530 * to write to this range. 6531 */ 6532 ctx->owner_range->owner_ch = ctx->range.owner_ch; 6533 ctx->cb_fn(ctx->cb_arg, status); 6534 6535 /* Don't free the ctx here. Its range is in the bdev's global list of 6536 * locked ranges still, and will be removed and freed when this range 6537 * is later unlocked. 6538 */ 6539 } 6540 6541 static int 6542 bdev_lock_lba_range_check_io(void *_i) 6543 { 6544 struct spdk_io_channel_iter *i = _i; 6545 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6546 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6547 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6548 struct lba_range *range = ctx->current_range; 6549 struct spdk_bdev_io *bdev_io; 6550 6551 spdk_poller_unregister(&ctx->poller); 6552 6553 /* The range is now in the locked_ranges, so no new IO can be submitted to this 6554 * range. But we need to wait until any outstanding IO overlapping with this range 6555 * are completed. 6556 */ 6557 TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) { 6558 if (bdev_io_range_is_locked(bdev_io, range)) { 6559 ctx->poller = SPDK_POLLER_REGISTER(bdev_lock_lba_range_check_io, i, 100); 6560 return SPDK_POLLER_BUSY; 6561 } 6562 } 6563 6564 spdk_for_each_channel_continue(i, 0); 6565 return SPDK_POLLER_BUSY; 6566 } 6567 6568 static void 6569 bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6570 { 6571 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6572 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6573 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6574 struct lba_range *range; 6575 6576 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6577 if (range->length == ctx->range.length && 6578 range->offset == ctx->range.offset && 6579 range->locked_ctx == ctx->range.locked_ctx) { 6580 /* This range already exists on this channel, so don't add 6581 * it again. This can happen when a new channel is created 6582 * while the for_each_channel operation is in progress. 6583 * Do not check for outstanding I/O in that case, since the 6584 * range was locked before any I/O could be submitted to the 6585 * new channel. 
6586 */ 6587 spdk_for_each_channel_continue(i, 0); 6588 return; 6589 } 6590 } 6591 6592 range = calloc(1, sizeof(*range)); 6593 if (range == NULL) { 6594 spdk_for_each_channel_continue(i, -ENOMEM); 6595 return; 6596 } 6597 6598 range->length = ctx->range.length; 6599 range->offset = ctx->range.offset; 6600 range->locked_ctx = ctx->range.locked_ctx; 6601 ctx->current_range = range; 6602 if (ctx->range.owner_ch == ch) { 6603 /* This is the range object for the channel that will hold 6604 * the lock. Store it in the ctx object so that we can easily 6605 * set its owner_ch after the lock is finally acquired. 6606 */ 6607 ctx->owner_range = range; 6608 } 6609 TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq); 6610 bdev_lock_lba_range_check_io(i); 6611 } 6612 6613 static void 6614 bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx) 6615 { 6616 assert(spdk_get_thread() == ctx->range.owner_ch->channel->thread); 6617 6618 /* We will add a copy of this range to each channel now. */ 6619 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx, 6620 bdev_lock_lba_range_cb); 6621 } 6622 6623 static bool 6624 bdev_lba_range_overlaps_tailq(struct lba_range *range, lba_range_tailq_t *tailq) 6625 { 6626 struct lba_range *r; 6627 6628 TAILQ_FOREACH(r, tailq, tailq) { 6629 if (bdev_lba_range_overlapped(range, r)) { 6630 return true; 6631 } 6632 } 6633 return false; 6634 } 6635 6636 static int 6637 bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6638 uint64_t offset, uint64_t length, 6639 lock_range_cb cb_fn, void *cb_arg) 6640 { 6641 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6642 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6643 struct locked_lba_range_ctx *ctx; 6644 6645 if (cb_arg == NULL) { 6646 SPDK_ERRLOG("cb_arg must not be NULL\n"); 6647 return -EINVAL; 6648 } 6649 6650 ctx = calloc(1, sizeof(*ctx)); 6651 if (ctx == NULL) { 6652 return -ENOMEM; 6653 } 6654 6655 ctx->range.offset = offset; 6656 ctx->range.length = length; 6657 ctx->range.owner_ch = ch; 6658 ctx->range.locked_ctx = cb_arg; 6659 ctx->bdev = bdev; 6660 ctx->cb_fn = cb_fn; 6661 ctx->cb_arg = cb_arg; 6662 6663 pthread_mutex_lock(&bdev->internal.mutex); 6664 if (bdev_lba_range_overlaps_tailq(&ctx->range, &bdev->internal.locked_ranges)) { 6665 /* There is an active lock overlapping with this range. 6666 * Put it on the pending list until this range no 6667 * longer overlaps with another. 6668 */ 6669 TAILQ_INSERT_TAIL(&bdev->internal.pending_locked_ranges, &ctx->range, tailq); 6670 } else { 6671 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, &ctx->range, tailq); 6672 bdev_lock_lba_range_ctx(bdev, ctx); 6673 } 6674 pthread_mutex_unlock(&bdev->internal.mutex); 6675 return 0; 6676 } 6677 6678 static void 6679 bdev_lock_lba_range_ctx_msg(void *_ctx) 6680 { 6681 struct locked_lba_range_ctx *ctx = _ctx; 6682 6683 bdev_lock_lba_range_ctx(ctx->bdev, ctx); 6684 } 6685 6686 static void 6687 bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status) 6688 { 6689 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6690 struct locked_lba_range_ctx *pending_ctx; 6691 struct spdk_bdev_channel *ch = ctx->range.owner_ch; 6692 struct spdk_bdev *bdev = ch->bdev; 6693 struct lba_range *range, *tmp; 6694 6695 pthread_mutex_lock(&bdev->internal.mutex); 6696 /* Check if there are any pending locked ranges that overlap with this range 6697 * that was just unlocked. 
If there are, check that it doesn't overlap with any 6698 * other locked ranges before calling bdev_lock_lba_range_ctx which will start 6699 * the lock process. 6700 */ 6701 TAILQ_FOREACH_SAFE(range, &bdev->internal.pending_locked_ranges, tailq, tmp) { 6702 if (bdev_lba_range_overlapped(range, &ctx->range) && 6703 !bdev_lba_range_overlaps_tailq(range, &bdev->internal.locked_ranges)) { 6704 TAILQ_REMOVE(&bdev->internal.pending_locked_ranges, range, tailq); 6705 pending_ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6706 TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, range, tailq); 6707 spdk_thread_send_msg(pending_ctx->range.owner_ch->channel->thread, 6708 bdev_lock_lba_range_ctx_msg, pending_ctx); 6709 } 6710 } 6711 pthread_mutex_unlock(&bdev->internal.mutex); 6712 6713 ctx->cb_fn(ctx->cb_arg, status); 6714 free(ctx); 6715 } 6716 6717 static void 6718 bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i) 6719 { 6720 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 6721 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6722 struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 6723 TAILQ_HEAD(, spdk_bdev_io) io_locked; 6724 struct spdk_bdev_io *bdev_io; 6725 struct lba_range *range; 6726 6727 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6728 if (ctx->range.offset == range->offset && 6729 ctx->range.length == range->length && 6730 ctx->range.locked_ctx == range->locked_ctx) { 6731 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 6732 free(range); 6733 break; 6734 } 6735 } 6736 6737 /* Note: we should almost always be able to assert that the range specified 6738 * was found. But there are some very rare corner cases where a new channel 6739 * gets created simultaneously with a range unlock, where this function 6740 * would execute on that new channel and wouldn't have the range. 6741 * We also use this to clean up range allocations when a later allocation 6742 * fails in the locking path. 6743 * So we can't actually assert() here. 6744 */ 6745 6746 /* Swap the locked IO into a temporary list, and then try to submit them again. 6747 * We could hyper-optimize this to only resubmit locked I/O that overlap 6748 * with the range that was just unlocked, but this isn't a performance path so 6749 * we go for simplicity here. 6750 */ 6751 TAILQ_INIT(&io_locked); 6752 TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link); 6753 while (!TAILQ_EMPTY(&io_locked)) { 6754 bdev_io = TAILQ_FIRST(&io_locked); 6755 TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link); 6756 bdev_io_submit(bdev_io); 6757 } 6758 6759 spdk_for_each_channel_continue(i, 0); 6760 } 6761 6762 static int 6763 bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch, 6764 uint64_t offset, uint64_t length, 6765 lock_range_cb cb_fn, void *cb_arg) 6766 { 6767 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 6768 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 6769 struct locked_lba_range_ctx *ctx; 6770 struct lba_range *range; 6771 bool range_found = false; 6772 6773 /* Let's make sure the specified channel actually has a lock on 6774 * the specified range. Note that the range must match exactly. 
6775 */ 6776 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 6777 if (range->offset == offset && range->length == length && 6778 range->owner_ch == ch && range->locked_ctx == cb_arg) { 6779 range_found = true; 6780 break; 6781 } 6782 } 6783 6784 if (!range_found) { 6785 return -EINVAL; 6786 } 6787 6788 pthread_mutex_lock(&bdev->internal.mutex); 6789 /* We confirmed that this channel has locked the specified range. To 6790 * start the unlock process, we find the range in the bdev's locked_ranges 6791 * and remove it. This ensures new channels don't inherit the locked range. 6792 * Then we will send a message to each channel (including the one specified 6793 * here) to remove the range from its per-channel list. 6794 */ 6795 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 6796 if (range->offset == offset && range->length == length && 6797 range->locked_ctx == cb_arg) { 6798 break; 6799 } 6800 } 6801 if (range == NULL) { 6802 assert(false); 6803 pthread_mutex_unlock(&bdev->internal.mutex); 6804 return -EINVAL; 6805 } 6806 TAILQ_REMOVE(&bdev->internal.locked_ranges, range, tailq); 6807 ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range); 6808 pthread_mutex_unlock(&bdev->internal.mutex); 6809 6810 ctx->cb_fn = cb_fn; 6811 ctx->cb_arg = cb_arg; 6812 6813 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx, 6814 bdev_unlock_lba_range_cb); 6815 return 0; 6816 } 6817 6818 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 6819 6820 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 6821 { 6822 spdk_trace_register_owner(OWNER_BDEV, 'b'); 6823 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 6824 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 6825 OBJECT_BDEV_IO, 1, 0, "type: "); 6826 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 6827 OBJECT_BDEV_IO, 0, 0, ""); 6828 } 6829
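
/*
 * Usage sketch: opening and closing a bdev descriptor with the extended API
 * implemented above (spdk_bdev_open_ext / spdk_bdev_close). This is
 * illustrative application-side code, not part of the bdev library; the
 * example_* names are hypothetical. It assumes it runs on an SPDK thread
 * inside an initialized SPDK application, and that the public SPDK_ERRLOG /
 * SPDK_NOTICELOG macros come from spdk/log.h. Because the unregister path
 * above defers the hot-remove notification to a message on the descriptor's
 * own thread, it is safe to close the descriptor directly from the callback.
 */
#include "spdk/bdev.h"
#include "spdk/string.h"
#include "spdk/log.h"

static void
example_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct spdk_bdev_desc **desc = event_ctx;

	if (type == SPDK_BDEV_EVENT_REMOVE) {
		/* Runs on the thread that opened the descriptor; the bdev layer
		 * deferred this notification, so closing here does not recurse
		 * into the unregister path. */
		spdk_bdev_close(*desc);
		*desc = NULL;
	}
}

static int
example_open_bdev(const char *name, bool write, struct spdk_bdev_desc **desc)
{
	int rc;

	/* The caller must keep *desc valid for the lifetime of the descriptor,
	 * since it doubles as the event callback context here. */
	rc = spdk_bdev_open_ext(name, write, example_bdev_event_cb, desc, desc);
	if (rc != 0) {
		SPDK_ERRLOG("Could not open bdev %s: %s\n", name, spdk_strerror(-rc));
		return rc;
	}

	SPDK_NOTICELOG("Opened bdev %s (block size %u)\n", name,
		       spdk_bdev_get_block_size(spdk_bdev_desc_get_bdev(*desc)));
	return 0;
}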
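
/*
 * Usage sketch: configuring QoS rate limits via spdk_bdev_set_qos_rate_limits()
 * above. Not part of the library; example_* names are hypothetical. Per the
 * implementation above, byte limits are passed in MB/s and converted to bytes
 * internally, IOPS limits must be a multiple of 1000 (otherwise they are
 * rounded up), a value of 0 disables that limit type, and UINT64_MAX (the
 * internal "not defined" marker) leaves the limit type unchanged.
 */
#include "spdk/stdinc.h"
#include "spdk/bdev.h"
#include "spdk/log.h"

static void
example_qos_done(void *cb_arg, int status)
{
	if (status != 0) {
		SPDK_ERRLOG("Setting QoS limits failed: %d\n", status);
	}
}

static void
example_set_qos(struct spdk_bdev *bdev)
{
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
	int i;

	/* Leave every limit type untouched by default. */
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		limits[i] = UINT64_MAX;
	}

	/* 10000 read/write IOs per second; a multiple of the 1000 IOPS minimum. */
	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;

	/* 100 MB/s aggregate read/write bandwidth; converted to bytes internally. */
	limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;

	spdk_bdev_set_qos_rate_limits(bdev, limits, example_qos_done, NULL);
}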
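
/*
 * Usage sketch: enabling the per-channel latency histogram and collecting the
 * merged data. Not part of the library; example_* names are hypothetical.
 * spdk_bdev_histogram_enable() above walks every channel and allocates a
 * histogram on each; spdk_bdev_histogram_get() merges the per-channel data
 * into a caller-provided struct spdk_histogram_data (allocation and free via
 * spdk/histogram_data.h, the same API the library uses above).
 */
#include "spdk/bdev.h"
#include "spdk/histogram_data.h"
#include "spdk/log.h"

static void
example_histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
{
	if (status != 0) {
		SPDK_ERRLOG("Failed to collect histogram data: %d\n", status);
	}
	/* Inspect the merged histogram here, e.g. with spdk_histogram_data_iterate(). */
	spdk_histogram_data_free(histogram);
}

static void
example_histogram_enabled_cb(void *cb_arg, int status)
{
	struct spdk_bdev *bdev = cb_arg;
	struct spdk_histogram_data *histogram;

	if (status != 0) {
		SPDK_ERRLOG("Failed to enable histogram: %d\n", status);
		return;
	}

	/* In a real application this would typically run later, after some I/O. */
	histogram = spdk_histogram_data_alloc();
	if (histogram == NULL) {
		return;
	}
	spdk_bdev_histogram_get(bdev, histogram, example_histogram_data_cb, NULL);
}

static void
example_enable_histogram(struct spdk_bdev *bdev)
{
	spdk_bdev_histogram_enable(bdev, example_histogram_enabled_cb, bdev, true);
}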
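
/*
 * Usage sketch: draining media events from a descriptor. Not part of the
 * library; the example_* name is hypothetical. Per the code above, a producer
 * calls spdk_bdev_push_media_events() and then spdk_bdev_notify_media_management(),
 * which delivers SPDK_BDEV_EVENT_MEDIA_MANAGEMENT to descriptors with pending
 * events; the consumer then drains them with spdk_bdev_get_media_events().
 * This assumes struct spdk_bdev_media_event exposes offset/num_blocks fields
 * as declared in spdk/bdev.h.
 */
#include "spdk/stdinc.h"
#include "spdk/bdev.h"
#include "spdk/log.h"
#include "spdk/util.h"

/* Call from the event callback when type == SPDK_BDEV_EVENT_MEDIA_MANAGEMENT. */
static void
example_drain_media_events(struct spdk_bdev_desc *desc)
{
	struct spdk_bdev_media_event events[8];
	size_t i, num_events;

	do {
		num_events = spdk_bdev_get_media_events(desc, events, SPDK_COUNTOF(events));
		for (i = 0; i < num_events; i++) {
			SPDK_NOTICELOG("media event: offset %" PRIu64 ", %" PRIu64 " blocks\n",
				       events[i].offset, events[i].num_blocks);
		}
	} while (num_events == SPDK_COUNTOF(events));
}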