/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/notify.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"
#include "spdk/string.h"

#include "bdev_internal.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024 - 1)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define SPDK_BDEV_AUTO_EXAMINE			true
#define BUF_SMALL_POOL_SIZE			8191
#define BUF_LARGE_POOL_SIZE			1023
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV				0x2

#define OBJECT_BDEV_IO				0x2

#define TRACE_GROUP_BDEV			0x3
#define TRACE_BDEV_IO_START			SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE			SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		1000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX
#define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC	1000

#define SPDK_BDEV_POOL_ALIGNMENT		512

static const char *qos_rpc_type[] = {"rw_ios_per_sec",
	"rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
};

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

	pthread_mutex_t mutex;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
	.mutex = PTHREAD_MUTEX_INITIALIZER,
};

typedef void (*lock_range_cb)(void *ctx, int status);

struct lba_range {
	uint64_t			offset;
	uint64_t			length;
	void				*locked_ctx;
	struct spdk_bdev_channel	*owner_ch;
	TAILQ_ENTRY(lba_range)		tailq;
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
	.bdev_auto_examine = SPDK_BDEV_AUTO_EXAMINE,
	.small_buf_pool_size = BUF_SMALL_POOL_SIZE,
	.large_buf_pool_size = BUF_LARGE_POOL_SIZE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;

	/** Function to check whether to queue the IO. */
	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);

	/** Function to update for the submitted IO. */
	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
};

struct spdk_bdev_qos {
	/** Rate limits for this bdev, one entry per rate limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};
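/*
 * Worked example (a sketch, not normative) of the "allowed to run negative" rule
 * described for remaining_this_timeslice above: with a bandwidth limit and 512
 * bytes left in the current timeslice, a 4096-byte I/O is still submitted and
 * remaining_this_timeslice drops to -3584. The deficit is repaid out of the
 * following timeslice(s); while the count stays <= 0, queue_io() keeps further
 * I/O on the queued list instead of submitting it.
 */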

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * queue their I/O awaiting retry here, which makes it possible to retry I/O to
 * one bdev after I/O from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t		nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel	*shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t		ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to the underlying dev module through this channel
	 * and waiting for completion.
	 */
	uint64_t		io_outstanding;

	/*
	 * List of all submitted I/Os including I/O that are generated via splitting.
	 */
	bdev_io_tailq_t		io_submitted;

	/*
	 * List of spdk_bdev_io that are currently queued because they write to a locked
	 * LBA range.
	 */
	bdev_io_tailq_t		io_locked;

	uint32_t		flags;

	struct spdk_histogram_data *histogram;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

	bdev_io_tailq_t		queued_resets;

	lba_range_tailq_t	locked_ranges;
};

struct media_event_entry {
	struct spdk_bdev_media_event	event;
	TAILQ_ENTRY(media_event_entry)	tailq;
};

#define MEDIA_EVENT_POOL_SIZE 64

struct spdk_bdev_desc {
	struct spdk_bdev		*bdev;
	struct spdk_thread		*thread;
	struct {
		bool open_with_ext;
		union {
			spdk_bdev_remove_cb_t remove_fn;
			spdk_bdev_event_cb_t event_fn;
		};
		void *ctx;
	}				callback;
	bool				closed;
	bool				write;
	pthread_mutex_t			mutex;
	uint32_t			refs;
	TAILQ_HEAD(, media_event_entry)	pending_media_events;
	TAILQ_HEAD(, media_event_entry)	free_media_events;
	struct media_event_entry	*media_events_buffer;
	TAILQ_ENTRY(spdk_bdev_desc)	link;

	uint64_t		timeout_in_sec;
	spdk_bdev_io_timeout_cb	cb_fn;
	void			*cb_arg;
	struct spdk_poller	*io_timeout_poller;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

/* The bdev is registered as an io_device at an address offset by one byte, so
 * that the io_device pointer cannot collide with any io_device the bdev module
 * itself may have registered using the bdev structure's own address.
 */
#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
static void bdev_write_zero_buffer_next(void *_bdev_io);

static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
static void bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);

static int
bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			  struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
			  uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg);
static int
bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   struct iovec *iov, int iovcnt, void *md_buf,
			   uint64_t offset_blocks, uint64_t num_blocks,
			   spdk_bdev_io_completion_cb cb, void *cb_arg);

static int
bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		    uint64_t offset, uint64_t length,
		    lock_range_cb cb_fn, void *cb_arg);

static int
bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		      uint64_t offset, uint64_t length,
		      lock_range_cb cb_fn, void *cb_arg);

static inline void bdev_io_complete(void *ctx);

static bool bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort);
static bool bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts, size_t opts_size)
{
	if (!opts) {
		SPDK_ERRLOG("opts should not be NULL\n");
		return;
	}

	if (!opts_size) {
		SPDK_ERRLOG("opts_size should not be zero value\n");
		return;
	}

	opts->opts_size = opts_size;

#define SET_FIELD(field) \
	if (offsetof(struct spdk_bdev_opts, field) + sizeof(opts->field) <= opts_size) { \
		opts->field = g_bdev_opts.field; \
	} \

	SET_FIELD(bdev_io_pool_size);
	SET_FIELD(bdev_io_cache_size);
	SET_FIELD(bdev_auto_examine);
	SET_FIELD(small_buf_pool_size);
	SET_FIELD(large_buf_pool_size);

	/* Do not remove this statement. You should always update it when adding a new field,
	 * and do not forget to add the SET_FIELD statement for the new field. */
	SPDK_STATIC_ASSERT(sizeof(struct spdk_bdev_opts) == 32, "Incorrect size");

#undef SET_FIELD
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	if (!opts) {
		SPDK_ERRLOG("opts cannot be NULL\n");
		return -1;
	}

	if (!opts->opts_size) {
		SPDK_ERRLOG("opts_size inside opts cannot be zero value\n");
		return -1;
	}

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization. A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	if (opts->small_buf_pool_size < BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("small_buf_pool_size must be at least %" PRIu32 "\n", BUF_SMALL_POOL_SIZE);
		return -1;
	}

	if (opts->large_buf_pool_size < BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("large_buf_pool_size must be at least %" PRIu32 "\n", BUF_LARGE_POOL_SIZE);
		return -1;
	}

#define SET_FIELD(field) \
	if (offsetof(struct spdk_bdev_opts, field) + sizeof(opts->field) <= opts->opts_size) { \
		g_bdev_opts.field = opts->field; \
	} \

	SET_FIELD(bdev_io_pool_size);
	SET_FIELD(bdev_io_cache_size);
	SET_FIELD(bdev_auto_examine);
	SET_FIELD(small_buf_pool_size);
	SET_FIELD(large_buf_pool_size);

	g_bdev_opts.opts_size = opts->opts_size;

#undef SET_FIELD

	return 0;
}

struct spdk_bdev_wait_for_examine_ctx {
	struct spdk_poller              *poller;
	spdk_bdev_wait_for_examine_cb	cb_fn;
	void				*cb_arg;
};

static bool
bdev_module_all_actions_completed(void);

static int
bdev_wait_for_examine_cb(void *arg)
{
	struct spdk_bdev_wait_for_examine_ctx *ctx = arg;

	if (!bdev_module_all_actions_completed()) {
		return SPDK_POLLER_IDLE;
	}

	spdk_poller_unregister(&ctx->poller);
	ctx->cb_fn(ctx->cb_arg);
	free(ctx);

	return SPDK_POLLER_BUSY;
}

int
spdk_bdev_wait_for_examine(spdk_bdev_wait_for_examine_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_wait_for_examine_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->poller = SPDK_POLLER_REGISTER(bdev_wait_for_examine_cb, ctx, 0);

	return 0;
}
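/*
 * Typical usage of spdk_bdev_get_opts()/spdk_bdev_set_opts() above (a sketch,
 * not part of this file): the caller reads the current defaults, overrides
 * selected fields and writes them back, passing sizeof() so the opts_size /
 * SET_FIELD checks can skip fields the caller's struct does not know about:
 *
 *	struct spdk_bdev_opts opts = {};
 *
 *	spdk_bdev_get_opts(&opts, sizeof(opts));
 *	opts.bdev_auto_examine = false;
 *	spdk_bdev_set_opts(&opts);
 *
 * Similarly, spdk_bdev_wait_for_examine() just registers a poller and fires the
 * callback once no module has an init/examine action in progress, e.g.:
 *
 *	spdk_bdev_wait_for_examine(examine_done_cb, ctx);   // hypothetical callback
 */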

struct spdk_bdev_examine_item {
	char *name;
	TAILQ_ENTRY(spdk_bdev_examine_item) link;
};

TAILQ_HEAD(spdk_bdev_examine_allowlist, spdk_bdev_examine_item);

struct spdk_bdev_examine_allowlist g_bdev_examine_allowlist = TAILQ_HEAD_INITIALIZER(
			g_bdev_examine_allowlist);

static inline bool
bdev_examine_allowlist_check(const char *name)
{
	struct spdk_bdev_examine_item *item;
	TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) {
		if (strcmp(name, item->name) == 0) {
			return true;
		}
	}
	return false;
}

static inline void
bdev_examine_allowlist_free(void)
{
	struct spdk_bdev_examine_item *item;
	while (!TAILQ_EMPTY(&g_bdev_examine_allowlist)) {
		item = TAILQ_FIRST(&g_bdev_examine_allowlist);
		TAILQ_REMOVE(&g_bdev_examine_allowlist, item, link);
		free(item->name);
		free(item);
	}
}

static inline bool
bdev_in_examine_allowlist(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *tmp;
	if (bdev_examine_allowlist_check(bdev->name)) {
		return true;
	}
	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (bdev_examine_allowlist_check(tmp->alias)) {
			return true;
		}
	}
	return false;
}

static inline bool
bdev_ok_to_examine(struct spdk_bdev *bdev)
{
	if (g_bdev_opts.bdev_auto_examine) {
		return true;
	} else {
		return bdev_in_examine_allowlist(bdev);
	}
}

static void
bdev_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module *module;
	uint32_t action;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine_config && bdev_ok_to_examine(bdev)) {
			action = module->internal.action_in_progress;
			module->internal.action_in_progress++;
			module->examine_config(bdev);
			if (action != module->internal.action_in_progress) {
				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
					    module->name);
			}
		}
	}

	if (bdev->internal.claim_module && bdev_ok_to_examine(bdev)) {
		if (bdev->internal.claim_module->examine_disk) {
			bdev->internal.claim_module->internal.action_in_progress++;
			bdev->internal.claim_module->examine_disk(bdev);
		}
		return;
	}

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine_disk && bdev_ok_to_examine(bdev)) {
			module->internal.action_in_progress++;
			module->examine_disk(bdev);
		}
	}
}

int
spdk_bdev_examine(const char *name)
{
	struct spdk_bdev *bdev;
	struct spdk_bdev_examine_item *item;

	if (g_bdev_opts.bdev_auto_examine) {
		SPDK_ERRLOG("Manual examine is not allowed if auto examine is enabled\n");
		return -EINVAL;
	}

	if (bdev_examine_allowlist_check(name)) {
		SPDK_ERRLOG("Duplicate bdev name for manual examine: %s\n", name);
		return -EEXIST;
	}

	item = calloc(1, sizeof(*item));
	if (!item) {
		return -ENOMEM;
	}
	item->name = strdup(name);
	if (!item->name) {
		free(item);
		return -ENOMEM;
	}
	TAILQ_INSERT_TAIL(&g_bdev_examine_allowlist, item, link);

	bdev = spdk_bdev_get_by_name(name);
	if (bdev) {
		bdev_examine(bdev);
	}
	return 0;
}

static inline void
bdev_examine_allowlist_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_examine_item *item;
	TAILQ_FOREACH(item, &g_bdev_examine_allowlist, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_examine");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "name", item->name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
}
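/*
 * Manual examine is only meaningful when automatic examine has been turned off
 * (bdev_auto_examine == false). A sketch of the flow, with a hypothetical bdev
 * name:
 *
 *	// startup configuration: bdev_set_options { "bdev_auto_examine": false }
 *	// then, for each bdev that should be examined explicitly:
 *	rc = spdk_bdev_examine("Nvme0n1");
 *
 * The name stays on the allowlist consulted by bdev_ok_to_examine() above, and
 * the matching "bdev_examine" RPC objects are replayed by
 * bdev_examine_allowlist_config_json() when the configuration is saved.
 */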

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(bdev, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return iovs[0].iov_base != NULL;
}

static bool
_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
{
	int i;
	uintptr_t iov_base;

	if (spdk_likely(alignment == 1)) {
		return true;
	}

	for (i = 0; i < iovcnt; i++) {
		iov_base = (uintptr_t)iovs[i].iov_base;
		if ((iov_base & (alignment - 1)) != 0) {
			return false;
		}
	}

	return true;
}
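/*
 * Note on _are_iovs_aligned() above: the mask test assumes the required
 * alignment is a power of two; the callers in this file pass
 * spdk_bdev_get_buf_align(), which is expected to satisfy that. For example,
 * with a 512-byte alignment the mask is 0x1FF, so an iov_base of 0x1200 passes
 * while 0x1234 does not.
 */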

static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is write path, copy data from original buffer to bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
_bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	/* save original md_buf */
	bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf;
	/* set bounce md_buf */
	bdev_io->u.bdev.md_buf = md_buf;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		memcpy(md_buf, bdev_io->internal.orig_md_buf, len);
	}
}

static void
bdev_io_get_buf_complete(struct spdk_bdev_io *bdev_io, void *buf, bool status)
{
	struct spdk_io_channel *ch = spdk_bdev_io_get_io_channel(bdev_io);

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io->internal.get_aux_buf_cb(ch, bdev_io, buf);
		bdev_io->internal.get_aux_buf_cb = NULL;
	} else {
		assert(bdev_io->internal.get_buf_cb != NULL);
		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(ch, bdev_io, status);
		bdev_io->internal.get_buf_cb = NULL;
	}
}

static void
_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	bool buf_allocated;
	uint64_t md_len, alignment;
	void *aligned_buf;

	if (spdk_unlikely(bdev_io->internal.get_aux_buf_cb != NULL)) {
		bdev_io_get_buf_complete(bdev_io, buf, true);
		return;
	}

	alignment = spdk_bdev_get_buf_align(bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
	aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

	if (buf_allocated) {
		_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
	} else {
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
	}

	if (spdk_bdev_is_md_separate(bdev)) {
		aligned_buf = (char *)aligned_buf + len;
		md_len = bdev_io->u.bdev.num_blocks * bdev->md_len;

		assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);

		if (bdev_io->u.bdev.md_buf != NULL) {
			_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
		} else {
			spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
		}
	}
	bdev_io_get_buf_complete(bdev_io, buf, true);
}
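/*
 * The round-up in _bdev_io_set_buf() above is ordinary power-of-two alignment
 * arithmetic: with alignment 512, a raw pool buffer at 0x1001 becomes
 * (0x1001 + 0x1FF) & ~0x1FF = 0x1200. The buffer pools are sized with
 * SPDK_BDEV_POOL_ALIGNMENT extra bytes so that this shift always fits within
 * the element.
 */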

static void
_bdev_io_put_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t buf_len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t md_len, alignment;

	md_len = spdk_bdev_is_md_separate(bdev) ?
		 bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
	alignment = spdk_bdev_get_buf_align(bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		_bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
	}
}

static void
bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	assert(bdev_io->internal.buf != NULL);
	_bdev_io_put_buf(bdev_io, bdev_io->internal.buf, bdev_io->internal.buf_len);
	bdev_io->internal.buf = NULL;
}

void
spdk_bdev_io_put_aux_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(buf != NULL);
	_bdev_io_put_buf(bdev_io, buf, len);
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
		assert(bdev_io->internal.orig_md_buf == NULL);
		return;
	}

	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs,
				  bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base,
				  bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bouncing buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;

	/* do the same for metadata buffer */
	if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
		assert(spdk_bdev_is_md_separate(bdev_io->bdev));

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
		    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
			       bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
		}

		bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
		bdev_io->internal.orig_md_buf = NULL;
	}

	/* We want to free the bounce buffer here since we know we're done with it (as opposed
	 * to waiting for the conditional free of internal.buf in spdk_bdev_free_io()).
	 */
	bdev_io_put_buf(bdev_io);
}

static void
bdev_io_get_buf(struct spdk_bdev_io *bdev_io, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment, md_len;
	void *buf;

	alignment = spdk_bdev_get_buf_align(bdev);
	md_len = spdk_bdev_is_md_separate(bdev) ?
		 bdev_io->u.bdev.num_blocks * bdev->md_len : 0;

	if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
			    len + alignment);
		bdev_io_get_buf_complete(bdev_io, NULL, false);
		return;
	}

	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;

	if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);
	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		_bdev_io_set_buf(bdev_io, buf, len);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	uint64_t alignment;

	assert(cb != NULL);
	bdev_io->internal.get_buf_cb = cb;

	alignment = spdk_bdev_get_buf_align(bdev);

	if (_is_buf_allocated(bdev_io->u.bdev.iovs) &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
		return;
	}

	bdev_io_get_buf(bdev_io, len);
}

void
spdk_bdev_io_get_aux_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_aux_buf_cb cb)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;

	assert(cb != NULL);
	assert(bdev_io->internal.get_aux_buf_cb == NULL);
	bdev_io->internal.get_aux_buf_cb = cb;
	bdev_io_get_buf(bdev_io, len);
}

static int
bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

static void
bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_qos_limit");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
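/*
 * For reference, the object written by bdev_qos_config_json() above for a bdev
 * with one active limit would look roughly like this (bdev name hypothetical):
 *
 *	{
 *	  "method": "bdev_set_qos_limit",
 *	  "params": { "name": "Malloc0", "rw_ios_per_sec": 20000 }
 *	}
 */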

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_set_options");
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_named_bool(w, "bdev_auto_examine", g_bdev_opts.bdev_auto_examine);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	bdev_examine_allowlist_config_json(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	pthread_mutex_lock(&g_bdev_mgr.mutex);

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		bdev_qos_config_json(bdev, w);
	}

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	/* This has to be the last RPC in the array to make sure all bdevs have finished examination */
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "bdev_wait_for_examine");
	spdk_json_write_object_end(w);

	spdk_json_write_array_end(w);
}

static int
bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}
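/*
 * Each management channel pre-populates bdev_io_cache_size entries from the
 * global bdev_io pool (see bdev_mgmt_channel_create() above), which is why
 * spdk_bdev_set_opts() enforces
 *
 *	bdev_io_pool_size >= bdev_io_cache_size * (spdk_thread_get_count() + 1)
 *
 * For example, with the default cache size of 256 and 4 threads, the pool must
 * hold at least 256 * (4 + 1) = 1280 bdev_io structures.
 */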

static void
bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static bool
bdev_module_all_actions_completed(void)
{
	struct spdk_bdev_module *m;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return false;
		}
	}
	return true;
}

static void
bdev_module_action_complete(void)
{
	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	if (!bdev_module_all_actions_completed()) {
		return;
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	bdev_init_complete(0);
}

static void
bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	bdev_init_complete(-1);
}

static int
bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to prevent other modules from completing module init.
			 * Send a message to defer application shutdown until resources are cleaned up. */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}
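/*
 * A module that needs asynchronous initialization sets async_init and reports
 * completion itself; a minimal sketch (module name and callback hypothetical):
 *
 *	static struct spdk_bdev_module example_if = {
 *		.name = "example",
 *		.async_init = true,
 *		.module_init = example_init,   // returns 0, work continues asynchronously
 *	};
 *
 *	// later, from the module, once its asynchronous setup has finished:
 *	spdk_bdev_module_init_done(&example_if);
 */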

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_env_get_core_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    g_bdev_opts.small_buf_pool_size,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    g_bdev_opts.large_buf_pool_size,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create,
				bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		return;
	}

	bdev_module_action_complete();
}

static void
bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (g_bdev_mgr.bdev_io_pool) {
		if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
			SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
				    g_bdev_opts.bdev_io_pool_size);
		}

		spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	}

	if (g_bdev_mgr.buf_small_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != g_bdev_opts.small_buf_pool_size) {
			SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
				    g_bdev_opts.small_buf_pool_size);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	}

	if (g_bdev_mgr.buf_large_pool) {
		if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != g_bdev_opts.large_buf_pool_size) {
			SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
				    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
				    g_bdev_opts.large_buf_pool_size);
			assert(false);
		}

		spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	}

	spdk_free(g_bdev_mgr.zero_buffer);

	bdev_examine_allowlist_free();

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
	pthread_mutex_destroy(&g_bdev_mgr.mutex);
}

static void
bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* FIXME: Handling initialization failures is broken now,
	 * so we won't even try cleaning up after successfully
	 * initialized modules. If module_init_complete is false,
	 * just call bdev_mgr_unregister_cb().
	 */
	if (!g_bdev_mgr.module_init_complete) {
		bdev_mgr_unregister_cb(NULL);
		return;
	}

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL);
	} else {
		bdev_module_finish_iter(NULL);
	}
}
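/*
 * The mirror image of async_init: a module with async_fini may return from
 * module_fini() while teardown is still in flight and later call
 * spdk_bdev_module_finish_done(). That call resumes the iteration above from
 * g_resume_bdev_module, hopping back to g_fini_thread first if it was made
 * from a different thread.
 */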

static void
bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(bdev, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in the reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(bdev, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(bdev, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim underlying bdev properly, we may face the
	 * case of bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister last bdev on the list then.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
		spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	bdev_finish_unregister_bdevs_iter(NULL, 0);
}

struct spdk_bdev_io *
bdev_channel_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}
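/*
 * When bdev_channel_get_io() returns NULL (bdev_io exhaustion), callers of the
 * public I/O APIs see -ENOMEM and are expected to register a wait entry so they
 * are called back once a bdev_io is returned to this channel's cache. A sketch,
 * with hypothetical caller callbacks:
 *
 *	struct spdk_bdev_io_wait_entry entry = {
 *		.bdev = bdev,
 *		.cb_fn = resubmit_io,	// hypothetical retry callback
 *		.cb_arg = ctx,
 *	};
 *
 *	if (rc == -ENOMEM) {
 *		spdk_bdev_queue_io_wait(bdev, io_ch, &entry);
 *	}
 */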

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf != NULL) {
		bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static bool
bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		if (bdev_io->u.bdev.zcopy.start) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static bool
bdev_is_read_io(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* Bit 1 (0x2) set for read operation */
		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
			return true;
		} else {
			return false;
		}
	case SPDK_BDEV_IO_TYPE_READ:
		return true;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Populate to read from disk */
		if (bdev_io->u.bdev.zcopy.populate) {
			return true;
		} else {
			return false;
		}
	default:
		return false;
	}
}

static uint64_t
bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		/* Track the data in the start phase only */
		if (bdev_io->u.bdev.zcopy.start) {
			return bdev_io->u.bdev.num_blocks * bdev->blocklen;
		} else {
			return 0;
		}
	default:
		return 0;
	}
}

static bool
bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
		return true;
	} else {
		return false;
	}
}

static bool
bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static bool
bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return false;
	}

	return bdev_qos_rw_queue_io(limit, io);
}

static void
bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice--;
}

static void
bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io);
}

static void
bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == false) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (bdev_is_read_io(io) == true) {
		return;
	}

	return bdev_qos_rw_bps_update_quota(limit, io);
}

static void
bdev_qos_set_ops(struct spdk_bdev_qos *qos)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].queue_io = NULL;
			qos->rate_limits[i].update_quota = NULL;
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_r_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = bdev_qos_w_queue_io;
			qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota;
			break;
		default:
			break;
		}
	}
}

static void
_bdev_io_complete_in_submit(struct spdk_bdev_channel *bdev_ch,
			    struct spdk_bdev_io *bdev_io,
			    enum spdk_bdev_io_status status)
{
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	bdev_io->internal.in_submit_request = true;
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	spdk_bdev_io_complete(bdev_io, status);
	bdev_io->internal.in_submit_request = false;
}

static inline void
bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT)) {
		struct spdk_bdev_mgmt_channel *mgmt_channel = shared_resource->mgmt_ch;
		struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort;

		if (bdev_abort_queued_io(&shared_resource->nomem_io, bio_to_abort) ||
		    bdev_abort_buf_io(&mgmt_channel->need_buf_small, bio_to_abort) ||
		    bdev_abort_buf_io(&mgmt_channel->need_buf_large, bio_to_abort)) {
			_bdev_io_complete_in_submit(bdev_ch, bdev_io,
						    SPDK_BDEV_IO_STATUS_SUCCESS);
			return;
		}
	}

	if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
		bdev_ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev_io->internal.in_submit_request = true;
		bdev->fn_table->submit_request(ch, bdev_io);
		bdev_io->internal.in_submit_request = false;
	} else {
		TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
	}
}

static int
bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL;
	int i, submitted_ios = 0;

TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) { 1915 if (bdev_qos_io_to_limit(bdev_io) == true) { 1916 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1917 if (!qos->rate_limits[i].queue_io) { 1918 continue; 1919 } 1920 1921 if (qos->rate_limits[i].queue_io(&qos->rate_limits[i], 1922 bdev_io) == true) { 1923 return submitted_ios; 1924 } 1925 } 1926 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1927 if (!qos->rate_limits[i].update_quota) { 1928 continue; 1929 } 1930 1931 qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io); 1932 } 1933 } 1934 1935 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1936 bdev_io_do_submit(ch, bdev_io); 1937 submitted_ios++; 1938 } 1939 1940 return submitted_ios; 1941 } 1942 1943 static void 1944 bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1945 { 1946 int rc; 1947 1948 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1949 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1950 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1951 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1952 &bdev_io->internal.waitq_entry); 1953 if (rc != 0) { 1954 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1955 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1956 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1957 } 1958 } 1959 1960 static bool 1961 bdev_io_type_can_split(uint8_t type) 1962 { 1963 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1964 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1965 1966 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1967 * UNMAP could be split, but these types of I/O are typically much larger 1968 * in size (sometimes the size of the entire block device), and the bdev 1969 * module can more efficiently split these types of I/O. Plus those types 1970 * of I/O do not have a payload, which makes the splitting process simpler. 1971 */ 1972 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1973 return true; 1974 } else { 1975 return false; 1976 } 1977 } 1978 1979 static bool 1980 bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1981 { 1982 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1983 uint32_t max_size = bdev_io->bdev->max_segment_size; 1984 int max_segs = bdev_io->bdev->max_num_segments; 1985 1986 io_boundary = bdev_io->bdev->split_on_optimal_io_boundary ? io_boundary : 0; 1987 1988 if (spdk_likely(!io_boundary && !max_segs && !max_size)) { 1989 return false; 1990 } 1991 1992 if (!bdev_io_type_can_split(bdev_io->type)) { 1993 return false; 1994 } 1995 1996 if (io_boundary) { 1997 uint64_t start_stripe, end_stripe; 1998 1999 start_stripe = bdev_io->u.bdev.offset_blocks; 2000 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 2001 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
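 * As a worked illustration (hypothetical values): with io_boundary = 8, an I/O at
 * offset_blocks 6 spanning 4 blocks gives start_stripe 6 >> 3 = 0 and end_stripe
 * 9 >> 3 = 1; the stripes differ, so the I/O crosses a boundary and must be split.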
*/ 2002 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 2003 start_stripe >>= spdk_u32log2(io_boundary); 2004 end_stripe >>= spdk_u32log2(io_boundary); 2005 } else { 2006 start_stripe /= io_boundary; 2007 end_stripe /= io_boundary; 2008 } 2009 2010 if (start_stripe != end_stripe) { 2011 return true; 2012 } 2013 } 2014 2015 if (max_segs) { 2016 if (bdev_io->u.bdev.iovcnt > max_segs) { 2017 return true; 2018 } 2019 } 2020 2021 if (max_size) { 2022 for (int i = 0; i < bdev_io->u.bdev.iovcnt; i++) { 2023 if (bdev_io->u.bdev.iovs[i].iov_len > max_size) { 2024 return true; 2025 } 2026 } 2027 } 2028 2029 return false; 2030 } 2031 2032 static uint32_t 2033 _to_next_boundary(uint64_t offset, uint32_t boundary) 2034 { 2035 return (boundary - (offset % boundary)); 2036 } 2037 2038 static void 2039 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 2040 2041 static void 2042 _bdev_io_split(void *_bdev_io) 2043 { 2044 struct iovec *parent_iov, *iov; 2045 struct spdk_bdev_io *bdev_io = _bdev_io; 2046 struct spdk_bdev *bdev = bdev_io->bdev; 2047 uint64_t parent_offset, current_offset, remaining; 2048 uint32_t parent_iov_offset, parent_iovcnt, parent_iovpos, child_iovcnt; 2049 uint32_t to_next_boundary, to_next_boundary_bytes, to_last_block_bytes; 2050 uint32_t iovcnt, iov_len, child_iovsize; 2051 uint32_t blocklen = bdev->blocklen; 2052 uint32_t io_boundary = bdev->optimal_io_boundary; 2053 uint32_t max_segment_size = bdev->max_segment_size; 2054 uint32_t max_child_iovcnt = bdev->max_num_segments; 2055 void *md_buf = NULL; 2056 int rc; 2057 2058 max_segment_size = max_segment_size ? max_segment_size : UINT32_MAX; 2059 max_child_iovcnt = max_child_iovcnt ? spdk_min(max_child_iovcnt, BDEV_IO_NUM_CHILD_IOV) : 2060 BDEV_IO_NUM_CHILD_IOV; 2061 io_boundary = bdev->split_on_optimal_io_boundary ? 
io_boundary : UINT32_MAX; 2062 2063 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 2064 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 2065 parent_offset = bdev_io->u.bdev.offset_blocks; 2066 parent_iov_offset = (current_offset - parent_offset) * blocklen; 2067 parent_iovcnt = bdev_io->u.bdev.iovcnt; 2068 2069 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 2070 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 2071 if (parent_iov_offset < parent_iov->iov_len) { 2072 break; 2073 } 2074 parent_iov_offset -= parent_iov->iov_len; 2075 } 2076 2077 child_iovcnt = 0; 2078 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 2079 to_next_boundary = _to_next_boundary(current_offset, io_boundary); 2080 to_next_boundary = spdk_min(remaining, to_next_boundary); 2081 to_next_boundary_bytes = to_next_boundary * blocklen; 2082 2083 iov = &bdev_io->child_iov[child_iovcnt]; 2084 iovcnt = 0; 2085 2086 if (bdev_io->u.bdev.md_buf) { 2087 md_buf = (char *)bdev_io->u.bdev.md_buf + 2088 (current_offset - parent_offset) * spdk_bdev_get_md_size(bdev); 2089 } 2090 2091 child_iovsize = spdk_min(BDEV_IO_NUM_CHILD_IOV - child_iovcnt, max_child_iovcnt); 2092 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 2093 iovcnt < child_iovsize) { 2094 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 2095 iov_len = parent_iov->iov_len - parent_iov_offset; 2096 2097 iov_len = spdk_min(iov_len, max_segment_size); 2098 iov_len = spdk_min(iov_len, to_next_boundary_bytes); 2099 to_next_boundary_bytes -= iov_len; 2100 2101 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 2102 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 2103 2104 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 2105 parent_iov_offset += iov_len; 2106 } else { 2107 parent_iovpos++; 2108 parent_iov_offset = 0; 2109 } 2110 child_iovcnt++; 2111 iovcnt++; 2112 } 2113 2114 if (to_next_boundary_bytes > 0) { 2115 /* We had to stop this child I/O early because we ran out of 2116 * child_iov space or were limited by max_num_segments. 2117 * Ensure the iovs to be aligned with block size and 2118 * then adjust to_next_boundary before starting the 2119 * child I/O. 2120 */ 2121 assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV || 2122 iovcnt == child_iovsize); 2123 to_last_block_bytes = to_next_boundary_bytes % blocklen; 2124 if (to_last_block_bytes != 0) { 2125 uint32_t child_iovpos = child_iovcnt - 1; 2126 /* don't decrease child_iovcnt when it equals to BDEV_IO_NUM_CHILD_IOV 2127 * so the loop will naturally end 2128 */ 2129 2130 to_last_block_bytes = blocklen - to_last_block_bytes; 2131 to_next_boundary_bytes += to_last_block_bytes; 2132 while (to_last_block_bytes > 0 && iovcnt > 0) { 2133 iov_len = spdk_min(to_last_block_bytes, 2134 bdev_io->child_iov[child_iovpos].iov_len); 2135 bdev_io->child_iov[child_iovpos].iov_len -= iov_len; 2136 if (bdev_io->child_iov[child_iovpos].iov_len == 0) { 2137 child_iovpos--; 2138 if (--iovcnt == 0) { 2139 /* If the child IO is less than a block size just return. 2140 * If the first child IO of any split round is less than 2141 * a block size, an error exit. 
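 * ("error exit" means: if no other child I/O of this parent are outstanding, the
 * parent is completed with SPDK_BDEV_IO_STATUS_FAILED below; if children are still
 * outstanding, the split is simply retried after they complete.)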
2142 */ 2143 if (bdev_io->u.bdev.split_outstanding == 0) { 2144 SPDK_ERRLOG("The first child io was less than a block size\n"); 2145 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2146 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 2147 (uintptr_t)bdev_io, 0); 2148 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 2149 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 2150 } 2151 2152 return; 2153 } 2154 } 2155 2156 to_last_block_bytes -= iov_len; 2157 2158 if (parent_iov_offset == 0) { 2159 parent_iovpos--; 2160 parent_iov_offset = bdev_io->u.bdev.iovs[parent_iovpos].iov_len; 2161 } 2162 parent_iov_offset -= iov_len; 2163 } 2164 2165 assert(to_last_block_bytes == 0); 2166 } 2167 to_next_boundary -= to_next_boundary_bytes / blocklen; 2168 } 2169 2170 bdev_io->u.bdev.split_outstanding++; 2171 2172 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 2173 rc = bdev_readv_blocks_with_md(bdev_io->internal.desc, 2174 spdk_io_channel_from_ctx(bdev_io->internal.ch), 2175 iov, iovcnt, md_buf, current_offset, 2176 to_next_boundary, 2177 bdev_io_split_done, bdev_io); 2178 } else { 2179 rc = bdev_writev_blocks_with_md(bdev_io->internal.desc, 2180 spdk_io_channel_from_ctx(bdev_io->internal.ch), 2181 iov, iovcnt, md_buf, current_offset, 2182 to_next_boundary, 2183 bdev_io_split_done, bdev_io); 2184 } 2185 2186 if (rc == 0) { 2187 current_offset += to_next_boundary; 2188 remaining -= to_next_boundary; 2189 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 2190 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 2191 } else { 2192 bdev_io->u.bdev.split_outstanding--; 2193 if (rc == -ENOMEM) { 2194 if (bdev_io->u.bdev.split_outstanding == 0) { 2195 /* No I/O is outstanding. Hence we should wait here. */ 2196 bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split); 2197 } 2198 } else { 2199 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2200 if (bdev_io->u.bdev.split_outstanding == 0) { 2201 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 2202 (uintptr_t)bdev_io, 0); 2203 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 2204 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 2205 } 2206 } 2207 2208 return; 2209 } 2210 } 2211 } 2212 2213 static void 2214 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 2215 { 2216 struct spdk_bdev_io *parent_io = cb_arg; 2217 2218 spdk_bdev_free_io(bdev_io); 2219 2220 if (!success) { 2221 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 2222 /* If any child I/O failed, stop further splitting process. */ 2223 parent_io->u.bdev.split_current_offset_blocks += parent_io->u.bdev.split_remaining_num_blocks; 2224 parent_io->u.bdev.split_remaining_num_blocks = 0; 2225 } 2226 parent_io->u.bdev.split_outstanding--; 2227 if (parent_io->u.bdev.split_outstanding != 0) { 2228 return; 2229 } 2230 2231 /* 2232 * Parent I/O finishes when all blocks are consumed. 2233 */ 2234 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 2235 assert(parent_io->internal.cb != bdev_io_split_done); 2236 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 2237 (uintptr_t)parent_io, 0); 2238 TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link); 2239 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2240 parent_io->internal.caller_ctx); 2241 return; 2242 } 2243 2244 /* 2245 * Continue with the splitting process. 
This function will complete the parent I/O if the 2246 * splitting is done. 2247 */ 2248 _bdev_io_split(parent_io); 2249 } 2250 2251 static void 2252 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success); 2253 2254 static void 2255 bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 2256 { 2257 assert(bdev_io_type_can_split(bdev_io->type)); 2258 2259 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 2260 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 2261 bdev_io->u.bdev.split_outstanding = 0; 2262 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2263 2264 if (_is_buf_allocated(bdev_io->u.bdev.iovs)) { 2265 _bdev_io_split(bdev_io); 2266 } else { 2267 assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ); 2268 spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb, 2269 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 2270 } 2271 } 2272 2273 static void 2274 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 2275 { 2276 if (!success) { 2277 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2278 return; 2279 } 2280 2281 _bdev_io_split(bdev_io); 2282 } 2283 2284 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't 2285 * be inlined, at least on some compilers. 2286 */ 2287 static inline void 2288 _bdev_io_submit(void *ctx) 2289 { 2290 struct spdk_bdev_io *bdev_io = ctx; 2291 struct spdk_bdev *bdev = bdev_io->bdev; 2292 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2293 uint64_t tsc; 2294 2295 tsc = spdk_get_ticks(); 2296 bdev_io->internal.submit_tsc = tsc; 2297 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 2298 2299 if (spdk_likely(bdev_ch->flags == 0)) { 2300 bdev_io_do_submit(bdev_ch, bdev_io); 2301 return; 2302 } 2303 2304 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 2305 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2306 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 2307 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_ABORT) && 2308 bdev_abort_queued_io(&bdev->internal.qos->queued, bdev_io->u.abort.bio_to_abort)) { 2309 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS); 2310 } else { 2311 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 2312 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 2313 } 2314 } else { 2315 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 2316 _bdev_io_complete_in_submit(bdev_ch, bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2317 } 2318 } 2319 2320 bool 2321 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2); 2322 2323 bool 2324 bdev_lba_range_overlapped(struct lba_range *range1, struct lba_range *range2) 2325 { 2326 if (range1->length == 0 || range2->length == 0) { 2327 return false; 2328 } 2329 2330 if (range1->offset + range1->length <= range2->offset) { 2331 return false; 2332 } 2333 2334 if (range2->offset + range2->length <= range1->offset) { 2335 return false; 2336 } 2337 2338 return true; 2339 } 2340 2341 static bool 2342 bdev_io_range_is_locked(struct spdk_bdev_io *bdev_io, struct lba_range *range) 2343 { 2344 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2345 struct lba_range r; 2346 2347 switch (bdev_io->type) { 2348 case SPDK_BDEV_IO_TYPE_NVME_IO: 2349 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 2350 /* Don't try to decode the NVMe command - just assume 
worst-case and that 2351 * it overlaps a locked range. 2352 */ 2353 return true; 2354 case SPDK_BDEV_IO_TYPE_WRITE: 2355 case SPDK_BDEV_IO_TYPE_UNMAP: 2356 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2357 case SPDK_BDEV_IO_TYPE_ZCOPY: 2358 r.offset = bdev_io->u.bdev.offset_blocks; 2359 r.length = bdev_io->u.bdev.num_blocks; 2360 if (!bdev_lba_range_overlapped(range, &r)) { 2361 /* This I/O doesn't overlap the specified LBA range. */ 2362 return false; 2363 } else if (range->owner_ch == ch && range->locked_ctx == bdev_io->internal.caller_ctx) { 2364 /* This I/O overlaps, but the I/O is on the same channel that locked this 2365 * range, and the caller_ctx is the same as the locked_ctx. This means 2366 * that this I/O is associated with the lock, and is allowed to execute. 2367 */ 2368 return false; 2369 } else { 2370 return true; 2371 } 2372 default: 2373 return false; 2374 } 2375 } 2376 2377 void 2378 bdev_io_submit(struct spdk_bdev_io *bdev_io) 2379 { 2380 struct spdk_bdev *bdev = bdev_io->bdev; 2381 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 2382 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 2383 2384 assert(thread != NULL); 2385 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2386 2387 if (!TAILQ_EMPTY(&ch->locked_ranges)) { 2388 struct lba_range *range; 2389 2390 TAILQ_FOREACH(range, &ch->locked_ranges, tailq) { 2391 if (bdev_io_range_is_locked(bdev_io, range)) { 2392 TAILQ_INSERT_TAIL(&ch->io_locked, bdev_io, internal.ch_link); 2393 return; 2394 } 2395 } 2396 } 2397 2398 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 2399 2400 if (bdev_io_should_split(bdev_io)) { 2401 bdev_io->internal.submit_tsc = spdk_get_ticks(); 2402 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 2403 (uintptr_t)bdev_io, bdev_io->type); 2404 bdev_io_split(NULL, bdev_io); 2405 return; 2406 } 2407 2408 if (ch->flags & BDEV_CH_QOS_ENABLED) { 2409 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 2410 _bdev_io_submit(bdev_io); 2411 } else { 2412 bdev_io->internal.io_submit_ch = ch; 2413 bdev_io->internal.ch = bdev->internal.qos->ch; 2414 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 2415 } 2416 } else { 2417 _bdev_io_submit(bdev_io); 2418 } 2419 } 2420 2421 static void 2422 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 2423 { 2424 struct spdk_bdev *bdev = bdev_io->bdev; 2425 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2426 struct spdk_io_channel *ch = bdev_ch->channel; 2427 2428 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 2429 2430 bdev_io->internal.in_submit_request = true; 2431 bdev->fn_table->submit_request(ch, bdev_io); 2432 bdev_io->internal.in_submit_request = false; 2433 } 2434 2435 void 2436 bdev_io_init(struct spdk_bdev_io *bdev_io, 2437 struct spdk_bdev *bdev, void *cb_arg, 2438 spdk_bdev_io_completion_cb cb) 2439 { 2440 bdev_io->bdev = bdev; 2441 bdev_io->internal.caller_ctx = cb_arg; 2442 bdev_io->internal.cb = cb; 2443 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2444 bdev_io->internal.in_submit_request = false; 2445 bdev_io->internal.buf = NULL; 2446 bdev_io->internal.io_submit_ch = NULL; 2447 bdev_io->internal.orig_iovs = NULL; 2448 bdev_io->internal.orig_iovcnt = 0; 2449 bdev_io->internal.orig_md_buf = NULL; 2450 bdev_io->internal.error.nvme.cdw0 = 0; 2451 bdev_io->num_retries = 0; 2452 bdev_io->internal.get_buf_cb = NULL; 2453 bdev_io->internal.get_aux_buf_cb = NULL; 2454 } 2455 2456 static bool 2457 
bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2458 { 2459 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 2460 } 2461 2462 bool 2463 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 2464 { 2465 bool supported; 2466 2467 supported = bdev_io_type_supported(bdev, io_type); 2468 2469 if (!supported) { 2470 switch (io_type) { 2471 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 2472 /* The bdev layer will emulate write zeroes as long as write is supported. */ 2473 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2474 break; 2475 case SPDK_BDEV_IO_TYPE_ZCOPY: 2476 /* Zero copy can be emulated with regular read and write */ 2477 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 2478 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 2479 break; 2480 default: 2481 break; 2482 } 2483 } 2484 2485 return supported; 2486 } 2487 2488 int 2489 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 2490 { 2491 if (bdev->fn_table->dump_info_json) { 2492 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 2493 } 2494 2495 return 0; 2496 } 2497 2498 static void 2499 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2500 { 2501 uint32_t max_per_timeslice = 0; 2502 int i; 2503 2504 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2505 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2506 qos->rate_limits[i].max_per_timeslice = 0; 2507 continue; 2508 } 2509 2510 max_per_timeslice = qos->rate_limits[i].limit * 2511 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2512 2513 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2514 qos->rate_limits[i].min_per_timeslice); 2515 2516 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2517 } 2518 2519 bdev_qos_set_ops(qos); 2520 } 2521 2522 static int 2523 bdev_channel_poll_qos(void *arg) 2524 { 2525 struct spdk_bdev_qos *qos = arg; 2526 uint64_t now = spdk_get_ticks(); 2527 int i; 2528 2529 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2530 /* We received our callback earlier than expected - return 2531 * immediately and wait to do accounting until at least one 2532 * timeslice has actually expired. This should never happen 2533 * with a well-behaved timer implementation. 2534 */ 2535 return SPDK_POLLER_IDLE; 2536 } 2537 2538 /* Reset for next round of rate limiting */ 2539 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2540 /* We may have allowed the IOs or bytes to slightly overrun in the last 2541 * timeslice. remaining_this_timeslice is signed, so if it's negative 2542 * here, we'll account for the overrun so that the next timeslice will 2543 * be appropriately reduced. 
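 * As an example with made-up numbers: if max_per_timeslice is 100 and an oversized
 * I/O left remaining_this_timeslice at -30, the replenish loop below starts the next
 * timeslice at 70 instead of 100.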
2544 */ 2545 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2546 qos->rate_limits[i].remaining_this_timeslice = 0; 2547 } 2548 } 2549 2550 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2551 qos->last_timeslice += qos->timeslice_size; 2552 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2553 qos->rate_limits[i].remaining_this_timeslice += 2554 qos->rate_limits[i].max_per_timeslice; 2555 } 2556 } 2557 2558 return bdev_qos_io_submit(qos->ch, qos); 2559 } 2560 2561 static void 2562 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2563 { 2564 struct spdk_bdev_shared_resource *shared_resource; 2565 struct lba_range *range; 2566 2567 while (!TAILQ_EMPTY(&ch->locked_ranges)) { 2568 range = TAILQ_FIRST(&ch->locked_ranges); 2569 TAILQ_REMOVE(&ch->locked_ranges, range, tailq); 2570 free(range); 2571 } 2572 2573 spdk_put_io_channel(ch->channel); 2574 2575 shared_resource = ch->shared_resource; 2576 2577 assert(TAILQ_EMPTY(&ch->io_locked)); 2578 assert(TAILQ_EMPTY(&ch->io_submitted)); 2579 assert(ch->io_outstanding == 0); 2580 assert(shared_resource->ref > 0); 2581 shared_resource->ref--; 2582 if (shared_resource->ref == 0) { 2583 assert(shared_resource->io_outstanding == 0); 2584 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2585 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2586 free(shared_resource); 2587 } 2588 } 2589 2590 /* Caller must hold bdev->internal.mutex. */ 2591 static void 2592 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2593 { 2594 struct spdk_bdev_qos *qos = bdev->internal.qos; 2595 int i; 2596 2597 /* Rate limiting on this bdev enabled */ 2598 if (qos) { 2599 if (qos->ch == NULL) { 2600 struct spdk_io_channel *io_ch; 2601 2602 SPDK_DEBUGLOG(bdev, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2603 bdev->name, spdk_get_thread()); 2604 2605 /* No qos channel has been selected, so set one up */ 2606 2607 /* Take another reference to ch */ 2608 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2609 assert(io_ch != NULL); 2610 qos->ch = ch; 2611 2612 qos->thread = spdk_io_channel_get_thread(io_ch); 2613 2614 TAILQ_INIT(&qos->queued); 2615 2616 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2617 if (bdev_qos_is_iops_rate_limit(i) == true) { 2618 qos->rate_limits[i].min_per_timeslice = 2619 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2620 } else { 2621 qos->rate_limits[i].min_per_timeslice = 2622 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2623 } 2624 2625 if (qos->rate_limits[i].limit == 0) { 2626 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2627 } 2628 } 2629 bdev_qos_update_max_quota_per_timeslice(qos); 2630 qos->timeslice_size = 2631 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2632 qos->last_timeslice = spdk_get_ticks(); 2633 qos->poller = SPDK_POLLER_REGISTER(bdev_channel_poll_qos, 2634 qos, 2635 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2636 } 2637 2638 ch->flags |= BDEV_CH_QOS_ENABLED; 2639 } 2640 } 2641 2642 struct poll_timeout_ctx { 2643 struct spdk_bdev_desc *desc; 2644 uint64_t timeout_in_sec; 2645 spdk_bdev_io_timeout_cb cb_fn; 2646 void *cb_arg; 2647 }; 2648 2649 static void 2650 bdev_desc_free(struct spdk_bdev_desc *desc) 2651 { 2652 pthread_mutex_destroy(&desc->mutex); 2653 free(desc->media_events_buffer); 2654 free(desc); 2655 } 2656 2657 static void 2658 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2659 { 2660 struct poll_timeout_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 2661 struct spdk_bdev_desc *desc = ctx->desc; 2662 2663 free(ctx); 2664 2665 pthread_mutex_lock(&desc->mutex); 2666 desc->refs--; 2667 if (desc->closed == true && desc->refs == 0) { 2668 pthread_mutex_unlock(&desc->mutex); 2669 bdev_desc_free(desc); 2670 return; 2671 } 2672 pthread_mutex_unlock(&desc->mutex); 2673 } 2674 2675 static void 2676 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2677 { 2678 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2679 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2680 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2681 struct spdk_bdev_desc *desc = ctx->desc; 2682 struct spdk_bdev_io *bdev_io; 2683 uint64_t now; 2684 2685 pthread_mutex_lock(&desc->mutex); 2686 if (desc->closed == true) { 2687 pthread_mutex_unlock(&desc->mutex); 2688 spdk_for_each_channel_continue(i, -1); 2689 return; 2690 } 2691 pthread_mutex_unlock(&desc->mutex); 2692 2693 now = spdk_get_ticks(); 2694 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2695 /* Exclude any I/O that are generated via splitting. */ 2696 if (bdev_io->internal.cb == bdev_io_split_done) { 2697 continue; 2698 } 2699 2700 /* Once we find an I/O that has not timed out, we can immediately 2701 * exit the loop. 2702 */ 2703 if (now < (bdev_io->internal.submit_tsc + 2704 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2705 goto end; 2706 } 2707 2708 if (bdev_io->internal.desc == desc) { 2709 ctx->cb_fn(ctx->cb_arg, bdev_io); 2710 } 2711 } 2712 2713 end: 2714 spdk_for_each_channel_continue(i, 0); 2715 } 2716 2717 static int 2718 bdev_poll_timeout_io(void *arg) 2719 { 2720 struct spdk_bdev_desc *desc = arg; 2721 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2722 struct poll_timeout_ctx *ctx; 2723 2724 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2725 if (!ctx) { 2726 SPDK_ERRLOG("failed to allocate memory\n"); 2727 return SPDK_POLLER_BUSY; 2728 } 2729 ctx->desc = desc; 2730 ctx->cb_arg = desc->cb_arg; 2731 ctx->cb_fn = desc->cb_fn; 2732 ctx->timeout_in_sec = desc->timeout_in_sec; 2733 2734 /* Take a ref on the descriptor in case it gets closed while we are checking 2735 * all of the channels. 
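 * The reference is dropped in bdev_channel_poll_timeout_io_done(), which also frees
 * the descriptor if it was closed while the channel iteration was still in flight.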
2736 */ 2737 pthread_mutex_lock(&desc->mutex); 2738 desc->refs++; 2739 pthread_mutex_unlock(&desc->mutex); 2740 2741 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2742 bdev_channel_poll_timeout_io, 2743 ctx, 2744 bdev_channel_poll_timeout_io_done); 2745 2746 return SPDK_POLLER_BUSY; 2747 } 2748 2749 int 2750 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2751 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2752 { 2753 assert(desc->thread == spdk_get_thread()); 2754 2755 spdk_poller_unregister(&desc->io_timeout_poller); 2756 2757 if (timeout_in_sec) { 2758 assert(cb_fn != NULL); 2759 desc->io_timeout_poller = SPDK_POLLER_REGISTER(bdev_poll_timeout_io, 2760 desc, 2761 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2762 1000); 2763 if (desc->io_timeout_poller == NULL) { 2764 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2765 return -1; 2766 } 2767 } 2768 2769 desc->cb_fn = cb_fn; 2770 desc->cb_arg = cb_arg; 2771 desc->timeout_in_sec = timeout_in_sec; 2772 2773 return 0; 2774 } 2775 2776 static int 2777 bdev_channel_create(void *io_device, void *ctx_buf) 2778 { 2779 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2780 struct spdk_bdev_channel *ch = ctx_buf; 2781 struct spdk_io_channel *mgmt_io_ch; 2782 struct spdk_bdev_mgmt_channel *mgmt_ch; 2783 struct spdk_bdev_shared_resource *shared_resource; 2784 struct lba_range *range; 2785 2786 ch->bdev = bdev; 2787 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2788 if (!ch->channel) { 2789 return -1; 2790 } 2791 2792 assert(ch->histogram == NULL); 2793 if (bdev->internal.histogram_enabled) { 2794 ch->histogram = spdk_histogram_data_alloc(); 2795 if (ch->histogram == NULL) { 2796 SPDK_ERRLOG("Could not allocate histogram\n"); 2797 } 2798 } 2799 2800 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2801 if (!mgmt_io_ch) { 2802 spdk_put_io_channel(ch->channel); 2803 return -1; 2804 } 2805 2806 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2807 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2808 if (shared_resource->shared_ch == ch->channel) { 2809 spdk_put_io_channel(mgmt_io_ch); 2810 shared_resource->ref++; 2811 break; 2812 } 2813 } 2814 2815 if (shared_resource == NULL) { 2816 shared_resource = calloc(1, sizeof(*shared_resource)); 2817 if (shared_resource == NULL) { 2818 spdk_put_io_channel(ch->channel); 2819 spdk_put_io_channel(mgmt_io_ch); 2820 return -1; 2821 } 2822 2823 shared_resource->mgmt_ch = mgmt_ch; 2824 shared_resource->io_outstanding = 0; 2825 TAILQ_INIT(&shared_resource->nomem_io); 2826 shared_resource->nomem_threshold = 0; 2827 shared_resource->shared_ch = ch->channel; 2828 shared_resource->ref = 1; 2829 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2830 } 2831 2832 memset(&ch->stat, 0, sizeof(ch->stat)); 2833 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2834 ch->io_outstanding = 0; 2835 TAILQ_INIT(&ch->queued_resets); 2836 TAILQ_INIT(&ch->locked_ranges); 2837 ch->flags = 0; 2838 ch->shared_resource = shared_resource; 2839 2840 TAILQ_INIT(&ch->io_submitted); 2841 TAILQ_INIT(&ch->io_locked); 2842 2843 #ifdef SPDK_CONFIG_VTUNE 2844 { 2845 char *name; 2846 __itt_init_ittlib(NULL, 0); 2847 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2848 if (!name) { 2849 bdev_channel_destroy_resource(ch); 2850 return -1; 2851 } 2852 ch->handle = __itt_string_handle_create(name); 2853 free(name); 2854 ch->start_tsc = spdk_get_ticks(); 2855 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2856 memset(&ch->prev_stat, 0, 
sizeof(ch->prev_stat)); 2857 } 2858 #endif 2859 2860 pthread_mutex_lock(&bdev->internal.mutex); 2861 bdev_enable_qos(bdev, ch); 2862 2863 TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) { 2864 struct lba_range *new_range; 2865 2866 new_range = calloc(1, sizeof(*new_range)); 2867 if (new_range == NULL) { 2868 pthread_mutex_unlock(&bdev->internal.mutex); 2869 bdev_channel_destroy_resource(ch); 2870 return -1; 2871 } 2872 new_range->length = range->length; 2873 new_range->offset = range->offset; 2874 new_range->locked_ctx = range->locked_ctx; 2875 TAILQ_INSERT_TAIL(&ch->locked_ranges, new_range, tailq); 2876 } 2877 2878 pthread_mutex_unlock(&bdev->internal.mutex); 2879 2880 return 0; 2881 } 2882 2883 /* 2884 * Abort I/O that are waiting on a data buffer. These types of I/O are 2885 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2886 */ 2887 static void 2888 bdev_abort_all_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2889 { 2890 bdev_io_stailq_t tmp; 2891 struct spdk_bdev_io *bdev_io; 2892 2893 STAILQ_INIT(&tmp); 2894 2895 while (!STAILQ_EMPTY(queue)) { 2896 bdev_io = STAILQ_FIRST(queue); 2897 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2898 if (bdev_io->internal.ch == ch) { 2899 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2900 } else { 2901 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2902 } 2903 } 2904 2905 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2906 } 2907 2908 /* 2909 * Abort I/O that are queued waiting for submission. These types of I/O are 2910 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2911 */ 2912 static void 2913 bdev_abort_all_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2914 { 2915 struct spdk_bdev_io *bdev_io, *tmp; 2916 2917 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2918 if (bdev_io->internal.ch == ch) { 2919 TAILQ_REMOVE(queue, bdev_io, internal.link); 2920 /* 2921 * spdk_bdev_io_complete() assumes that the completed I/O had 2922 * been submitted to the bdev module. Since in this case it 2923 * hadn't, bump io_outstanding to account for the decrement 2924 * that spdk_bdev_io_complete() will do. 
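 * Reset I/O are not accounted in io_outstanding, so they are completed below without
 * this adjustment.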
2925 */ 2926 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2927 ch->io_outstanding++; 2928 ch->shared_resource->io_outstanding++; 2929 } 2930 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_ABORTED); 2931 } 2932 } 2933 } 2934 2935 static bool 2936 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2937 { 2938 struct spdk_bdev_io *bdev_io; 2939 2940 TAILQ_FOREACH(bdev_io, queue, internal.link) { 2941 if (bdev_io == bio_to_abort) { 2942 TAILQ_REMOVE(queue, bio_to_abort, internal.link); 2943 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2944 return true; 2945 } 2946 } 2947 2948 return false; 2949 } 2950 2951 static bool 2952 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_io *bio_to_abort) 2953 { 2954 struct spdk_bdev_io *bdev_io; 2955 2956 STAILQ_FOREACH(bdev_io, queue, internal.buf_link) { 2957 if (bdev_io == bio_to_abort) { 2958 STAILQ_REMOVE(queue, bio_to_abort, spdk_bdev_io, internal.buf_link); 2959 spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED); 2960 return true; 2961 } 2962 } 2963 2964 return false; 2965 } 2966 2967 static void 2968 bdev_qos_channel_destroy(void *cb_arg) 2969 { 2970 struct spdk_bdev_qos *qos = cb_arg; 2971 2972 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2973 spdk_poller_unregister(&qos->poller); 2974 2975 SPDK_DEBUGLOG(bdev, "Free QoS %p.\n", qos); 2976 2977 free(qos); 2978 } 2979 2980 static int 2981 bdev_qos_destroy(struct spdk_bdev *bdev) 2982 { 2983 int i; 2984 2985 /* 2986 * Cleanly shutting down the QoS poller is tricky, because 2987 * during the asynchronous operation the user could open 2988 * a new descriptor and create a new channel, spawning 2989 * a new QoS poller. 2990 * 2991 * The strategy is to create a new QoS structure here and swap it 2992 * in. The shutdown path then continues to refer to the old one 2993 * until it completes and then releases it. 2994 */ 2995 struct spdk_bdev_qos *new_qos, *old_qos; 2996 2997 old_qos = bdev->internal.qos; 2998 2999 new_qos = calloc(1, sizeof(*new_qos)); 3000 if (!new_qos) { 3001 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 3002 return -ENOMEM; 3003 } 3004 3005 /* Copy the old QoS data into the newly allocated structure */ 3006 memcpy(new_qos, old_qos, sizeof(*new_qos)); 3007 3008 /* Zero out the key parts of the QoS structure */ 3009 new_qos->ch = NULL; 3010 new_qos->thread = NULL; 3011 new_qos->poller = NULL; 3012 TAILQ_INIT(&new_qos->queued); 3013 /* 3014 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 3015 * It will be used later for the new QoS structure. 3016 */ 3017 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3018 new_qos->rate_limits[i].remaining_this_timeslice = 0; 3019 new_qos->rate_limits[i].min_per_timeslice = 0; 3020 new_qos->rate_limits[i].max_per_timeslice = 0; 3021 } 3022 3023 bdev->internal.qos = new_qos; 3024 3025 if (old_qos->thread == NULL) { 3026 free(old_qos); 3027 } else { 3028 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 3029 } 3030 3031 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 3032 * been destroyed yet. The destruction path will end up waiting for the final 3033 * channel to be put before it releases resources. 
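 * (bdev_qos_channel_destroy() runs on the old QoS thread; it unregisters the QoS
 * poller and drops the I/O channel reference that bdev_enable_qos() took.)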
*/ 3034 3035 return 0; 3036 } 3037 3038 static void 3039 bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 3040 { 3041 total->bytes_read += add->bytes_read; 3042 total->num_read_ops += add->num_read_ops; 3043 total->bytes_written += add->bytes_written; 3044 total->num_write_ops += add->num_write_ops; 3045 total->bytes_unmapped += add->bytes_unmapped; 3046 total->num_unmap_ops += add->num_unmap_ops; 3047 total->read_latency_ticks += add->read_latency_ticks; 3048 total->write_latency_ticks += add->write_latency_ticks; 3049 total->unmap_latency_ticks += add->unmap_latency_ticks; 3050 } 3051 3052 static void 3053 bdev_channel_destroy(void *io_device, void *ctx_buf) 3054 { 3055 struct spdk_bdev_channel *ch = ctx_buf; 3056 struct spdk_bdev_mgmt_channel *mgmt_ch; 3057 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 3058 3059 SPDK_DEBUGLOG(bdev, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 3060 spdk_get_thread()); 3061 3062 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */ 3063 pthread_mutex_lock(&ch->bdev->internal.mutex); 3064 bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 3065 pthread_mutex_unlock(&ch->bdev->internal.mutex); 3066 3067 mgmt_ch = shared_resource->mgmt_ch; 3068 3069 bdev_abort_all_queued_io(&ch->queued_resets, ch); 3070 bdev_abort_all_queued_io(&shared_resource->nomem_io, ch); 3071 bdev_abort_all_buf_io(&mgmt_ch->need_buf_small, ch); 3072 bdev_abort_all_buf_io(&mgmt_ch->need_buf_large, ch); 3073 3074 if (ch->histogram) { 3075 spdk_histogram_data_free(ch->histogram); 3076 } 3077 3078 bdev_channel_destroy_resource(ch); 3079 } 3080 3081 int 3082 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 3083 { 3084 struct spdk_bdev_alias *tmp; 3085 3086 if (alias == NULL) { 3087 SPDK_ERRLOG("Empty alias passed\n"); 3088 return -EINVAL; 3089 } 3090 3091 if (spdk_bdev_get_by_name(alias)) { 3092 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 3093 return -EEXIST; 3094 } 3095 3096 tmp = calloc(1, sizeof(*tmp)); 3097 if (tmp == NULL) { 3098 SPDK_ERRLOG("Unable to allocate alias\n"); 3099 return -ENOMEM; 3100 } 3101 3102 tmp->alias = strdup(alias); 3103 if (tmp->alias == NULL) { 3104 free(tmp); 3105 SPDK_ERRLOG("Unable to allocate alias\n"); 3106 return -ENOMEM; 3107 } 3108 3109 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 3110 3111 return 0; 3112 } 3113 3114 int 3115 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 3116 { 3117 struct spdk_bdev_alias *tmp; 3118 3119 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 3120 if (strcmp(alias, tmp->alias) == 0) { 3121 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 3122 free(tmp->alias); 3123 free(tmp); 3124 return 0; 3125 } 3126 } 3127 3128 SPDK_INFOLOG(bdev, "Alias %s does not exist\n", alias); 3129 3130 return -ENOENT; 3131 } 3132 3133 void 3134 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 3135 { 3136 struct spdk_bdev_alias *p, *tmp; 3137 3138 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 3139 TAILQ_REMOVE(&bdev->aliases, p, tailq); 3140 free(p->alias); 3141 free(p); 3142 } 3143 } 3144 3145 struct spdk_io_channel * 3146 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 3147 { 3148 return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc))); 3149 } 3150 3151 void * 3152 spdk_bdev_get_module_ctx(struct spdk_bdev_desc *desc) 3153 { 3154 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3155 void *ctx = NULL; 3156 3157 if (bdev->fn_table->get_module_ctx)
{ 3158 ctx = bdev->fn_table->get_module_ctx(bdev->ctxt); 3159 } 3160 3161 return ctx; 3162 } 3163 3164 const char * 3165 spdk_bdev_get_module_name(const struct spdk_bdev *bdev) 3166 { 3167 return bdev->module->name; 3168 } 3169 3170 const char * 3171 spdk_bdev_get_name(const struct spdk_bdev *bdev) 3172 { 3173 return bdev->name; 3174 } 3175 3176 const char * 3177 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 3178 { 3179 return bdev->product_name; 3180 } 3181 3182 const struct spdk_bdev_aliases_list * 3183 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 3184 { 3185 return &bdev->aliases; 3186 } 3187 3188 uint32_t 3189 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 3190 { 3191 return bdev->blocklen; 3192 } 3193 3194 uint32_t 3195 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 3196 { 3197 return bdev->write_unit_size; 3198 } 3199 3200 uint64_t 3201 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 3202 { 3203 return bdev->blockcnt; 3204 } 3205 3206 const char * 3207 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 3208 { 3209 return qos_rpc_type[type]; 3210 } 3211 3212 void 3213 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 3214 { 3215 int i; 3216 3217 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 3218 3219 pthread_mutex_lock(&bdev->internal.mutex); 3220 if (bdev->internal.qos) { 3221 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3222 if (bdev->internal.qos->rate_limits[i].limit != 3223 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3224 limits[i] = bdev->internal.qos->rate_limits[i].limit; 3225 if (bdev_qos_is_iops_rate_limit(i) == false) { 3226 /* Change from Byte to Megabyte which is user visible. */ 3227 limits[i] = limits[i] / 1024 / 1024; 3228 } 3229 } 3230 } 3231 } 3232 pthread_mutex_unlock(&bdev->internal.mutex); 3233 } 3234 3235 size_t 3236 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 3237 { 3238 return 1 << bdev->required_alignment; 3239 } 3240 3241 uint32_t 3242 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 3243 { 3244 return bdev->optimal_io_boundary; 3245 } 3246 3247 bool 3248 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 3249 { 3250 return bdev->write_cache; 3251 } 3252 3253 const struct spdk_uuid * 3254 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 3255 { 3256 return &bdev->uuid; 3257 } 3258 3259 uint16_t 3260 spdk_bdev_get_acwu(const struct spdk_bdev *bdev) 3261 { 3262 return bdev->acwu; 3263 } 3264 3265 uint32_t 3266 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 3267 { 3268 return bdev->md_len; 3269 } 3270 3271 bool 3272 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 3273 { 3274 return (bdev->md_len != 0) && bdev->md_interleave; 3275 } 3276 3277 bool 3278 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 3279 { 3280 return (bdev->md_len != 0) && !bdev->md_interleave; 3281 } 3282 3283 bool 3284 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 3285 { 3286 return bdev->zoned; 3287 } 3288 3289 uint32_t 3290 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 3291 { 3292 if (spdk_bdev_is_md_interleaved(bdev)) { 3293 return bdev->blocklen - bdev->md_len; 3294 } else { 3295 return bdev->blocklen; 3296 } 3297 } 3298 3299 static uint32_t 3300 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 3301 { 3302 if (!spdk_bdev_is_md_interleaved(bdev)) { 3303 return bdev->blocklen + bdev->md_len; 3304 } else { 3305 return bdev->blocklen; 3306 } 3307 } 3308 3309 enum spdk_dif_type 
spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 3310 { 3311 if (bdev->md_len != 0) { 3312 return bdev->dif_type; 3313 } else { 3314 return SPDK_DIF_DISABLE; 3315 } 3316 } 3317 3318 bool 3319 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 3320 { 3321 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 3322 return bdev->dif_is_head_of_md; 3323 } else { 3324 return false; 3325 } 3326 } 3327 3328 bool 3329 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 3330 enum spdk_dif_check_type check_type) 3331 { 3332 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 3333 return false; 3334 } 3335 3336 switch (check_type) { 3337 case SPDK_DIF_CHECK_TYPE_REFTAG: 3338 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 3339 case SPDK_DIF_CHECK_TYPE_APPTAG: 3340 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 3341 case SPDK_DIF_CHECK_TYPE_GUARD: 3342 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 3343 default: 3344 return false; 3345 } 3346 } 3347 3348 uint64_t 3349 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 3350 { 3351 return bdev->internal.measured_queue_depth; 3352 } 3353 3354 uint64_t 3355 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 3356 { 3357 return bdev->internal.period; 3358 } 3359 3360 uint64_t 3361 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 3362 { 3363 return bdev->internal.weighted_io_time; 3364 } 3365 3366 uint64_t 3367 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 3368 { 3369 return bdev->internal.io_time; 3370 } 3371 3372 static void 3373 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 3374 { 3375 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3376 3377 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 3378 3379 if (bdev->internal.measured_queue_depth) { 3380 bdev->internal.io_time += bdev->internal.period; 3381 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 3382 } 3383 } 3384 3385 static void 3386 _calculate_measured_qd(struct spdk_io_channel_iter *i) 3387 { 3388 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 3389 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 3390 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 3391 3392 bdev->internal.temporary_queue_depth += ch->io_outstanding; 3393 spdk_for_each_channel_continue(i, 0); 3394 } 3395 3396 static int 3397 bdev_calculate_measured_queue_depth(void *ctx) 3398 { 3399 struct spdk_bdev *bdev = ctx; 3400 bdev->internal.temporary_queue_depth = 0; 3401 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 3402 _calculate_measured_qd_cpl); 3403 return SPDK_POLLER_BUSY; 3404 } 3405 3406 void 3407 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 3408 { 3409 bdev->internal.period = period; 3410 3411 if (bdev->internal.qd_poller != NULL) { 3412 spdk_poller_unregister(&bdev->internal.qd_poller); 3413 bdev->internal.measured_queue_depth = UINT64_MAX; 3414 } 3415 3416 if (period != 0) { 3417 bdev->internal.qd_poller = SPDK_POLLER_REGISTER(bdev_calculate_measured_queue_depth, bdev, 3418 period); 3419 } 3420 } 3421 3422 static void 3423 _resize_notify(void *arg) 3424 { 3425 struct spdk_bdev_desc *desc = arg; 3426 3427 pthread_mutex_lock(&desc->mutex); 3428 desc->refs--; 3429 if (!desc->closed) { 3430 pthread_mutex_unlock(&desc->mutex); 3431 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 3432 desc->bdev, 3433 
desc->callback.ctx); 3434 return; 3435 } else if (0 == desc->refs) { 3436 /* This descriptor was closed after this resize_notify message was sent. 3437 * spdk_bdev_close() could not free the descriptor since this message was 3438 * in flight, so we free it now using bdev_desc_free(). 3439 */ 3440 pthread_mutex_unlock(&desc->mutex); 3441 bdev_desc_free(desc); 3442 return; 3443 } 3444 pthread_mutex_unlock(&desc->mutex); 3445 } 3446 3447 int 3448 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 3449 { 3450 struct spdk_bdev_desc *desc; 3451 int ret; 3452 3453 if (size == bdev->blockcnt) { 3454 return 0; 3455 } 3456 3457 pthread_mutex_lock(&bdev->internal.mutex); 3458 3459 /* bdev has open descriptors */ 3460 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 3461 bdev->blockcnt > size) { 3462 ret = -EBUSY; 3463 } else { 3464 bdev->blockcnt = size; 3465 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 3466 pthread_mutex_lock(&desc->mutex); 3467 if (desc->callback.open_with_ext && !desc->closed) { 3468 desc->refs++; 3469 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 3470 } 3471 pthread_mutex_unlock(&desc->mutex); 3472 } 3473 ret = 0; 3474 } 3475 3476 pthread_mutex_unlock(&bdev->internal.mutex); 3477 3478 return ret; 3479 } 3480 3481 /* 3482 * Convert I/O offset and length from bytes to blocks. 3483 * 3484 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 3485 */ 3486 static uint64_t 3487 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 3488 uint64_t num_bytes, uint64_t *num_blocks) 3489 { 3490 uint32_t block_size = bdev->blocklen; 3491 uint8_t shift_cnt; 3492 3493 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
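 * As a worked example (hypothetical values): with a 512-byte block size (shift of 9),
 * offset_bytes = 4096 and num_bytes = 8192 become offset_blocks = 8 and num_blocks = 16,
 * and the OR of the shifted-off remainders is 0, reporting an exact conversion.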
*/ 3494 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 3495 shift_cnt = spdk_u32log2(block_size); 3496 *offset_blocks = offset_bytes >> shift_cnt; 3497 *num_blocks = num_bytes >> shift_cnt; 3498 return (offset_bytes - (*offset_blocks << shift_cnt)) | 3499 (num_bytes - (*num_blocks << shift_cnt)); 3500 } else { 3501 *offset_blocks = offset_bytes / block_size; 3502 *num_blocks = num_bytes / block_size; 3503 return (offset_bytes % block_size) | (num_bytes % block_size); 3504 } 3505 } 3506 3507 static bool 3508 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 3509 { 3510 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 3511 * has been an overflow and hence the offset has been wrapped around */ 3512 if (offset_blocks + num_blocks < offset_blocks) { 3513 return false; 3514 } 3515 3516 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 3517 if (offset_blocks + num_blocks > bdev->blockcnt) { 3518 return false; 3519 } 3520 3521 return true; 3522 } 3523 3524 static bool 3525 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 3526 { 3527 return _is_buf_allocated(iovs) == (md_buf != NULL); 3528 } 3529 3530 static int 3531 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 3532 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3533 spdk_bdev_io_completion_cb cb, void *cb_arg) 3534 { 3535 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3536 struct spdk_bdev_io *bdev_io; 3537 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3538 3539 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3540 return -EINVAL; 3541 } 3542 3543 bdev_io = bdev_channel_get_io(channel); 3544 if (!bdev_io) { 3545 return -ENOMEM; 3546 } 3547 3548 bdev_io->internal.ch = channel; 3549 bdev_io->internal.desc = desc; 3550 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3551 bdev_io->u.bdev.iovs = &bdev_io->iov; 3552 bdev_io->u.bdev.iovs[0].iov_base = buf; 3553 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3554 bdev_io->u.bdev.iovcnt = 1; 3555 bdev_io->u.bdev.md_buf = md_buf; 3556 bdev_io->u.bdev.num_blocks = num_blocks; 3557 bdev_io->u.bdev.offset_blocks = offset_blocks; 3558 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3559 3560 bdev_io_submit(bdev_io); 3561 return 0; 3562 } 3563 3564 int 3565 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3566 void *buf, uint64_t offset, uint64_t nbytes, 3567 spdk_bdev_io_completion_cb cb, void *cb_arg) 3568 { 3569 uint64_t offset_blocks, num_blocks; 3570 3571 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3572 nbytes, &num_blocks) != 0) { 3573 return -EINVAL; 3574 } 3575 3576 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3577 } 3578 3579 int 3580 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3581 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3582 spdk_bdev_io_completion_cb cb, void *cb_arg) 3583 { 3584 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 3585 } 3586 3587 int 3588 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3589 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3590 spdk_bdev_io_completion_cb cb, void *cb_arg) 3591 { 3592 struct iovec iov = { 3593 .iov_base = buf, 3594 }; 3595 3596 if 
(!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3597 return -EINVAL; 3598 } 3599 3600 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3601 return -EINVAL; 3602 } 3603 3604 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3605 cb, cb_arg); 3606 } 3607 3608 int 3609 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3610 struct iovec *iov, int iovcnt, 3611 uint64_t offset, uint64_t nbytes, 3612 spdk_bdev_io_completion_cb cb, void *cb_arg) 3613 { 3614 uint64_t offset_blocks, num_blocks; 3615 3616 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3617 nbytes, &num_blocks) != 0) { 3618 return -EINVAL; 3619 } 3620 3621 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3622 } 3623 3624 static int 3625 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3626 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3627 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3628 { 3629 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3630 struct spdk_bdev_io *bdev_io; 3631 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3632 3633 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3634 return -EINVAL; 3635 } 3636 3637 bdev_io = bdev_channel_get_io(channel); 3638 if (!bdev_io) { 3639 return -ENOMEM; 3640 } 3641 3642 bdev_io->internal.ch = channel; 3643 bdev_io->internal.desc = desc; 3644 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3645 bdev_io->u.bdev.iovs = iov; 3646 bdev_io->u.bdev.iovcnt = iovcnt; 3647 bdev_io->u.bdev.md_buf = md_buf; 3648 bdev_io->u.bdev.num_blocks = num_blocks; 3649 bdev_io->u.bdev.offset_blocks = offset_blocks; 3650 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3651 3652 bdev_io_submit(bdev_io); 3653 return 0; 3654 } 3655 3656 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3657 struct iovec *iov, int iovcnt, 3658 uint64_t offset_blocks, uint64_t num_blocks, 3659 spdk_bdev_io_completion_cb cb, void *cb_arg) 3660 { 3661 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3662 num_blocks, cb, cb_arg); 3663 } 3664 3665 int 3666 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3667 struct iovec *iov, int iovcnt, void *md_buf, 3668 uint64_t offset_blocks, uint64_t num_blocks, 3669 spdk_bdev_io_completion_cb cb, void *cb_arg) 3670 { 3671 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3672 return -EINVAL; 3673 } 3674 3675 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3676 return -EINVAL; 3677 } 3678 3679 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3680 num_blocks, cb, cb_arg); 3681 } 3682 3683 static int 3684 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3685 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3686 spdk_bdev_io_completion_cb cb, void *cb_arg) 3687 { 3688 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3689 struct spdk_bdev_io *bdev_io; 3690 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3691 3692 if (!desc->write) { 3693 return -EBADF; 3694 } 3695 3696 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3697 return -EINVAL; 3698 } 3699 3700 bdev_io = bdev_channel_get_io(channel); 3701 if (!bdev_io) { 3702 return -ENOMEM; 3703 } 3704 3705 bdev_io->internal.ch = channel; 3706 bdev_io->internal.desc = desc; 3707 
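/* Describe the write in place: a single iovec pointing at the caller's buffer, plus
 * an optional separate metadata buffer supplied by the *_with_md variants.
 */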
bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3708 bdev_io->u.bdev.iovs = &bdev_io->iov; 3709 bdev_io->u.bdev.iovs[0].iov_base = buf; 3710 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3711 bdev_io->u.bdev.iovcnt = 1; 3712 bdev_io->u.bdev.md_buf = md_buf; 3713 bdev_io->u.bdev.num_blocks = num_blocks; 3714 bdev_io->u.bdev.offset_blocks = offset_blocks; 3715 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3716 3717 bdev_io_submit(bdev_io); 3718 return 0; 3719 } 3720 3721 int 3722 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3723 void *buf, uint64_t offset, uint64_t nbytes, 3724 spdk_bdev_io_completion_cb cb, void *cb_arg) 3725 { 3726 uint64_t offset_blocks, num_blocks; 3727 3728 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3729 nbytes, &num_blocks) != 0) { 3730 return -EINVAL; 3731 } 3732 3733 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3734 } 3735 3736 int 3737 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3738 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3739 spdk_bdev_io_completion_cb cb, void *cb_arg) 3740 { 3741 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3742 cb, cb_arg); 3743 } 3744 3745 int 3746 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3747 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3748 spdk_bdev_io_completion_cb cb, void *cb_arg) 3749 { 3750 struct iovec iov = { 3751 .iov_base = buf, 3752 }; 3753 3754 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3755 return -EINVAL; 3756 } 3757 3758 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3759 return -EINVAL; 3760 } 3761 3762 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3763 cb, cb_arg); 3764 } 3765 3766 static int 3767 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3768 struct iovec *iov, int iovcnt, void *md_buf, 3769 uint64_t offset_blocks, uint64_t num_blocks, 3770 spdk_bdev_io_completion_cb cb, void *cb_arg) 3771 { 3772 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3773 struct spdk_bdev_io *bdev_io; 3774 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3775 3776 if (!desc->write) { 3777 return -EBADF; 3778 } 3779 3780 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3781 return -EINVAL; 3782 } 3783 3784 bdev_io = bdev_channel_get_io(channel); 3785 if (!bdev_io) { 3786 return -ENOMEM; 3787 } 3788 3789 bdev_io->internal.ch = channel; 3790 bdev_io->internal.desc = desc; 3791 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3792 bdev_io->u.bdev.iovs = iov; 3793 bdev_io->u.bdev.iovcnt = iovcnt; 3794 bdev_io->u.bdev.md_buf = md_buf; 3795 bdev_io->u.bdev.num_blocks = num_blocks; 3796 bdev_io->u.bdev.offset_blocks = offset_blocks; 3797 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3798 3799 bdev_io_submit(bdev_io); 3800 return 0; 3801 } 3802 3803 int 3804 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3805 struct iovec *iov, int iovcnt, 3806 uint64_t offset, uint64_t len, 3807 spdk_bdev_io_completion_cb cb, void *cb_arg) 3808 { 3809 uint64_t offset_blocks, num_blocks; 3810 3811 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3812 len, &num_blocks) != 0) { 3813 return -EINVAL; 3814 } 3815 3816 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3817 } 3818 3819 int 3820 
spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3821 struct iovec *iov, int iovcnt, 3822 uint64_t offset_blocks, uint64_t num_blocks, 3823 spdk_bdev_io_completion_cb cb, void *cb_arg) 3824 { 3825 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3826 num_blocks, cb, cb_arg); 3827 } 3828 3829 int 3830 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3831 struct iovec *iov, int iovcnt, void *md_buf, 3832 uint64_t offset_blocks, uint64_t num_blocks, 3833 spdk_bdev_io_completion_cb cb, void *cb_arg) 3834 { 3835 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3836 return -EINVAL; 3837 } 3838 3839 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3840 return -EINVAL; 3841 } 3842 3843 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3844 num_blocks, cb, cb_arg); 3845 } 3846 3847 static void 3848 bdev_compare_do_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3849 { 3850 struct spdk_bdev_io *parent_io = cb_arg; 3851 uint8_t *read_buf = bdev_io->u.bdev.iovs[0].iov_base; 3852 int i, rc = 0; 3853 3854 if (!success) { 3855 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3856 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3857 spdk_bdev_free_io(bdev_io); 3858 return; 3859 } 3860 3861 for (i = 0; i < parent_io->u.bdev.iovcnt; i++) { 3862 rc = memcmp(read_buf, 3863 parent_io->u.bdev.iovs[i].iov_base, 3864 parent_io->u.bdev.iovs[i].iov_len); 3865 if (rc) { 3866 break; 3867 } 3868 read_buf += parent_io->u.bdev.iovs[i].iov_len; 3869 } 3870 3871 spdk_bdev_free_io(bdev_io); 3872 3873 if (rc == 0) { 3874 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3875 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3876 } else { 3877 parent_io->internal.status = SPDK_BDEV_IO_STATUS_MISCOMPARE; 3878 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3879 } 3880 } 3881 3882 static void 3883 bdev_compare_do_read(void *_bdev_io) 3884 { 3885 struct spdk_bdev_io *bdev_io = _bdev_io; 3886 int rc; 3887 3888 rc = spdk_bdev_read_blocks(bdev_io->internal.desc, 3889 spdk_io_channel_from_ctx(bdev_io->internal.ch), NULL, 3890 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 3891 bdev_compare_do_read_done, bdev_io); 3892 3893 if (rc == -ENOMEM) { 3894 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_do_read); 3895 } else if (rc != 0) { 3896 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3897 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3898 } 3899 } 3900 3901 static int 3902 bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3903 struct iovec *iov, int iovcnt, void *md_buf, 3904 uint64_t offset_blocks, uint64_t num_blocks, 3905 spdk_bdev_io_completion_cb cb, void *cb_arg) 3906 { 3907 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3908 struct spdk_bdev_io *bdev_io; 3909 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3910 3911 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3912 return -EINVAL; 3913 } 3914 3915 bdev_io = bdev_channel_get_io(channel); 3916 if (!bdev_io) { 3917 return -ENOMEM; 3918 } 3919 3920 bdev_io->internal.ch = channel; 3921 bdev_io->internal.desc = desc; 3922 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3923 bdev_io->u.bdev.iovs = iov; 3924 bdev_io->u.bdev.iovcnt = iovcnt; 3925 bdev_io->u.bdev.md_buf = md_buf; 3926 bdev_io->u.bdev.num_blocks = num_blocks; 
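	/* If the backing module does not support COMPARE natively, the request is
	 * emulated below: bdev_compare_do_read() reads the range back and its
	 * completion callback memcmp()s the data against the caller's iovecs,
	 * reporting SPDK_BDEV_IO_STATUS_MISCOMPARE on any mismatch. */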
3927 bdev_io->u.bdev.offset_blocks = offset_blocks; 3928 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3929 3930 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3931 bdev_io_submit(bdev_io); 3932 return 0; 3933 } 3934 3935 bdev_compare_do_read(bdev_io); 3936 3937 return 0; 3938 } 3939 3940 int 3941 spdk_bdev_comparev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3942 struct iovec *iov, int iovcnt, 3943 uint64_t offset_blocks, uint64_t num_blocks, 3944 spdk_bdev_io_completion_cb cb, void *cb_arg) 3945 { 3946 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3947 num_blocks, cb, cb_arg); 3948 } 3949 3950 int 3951 spdk_bdev_comparev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3952 struct iovec *iov, int iovcnt, void *md_buf, 3953 uint64_t offset_blocks, uint64_t num_blocks, 3954 spdk_bdev_io_completion_cb cb, void *cb_arg) 3955 { 3956 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3957 return -EINVAL; 3958 } 3959 3960 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3961 return -EINVAL; 3962 } 3963 3964 return bdev_comparev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3965 num_blocks, cb, cb_arg); 3966 } 3967 3968 static int 3969 bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3970 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3971 spdk_bdev_io_completion_cb cb, void *cb_arg) 3972 { 3973 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3974 struct spdk_bdev_io *bdev_io; 3975 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3976 3977 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3978 return -EINVAL; 3979 } 3980 3981 bdev_io = bdev_channel_get_io(channel); 3982 if (!bdev_io) { 3983 return -ENOMEM; 3984 } 3985 3986 bdev_io->internal.ch = channel; 3987 bdev_io->internal.desc = desc; 3988 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE; 3989 bdev_io->u.bdev.iovs = &bdev_io->iov; 3990 bdev_io->u.bdev.iovs[0].iov_base = buf; 3991 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3992 bdev_io->u.bdev.iovcnt = 1; 3993 bdev_io->u.bdev.md_buf = md_buf; 3994 bdev_io->u.bdev.num_blocks = num_blocks; 3995 bdev_io->u.bdev.offset_blocks = offset_blocks; 3996 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3997 3998 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE)) { 3999 bdev_io_submit(bdev_io); 4000 return 0; 4001 } 4002 4003 bdev_compare_do_read(bdev_io); 4004 4005 return 0; 4006 } 4007 4008 int 4009 spdk_bdev_compare_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4010 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 4011 spdk_bdev_io_completion_cb cb, void *cb_arg) 4012 { 4013 return bdev_compare_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 4014 cb, cb_arg); 4015 } 4016 4017 int 4018 spdk_bdev_compare_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4019 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 4020 spdk_bdev_io_completion_cb cb, void *cb_arg) 4021 { 4022 struct iovec iov = { 4023 .iov_base = buf, 4024 }; 4025 4026 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 4027 return -EINVAL; 4028 } 4029 4030 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 4031 return -EINVAL; 4032 } 4033 4034 return bdev_compare_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 4035 cb, cb_arg); 4036 } 4037 4038 static void 4039 bdev_comparev_and_writev_blocks_unlocked(void *ctx, int 
unlock_status) 4040 { 4041 struct spdk_bdev_io *bdev_io = ctx; 4042 4043 if (unlock_status) { 4044 SPDK_ERRLOG("LBA range unlock failed\n"); 4045 } 4046 4047 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS ? true : 4048 false, bdev_io->internal.caller_ctx); 4049 } 4050 4051 static void 4052 bdev_comparev_and_writev_blocks_unlock(struct spdk_bdev_io *bdev_io, int status) 4053 { 4054 bdev_io->internal.status = status; 4055 4056 bdev_unlock_lba_range(bdev_io->internal.desc, spdk_io_channel_from_ctx(bdev_io->internal.ch), 4057 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 4058 bdev_comparev_and_writev_blocks_unlocked, bdev_io); 4059 } 4060 4061 static void 4062 bdev_compare_and_write_do_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4063 { 4064 struct spdk_bdev_io *parent_io = cb_arg; 4065 4066 if (!success) { 4067 SPDK_ERRLOG("Compare and write operation failed\n"); 4068 } 4069 4070 spdk_bdev_free_io(bdev_io); 4071 4072 bdev_comparev_and_writev_blocks_unlock(parent_io, 4073 success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED); 4074 } 4075 4076 static void 4077 bdev_compare_and_write_do_write(void *_bdev_io) 4078 { 4079 struct spdk_bdev_io *bdev_io = _bdev_io; 4080 int rc; 4081 4082 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 4083 spdk_io_channel_from_ctx(bdev_io->internal.ch), 4084 bdev_io->u.bdev.fused_iovs, bdev_io->u.bdev.fused_iovcnt, 4085 bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 4086 bdev_compare_and_write_do_write_done, bdev_io); 4087 4088 4089 if (rc == -ENOMEM) { 4090 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_write); 4091 } else if (rc != 0) { 4092 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 4093 } 4094 } 4095 4096 static void 4097 bdev_compare_and_write_do_compare_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4098 { 4099 struct spdk_bdev_io *parent_io = cb_arg; 4100 4101 spdk_bdev_free_io(bdev_io); 4102 4103 if (!success) { 4104 bdev_comparev_and_writev_blocks_unlock(parent_io, SPDK_BDEV_IO_STATUS_MISCOMPARE); 4105 return; 4106 } 4107 4108 bdev_compare_and_write_do_write(parent_io); 4109 } 4110 4111 static void 4112 bdev_compare_and_write_do_compare(void *_bdev_io) 4113 { 4114 struct spdk_bdev_io *bdev_io = _bdev_io; 4115 int rc; 4116 4117 rc = spdk_bdev_comparev_blocks(bdev_io->internal.desc, 4118 spdk_io_channel_from_ctx(bdev_io->internal.ch), bdev_io->u.bdev.iovs, 4119 bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks, 4120 bdev_compare_and_write_do_compare_done, bdev_io); 4121 4122 if (rc == -ENOMEM) { 4123 bdev_queue_io_wait_with_cb(bdev_io, bdev_compare_and_write_do_compare); 4124 } else if (rc != 0) { 4125 bdev_comparev_and_writev_blocks_unlock(bdev_io, SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED); 4126 } 4127 } 4128 4129 static void 4130 bdev_comparev_and_writev_blocks_locked(void *ctx, int status) 4131 { 4132 struct spdk_bdev_io *bdev_io = ctx; 4133 4134 if (status) { 4135 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED; 4136 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 4137 return; 4138 } 4139 4140 bdev_compare_and_write_do_compare(bdev_io); 4141 } 4142 4143 int 4144 spdk_bdev_comparev_and_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4145 struct iovec *compare_iov, int compare_iovcnt, 4146 struct iovec *write_iov, int write_iovcnt, 4147 uint64_t offset_blocks, uint64_t num_blocks, 4148 
spdk_bdev_io_completion_cb cb, void *cb_arg) 4149 { 4150 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4151 struct spdk_bdev_io *bdev_io; 4152 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4153 4154 if (!desc->write) { 4155 return -EBADF; 4156 } 4157 4158 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4159 return -EINVAL; 4160 } 4161 4162 if (num_blocks > bdev->acwu) { 4163 return -EINVAL; 4164 } 4165 4166 bdev_io = bdev_channel_get_io(channel); 4167 if (!bdev_io) { 4168 return -ENOMEM; 4169 } 4170 4171 bdev_io->internal.ch = channel; 4172 bdev_io->internal.desc = desc; 4173 bdev_io->type = SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE; 4174 bdev_io->u.bdev.iovs = compare_iov; 4175 bdev_io->u.bdev.iovcnt = compare_iovcnt; 4176 bdev_io->u.bdev.fused_iovs = write_iov; 4177 bdev_io->u.bdev.fused_iovcnt = write_iovcnt; 4178 bdev_io->u.bdev.md_buf = NULL; 4179 bdev_io->u.bdev.num_blocks = num_blocks; 4180 bdev_io->u.bdev.offset_blocks = offset_blocks; 4181 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4182 4183 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE)) { 4184 bdev_io_submit(bdev_io); 4185 return 0; 4186 } 4187 4188 return bdev_lock_lba_range(desc, ch, offset_blocks, num_blocks, 4189 bdev_comparev_and_writev_blocks_locked, bdev_io); 4190 } 4191 4192 static void 4193 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 4194 { 4195 if (!success) { 4196 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4197 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4198 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 4199 return; 4200 } 4201 4202 if (bdev_io->u.bdev.zcopy.populate) { 4203 /* Read the real data into the buffer */ 4204 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 4205 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4206 bdev_io_submit(bdev_io); 4207 return; 4208 } 4209 4210 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4211 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4212 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 4213 } 4214 4215 int 4216 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4217 uint64_t offset_blocks, uint64_t num_blocks, 4218 bool populate, 4219 spdk_bdev_io_completion_cb cb, void *cb_arg) 4220 { 4221 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4222 struct spdk_bdev_io *bdev_io; 4223 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4224 4225 if (!desc->write) { 4226 return -EBADF; 4227 } 4228 4229 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4230 return -EINVAL; 4231 } 4232 4233 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4234 return -ENOTSUP; 4235 } 4236 4237 bdev_io = bdev_channel_get_io(channel); 4238 if (!bdev_io) { 4239 return -ENOMEM; 4240 } 4241 4242 bdev_io->internal.ch = channel; 4243 bdev_io->internal.desc = desc; 4244 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4245 bdev_io->u.bdev.num_blocks = num_blocks; 4246 bdev_io->u.bdev.offset_blocks = offset_blocks; 4247 bdev_io->u.bdev.iovs = NULL; 4248 bdev_io->u.bdev.iovcnt = 0; 4249 bdev_io->u.bdev.md_buf = NULL; 4250 bdev_io->u.bdev.zcopy.populate = populate ? 
1 : 0; 4251 bdev_io->u.bdev.zcopy.commit = 0; 4252 bdev_io->u.bdev.zcopy.start = 1; 4253 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4254 4255 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4256 bdev_io_submit(bdev_io); 4257 } else { 4258 /* Emulate zcopy by allocating a buffer */ 4259 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 4260 bdev_io->u.bdev.num_blocks * bdev->blocklen); 4261 } 4262 4263 return 0; 4264 } 4265 4266 int 4267 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 4268 spdk_bdev_io_completion_cb cb, void *cb_arg) 4269 { 4270 struct spdk_bdev *bdev = bdev_io->bdev; 4271 4272 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 4273 /* This can happen if the zcopy was emulated in start */ 4274 if (bdev_io->u.bdev.zcopy.start != 1) { 4275 return -EINVAL; 4276 } 4277 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 4278 } 4279 4280 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 4281 return -EINVAL; 4282 } 4283 4284 bdev_io->u.bdev.zcopy.commit = commit ? 1 : 0; 4285 bdev_io->u.bdev.zcopy.start = 0; 4286 bdev_io->internal.caller_ctx = cb_arg; 4287 bdev_io->internal.cb = cb; 4288 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 4289 4290 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 4291 bdev_io_submit(bdev_io); 4292 return 0; 4293 } 4294 4295 if (!bdev_io->u.bdev.zcopy.commit) { 4296 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 4297 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4298 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 4299 return 0; 4300 } 4301 4302 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 4303 bdev_io_submit(bdev_io); 4304 4305 return 0; 4306 } 4307 4308 int 4309 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4310 uint64_t offset, uint64_t len, 4311 spdk_bdev_io_completion_cb cb, void *cb_arg) 4312 { 4313 uint64_t offset_blocks, num_blocks; 4314 4315 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4316 len, &num_blocks) != 0) { 4317 return -EINVAL; 4318 } 4319 4320 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4321 } 4322 4323 int 4324 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4325 uint64_t offset_blocks, uint64_t num_blocks, 4326 spdk_bdev_io_completion_cb cb, void *cb_arg) 4327 { 4328 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4329 struct spdk_bdev_io *bdev_io; 4330 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4331 4332 if (!desc->write) { 4333 return -EBADF; 4334 } 4335 4336 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4337 return -EINVAL; 4338 } 4339 4340 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 4341 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 4342 return -ENOTSUP; 4343 } 4344 4345 bdev_io = bdev_channel_get_io(channel); 4346 4347 if (!bdev_io) { 4348 return -ENOMEM; 4349 } 4350 4351 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 4352 bdev_io->internal.ch = channel; 4353 bdev_io->internal.desc = desc; 4354 bdev_io->u.bdev.offset_blocks = offset_blocks; 4355 bdev_io->u.bdev.num_blocks = num_blocks; 4356 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4357 4358 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 4359 bdev_io_submit(bdev_io); 4360 return 0; 4361 } 4362 4363 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 4364 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 4365 
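	/* WRITE_ZEROES is not supported natively here, so it is emulated below with
	 * regular WRITEs drawn from the pre-allocated zero buffer (hence the
	 * ZERO_BUFFER_SIZE assertion above); the split_* fields track how much of
	 * the range bdev_write_zero_buffer_next() has covered so far. */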
bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 4366 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 4367 bdev_write_zero_buffer_next(bdev_io); 4368 4369 return 0; 4370 } 4371 4372 int 4373 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4374 uint64_t offset, uint64_t nbytes, 4375 spdk_bdev_io_completion_cb cb, void *cb_arg) 4376 { 4377 uint64_t offset_blocks, num_blocks; 4378 4379 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4380 nbytes, &num_blocks) != 0) { 4381 return -EINVAL; 4382 } 4383 4384 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4385 } 4386 4387 int 4388 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4389 uint64_t offset_blocks, uint64_t num_blocks, 4390 spdk_bdev_io_completion_cb cb, void *cb_arg) 4391 { 4392 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4393 struct spdk_bdev_io *bdev_io; 4394 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4395 4396 if (!desc->write) { 4397 return -EBADF; 4398 } 4399 4400 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4401 return -EINVAL; 4402 } 4403 4404 if (num_blocks == 0) { 4405 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 4406 return -EINVAL; 4407 } 4408 4409 bdev_io = bdev_channel_get_io(channel); 4410 if (!bdev_io) { 4411 return -ENOMEM; 4412 } 4413 4414 bdev_io->internal.ch = channel; 4415 bdev_io->internal.desc = desc; 4416 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 4417 4418 bdev_io->u.bdev.iovs = &bdev_io->iov; 4419 bdev_io->u.bdev.iovs[0].iov_base = NULL; 4420 bdev_io->u.bdev.iovs[0].iov_len = 0; 4421 bdev_io->u.bdev.iovcnt = 1; 4422 4423 bdev_io->u.bdev.offset_blocks = offset_blocks; 4424 bdev_io->u.bdev.num_blocks = num_blocks; 4425 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4426 4427 bdev_io_submit(bdev_io); 4428 return 0; 4429 } 4430 4431 int 4432 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4433 uint64_t offset, uint64_t length, 4434 spdk_bdev_io_completion_cb cb, void *cb_arg) 4435 { 4436 uint64_t offset_blocks, num_blocks; 4437 4438 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 4439 length, &num_blocks) != 0) { 4440 return -EINVAL; 4441 } 4442 4443 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 4444 } 4445 4446 int 4447 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4448 uint64_t offset_blocks, uint64_t num_blocks, 4449 spdk_bdev_io_completion_cb cb, void *cb_arg) 4450 { 4451 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4452 struct spdk_bdev_io *bdev_io; 4453 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4454 4455 if (!desc->write) { 4456 return -EBADF; 4457 } 4458 4459 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 4460 return -EINVAL; 4461 } 4462 4463 bdev_io = bdev_channel_get_io(channel); 4464 if (!bdev_io) { 4465 return -ENOMEM; 4466 } 4467 4468 bdev_io->internal.ch = channel; 4469 bdev_io->internal.desc = desc; 4470 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 4471 bdev_io->u.bdev.iovs = NULL; 4472 bdev_io->u.bdev.iovcnt = 0; 4473 bdev_io->u.bdev.offset_blocks = offset_blocks; 4474 bdev_io->u.bdev.num_blocks = num_blocks; 4475 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4476 4477 bdev_io_submit(bdev_io); 4478 return 0; 4479 } 4480 4481 static void 4482 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 4483 { 4484 struct spdk_bdev_channel *ch = 
spdk_io_channel_iter_get_ctx(i); 4485 struct spdk_bdev_io *bdev_io; 4486 4487 bdev_io = TAILQ_FIRST(&ch->queued_resets); 4488 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 4489 bdev_io_submit_reset(bdev_io); 4490 } 4491 4492 static void 4493 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 4494 { 4495 struct spdk_io_channel *ch; 4496 struct spdk_bdev_channel *channel; 4497 struct spdk_bdev_mgmt_channel *mgmt_channel; 4498 struct spdk_bdev_shared_resource *shared_resource; 4499 bdev_io_tailq_t tmp_queued; 4500 4501 TAILQ_INIT(&tmp_queued); 4502 4503 ch = spdk_io_channel_iter_get_channel(i); 4504 channel = spdk_io_channel_get_ctx(ch); 4505 shared_resource = channel->shared_resource; 4506 mgmt_channel = shared_resource->mgmt_ch; 4507 4508 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 4509 4510 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 4511 /* The QoS object is always valid and readable while 4512 * the channel flag is set, so the lock here should not 4513 * be necessary. We're not in the fast path though, so 4514 * just take it anyway. */ 4515 pthread_mutex_lock(&channel->bdev->internal.mutex); 4516 if (channel->bdev->internal.qos->ch == channel) { 4517 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 4518 } 4519 pthread_mutex_unlock(&channel->bdev->internal.mutex); 4520 } 4521 4522 bdev_abort_all_queued_io(&shared_resource->nomem_io, channel); 4523 bdev_abort_all_buf_io(&mgmt_channel->need_buf_small, channel); 4524 bdev_abort_all_buf_io(&mgmt_channel->need_buf_large, channel); 4525 bdev_abort_all_queued_io(&tmp_queued, channel); 4526 4527 spdk_for_each_channel_continue(i, 0); 4528 } 4529 4530 static void 4531 bdev_start_reset(void *ctx) 4532 { 4533 struct spdk_bdev_channel *ch = ctx; 4534 4535 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 4536 ch, bdev_reset_dev); 4537 } 4538 4539 static void 4540 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 4541 { 4542 struct spdk_bdev *bdev = ch->bdev; 4543 4544 assert(!TAILQ_EMPTY(&ch->queued_resets)); 4545 4546 pthread_mutex_lock(&bdev->internal.mutex); 4547 if (bdev->internal.reset_in_progress == NULL) { 4548 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 4549 /* 4550 * Take a channel reference for the target bdev for the life of this 4551 * reset. This guards against the channel getting destroyed while 4552 * spdk_for_each_channel() calls related to this reset IO are in 4553 * progress. We will release the reference when this reset is 4554 * completed. 
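	 * The reference is released in bdev_reset_complete() once every channel has
	 * been unfrozen again.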
4555 */ 4556 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 4557 bdev_start_reset(ch); 4558 } 4559 pthread_mutex_unlock(&bdev->internal.mutex); 4560 } 4561 4562 int 4563 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4564 spdk_bdev_io_completion_cb cb, void *cb_arg) 4565 { 4566 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4567 struct spdk_bdev_io *bdev_io; 4568 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4569 4570 bdev_io = bdev_channel_get_io(channel); 4571 if (!bdev_io) { 4572 return -ENOMEM; 4573 } 4574 4575 bdev_io->internal.ch = channel; 4576 bdev_io->internal.desc = desc; 4577 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4578 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 4579 bdev_io->u.reset.ch_ref = NULL; 4580 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4581 4582 pthread_mutex_lock(&bdev->internal.mutex); 4583 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 4584 pthread_mutex_unlock(&bdev->internal.mutex); 4585 4586 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 4587 internal.ch_link); 4588 4589 bdev_channel_start_reset(channel); 4590 4591 return 0; 4592 } 4593 4594 void 4595 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4596 struct spdk_bdev_io_stat *stat) 4597 { 4598 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4599 4600 *stat = channel->stat; 4601 } 4602 4603 static void 4604 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 4605 { 4606 void *io_device = spdk_io_channel_iter_get_io_device(i); 4607 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4608 4609 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 4610 bdev_iostat_ctx->cb_arg, 0); 4611 free(bdev_iostat_ctx); 4612 } 4613 4614 static void 4615 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 4616 { 4617 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 4618 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4619 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4620 4621 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 4622 spdk_for_each_channel_continue(i, 0); 4623 } 4624 4625 void 4626 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 4627 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 4628 { 4629 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 4630 4631 assert(bdev != NULL); 4632 assert(stat != NULL); 4633 assert(cb != NULL); 4634 4635 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 4636 if (bdev_iostat_ctx == NULL) { 4637 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 4638 cb(bdev, stat, cb_arg, -ENOMEM); 4639 return; 4640 } 4641 4642 bdev_iostat_ctx->stat = stat; 4643 bdev_iostat_ctx->cb = cb; 4644 bdev_iostat_ctx->cb_arg = cb_arg; 4645 4646 /* Start with the statistics from previously deleted channels. */ 4647 pthread_mutex_lock(&bdev->internal.mutex); 4648 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 4649 pthread_mutex_unlock(&bdev->internal.mutex); 4650 4651 /* Then iterate and add the statistics from each existing channel. 
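	 * Each per-channel callback runs on the thread that owns that channel, so
	 * bdev_get_each_channel_stat() can read channel->stat without taking a lock.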
*/ 4652 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4653 bdev_get_each_channel_stat, 4654 bdev_iostat_ctx, 4655 bdev_get_device_stat_done); 4656 } 4657 4658 int 4659 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4660 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4661 spdk_bdev_io_completion_cb cb, void *cb_arg) 4662 { 4663 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4664 struct spdk_bdev_io *bdev_io; 4665 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4666 4667 if (!desc->write) { 4668 return -EBADF; 4669 } 4670 4671 bdev_io = bdev_channel_get_io(channel); 4672 if (!bdev_io) { 4673 return -ENOMEM; 4674 } 4675 4676 bdev_io->internal.ch = channel; 4677 bdev_io->internal.desc = desc; 4678 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 4679 bdev_io->u.nvme_passthru.cmd = *cmd; 4680 bdev_io->u.nvme_passthru.buf = buf; 4681 bdev_io->u.nvme_passthru.nbytes = nbytes; 4682 bdev_io->u.nvme_passthru.md_buf = NULL; 4683 bdev_io->u.nvme_passthru.md_len = 0; 4684 4685 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4686 4687 bdev_io_submit(bdev_io); 4688 return 0; 4689 } 4690 4691 int 4692 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4693 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 4694 spdk_bdev_io_completion_cb cb, void *cb_arg) 4695 { 4696 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4697 struct spdk_bdev_io *bdev_io; 4698 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4699 4700 if (!desc->write) { 4701 /* 4702 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4703 * to easily determine if the command is a read or write, but for now just 4704 * do not allow io_passthru with a read-only descriptor. 4705 */ 4706 return -EBADF; 4707 } 4708 4709 bdev_io = bdev_channel_get_io(channel); 4710 if (!bdev_io) { 4711 return -ENOMEM; 4712 } 4713 4714 bdev_io->internal.ch = channel; 4715 bdev_io->internal.desc = desc; 4716 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 4717 bdev_io->u.nvme_passthru.cmd = *cmd; 4718 bdev_io->u.nvme_passthru.buf = buf; 4719 bdev_io->u.nvme_passthru.nbytes = nbytes; 4720 bdev_io->u.nvme_passthru.md_buf = NULL; 4721 bdev_io->u.nvme_passthru.md_len = 0; 4722 4723 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4724 4725 bdev_io_submit(bdev_io); 4726 return 0; 4727 } 4728 4729 int 4730 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4731 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 4732 spdk_bdev_io_completion_cb cb, void *cb_arg) 4733 { 4734 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4735 struct spdk_bdev_io *bdev_io; 4736 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4737 4738 if (!desc->write) { 4739 /* 4740 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 4741 * to easily determine if the command is a read or write, but for now just 4742 * do not allow io_passthru with a read-only descriptor. 
4743 */ 4744 return -EBADF; 4745 } 4746 4747 bdev_io = bdev_channel_get_io(channel); 4748 if (!bdev_io) { 4749 return -ENOMEM; 4750 } 4751 4752 bdev_io->internal.ch = channel; 4753 bdev_io->internal.desc = desc; 4754 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 4755 bdev_io->u.nvme_passthru.cmd = *cmd; 4756 bdev_io->u.nvme_passthru.buf = buf; 4757 bdev_io->u.nvme_passthru.nbytes = nbytes; 4758 bdev_io->u.nvme_passthru.md_buf = md_buf; 4759 bdev_io->u.nvme_passthru.md_len = md_len; 4760 4761 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4762 4763 bdev_io_submit(bdev_io); 4764 return 0; 4765 } 4766 4767 static void bdev_abort_retry(void *ctx); 4768 static void bdev_abort(struct spdk_bdev_io *parent_io); 4769 4770 static void 4771 bdev_abort_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4772 { 4773 struct spdk_bdev_channel *channel = bdev_io->internal.ch; 4774 struct spdk_bdev_io *parent_io = cb_arg; 4775 struct spdk_bdev_io *bio_to_abort, *tmp_io; 4776 4777 bio_to_abort = bdev_io->u.abort.bio_to_abort; 4778 4779 spdk_bdev_free_io(bdev_io); 4780 4781 if (!success) { 4782 /* Check if the target I/O completed in the meantime. */ 4783 TAILQ_FOREACH(tmp_io, &channel->io_submitted, internal.ch_link) { 4784 if (tmp_io == bio_to_abort) { 4785 break; 4786 } 4787 } 4788 4789 /* If the target I/O still exists, set the parent to failed. */ 4790 if (tmp_io != NULL) { 4791 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4792 } 4793 } 4794 4795 parent_io->u.bdev.split_outstanding--; 4796 if (parent_io->u.bdev.split_outstanding == 0) { 4797 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4798 bdev_abort_retry(parent_io); 4799 } else { 4800 bdev_io_complete(parent_io); 4801 } 4802 } 4803 } 4804 4805 static int 4806 bdev_abort_io(struct spdk_bdev_desc *desc, struct spdk_bdev_channel *channel, 4807 struct spdk_bdev_io *bio_to_abort, 4808 spdk_bdev_io_completion_cb cb, void *cb_arg) 4809 { 4810 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4811 struct spdk_bdev_io *bdev_io; 4812 4813 if (bio_to_abort->type == SPDK_BDEV_IO_TYPE_ABORT || 4814 bio_to_abort->type == SPDK_BDEV_IO_TYPE_RESET) { 4815 /* TODO: Abort reset or abort request. */ 4816 return -ENOTSUP; 4817 } 4818 4819 bdev_io = bdev_channel_get_io(channel); 4820 if (bdev_io == NULL) { 4821 return -ENOMEM; 4822 } 4823 4824 bdev_io->internal.ch = channel; 4825 bdev_io->internal.desc = desc; 4826 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4827 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4828 4829 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bio_to_abort)) { 4830 bdev_io->u.bdev.abort.bio_cb_arg = bio_to_abort; 4831 4832 /* Parent abort request is not submitted directly, but to manage its 4833 * execution add it to the submitted list here. 4834 */ 4835 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4836 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4837 4838 bdev_abort(bdev_io); 4839 4840 return 0; 4841 } 4842 4843 bdev_io->u.abort.bio_to_abort = bio_to_abort; 4844 4845 /* Submit the abort request to the underlying bdev module. 
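	 * The module completes it like any other I/O; on failure, bdev_abort_io_done()
	 * checks whether the target I/O is still outstanding before failing the
	 * parent abort.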
*/ 4846 bdev_io_submit(bdev_io); 4847 4848 return 0; 4849 } 4850 4851 static uint32_t 4852 _bdev_abort(struct spdk_bdev_io *parent_io) 4853 { 4854 struct spdk_bdev_desc *desc = parent_io->internal.desc; 4855 struct spdk_bdev_channel *channel = parent_io->internal.ch; 4856 void *bio_cb_arg; 4857 struct spdk_bdev_io *bio_to_abort; 4858 uint32_t matched_ios; 4859 int rc; 4860 4861 bio_cb_arg = parent_io->u.bdev.abort.bio_cb_arg; 4862 4863 /* matched_ios is returned and will be kept by the caller. 4864 * 4865 * This function is used for two cases: 1) the same cb_arg is used for 4866 * multiple I/Os, and 2) a single large I/O is split into smaller ones. 4867 * Incrementing split_outstanding directly here may confuse readers, especially 4868 * in the first case. 4869 * 4870 * Completion of I/O abort is processed after stack unwinding. Hence this trick 4871 * works as expected. 4872 */ 4873 matched_ios = 0; 4874 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4875 4876 TAILQ_FOREACH(bio_to_abort, &channel->io_submitted, internal.ch_link) { 4877 if (bio_to_abort->internal.caller_ctx != bio_cb_arg) { 4878 continue; 4879 } 4880 4881 if (bio_to_abort->internal.submit_tsc > parent_io->internal.submit_tsc) { 4882 /* Any I/O which was submitted after this abort command should be excluded. */ 4883 continue; 4884 } 4885 4886 rc = bdev_abort_io(desc, channel, bio_to_abort, bdev_abort_io_done, parent_io); 4887 if (rc != 0) { 4888 if (rc == -ENOMEM) { 4889 parent_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 4890 } else { 4891 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4892 } 4893 break; 4894 } 4895 matched_ios++; 4896 } 4897 4898 return matched_ios; 4899 } 4900 4901 static void 4902 bdev_abort_retry(void *ctx) 4903 { 4904 struct spdk_bdev_io *parent_io = ctx; 4905 uint32_t matched_ios; 4906 4907 matched_ios = _bdev_abort(parent_io); 4908 4909 if (matched_ios == 0) { 4910 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4911 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4912 } else { 4913 /* For a retry, the case where no target I/O was found is success 4914 * because it means the target I/Os completed in the meantime. 4915 */ 4916 bdev_io_complete(parent_io); 4917 } 4918 return; 4919 } 4920 4921 /* Use split_outstanding to manage the progress of aborting I/Os. */ 4922 parent_io->u.bdev.split_outstanding = matched_ios; 4923 } 4924 4925 static void 4926 bdev_abort(struct spdk_bdev_io *parent_io) 4927 { 4928 uint32_t matched_ios; 4929 4930 matched_ios = _bdev_abort(parent_io); 4931 4932 if (matched_ios == 0) { 4933 if (parent_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 4934 bdev_queue_io_wait_with_cb(parent_io, bdev_abort_retry); 4935 } else { 4936 /* The case where no target I/O was found is a failure. */ 4937 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4938 bdev_io_complete(parent_io); 4939 } 4940 return; 4941 } 4942 4943 /* Use split_outstanding to manage the progress of aborting I/Os.
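	 * bdev_abort_io_done() decrements it for each child abort that completes, and
	 * the parent abort is finished (or retried on NOMEM) once it reaches zero.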
*/ 4944 parent_io->u.bdev.split_outstanding = matched_ios; 4945 } 4946 4947 int 4948 spdk_bdev_abort(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 4949 void *bio_cb_arg, 4950 spdk_bdev_io_completion_cb cb, void *cb_arg) 4951 { 4952 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4953 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4954 struct spdk_bdev_io *bdev_io; 4955 4956 if (bio_cb_arg == NULL) { 4957 return -EINVAL; 4958 } 4959 4960 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ABORT)) { 4961 return -ENOTSUP; 4962 } 4963 4964 bdev_io = bdev_channel_get_io(channel); 4965 if (bdev_io == NULL) { 4966 return -ENOMEM; 4967 } 4968 4969 bdev_io->internal.ch = channel; 4970 bdev_io->internal.desc = desc; 4971 bdev_io->internal.submit_tsc = spdk_get_ticks(); 4972 bdev_io->type = SPDK_BDEV_IO_TYPE_ABORT; 4973 bdev_io_init(bdev_io, bdev, cb_arg, cb); 4974 4975 bdev_io->u.bdev.abort.bio_cb_arg = bio_cb_arg; 4976 4977 /* Parent abort request is not submitted directly, but to manage its execution, 4978 * add it to the submitted list here. 4979 */ 4980 TAILQ_INSERT_TAIL(&channel->io_submitted, bdev_io, internal.ch_link); 4981 4982 bdev_abort(bdev_io); 4983 4984 return 0; 4985 } 4986 4987 int 4988 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 4989 struct spdk_bdev_io_wait_entry *entry) 4990 { 4991 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 4992 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 4993 4994 if (bdev != entry->bdev) { 4995 SPDK_ERRLOG("bdevs do not match\n"); 4996 return -EINVAL; 4997 } 4998 4999 if (mgmt_ch->per_thread_cache_count > 0) { 5000 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 5001 return -EINVAL; 5002 } 5003 5004 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 5005 return 0; 5006 } 5007 5008 static void 5009 bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 5010 { 5011 struct spdk_bdev *bdev = bdev_ch->bdev; 5012 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 5013 struct spdk_bdev_io *bdev_io; 5014 5015 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 5016 /* 5017 * Allow some more I/O to complete before retrying the nomem_io queue. 5018 * Some drivers (such as nvme) cannot immediately take a new I/O in 5019 * the context of a completion, because the resources for the I/O are 5020 * not released until control returns to the bdev poller. Also, we 5021 * may require several small I/O to complete before a larger I/O 5022 * (that requires splitting) can be submitted. 
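	 * The threshold itself is set in spdk_bdev_io_complete() whenever an I/O
	 * completes with SPDK_BDEV_IO_STATUS_NOMEM.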
5023 */ 5024 return; 5025 } 5026 5027 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 5028 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 5029 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 5030 bdev_io->internal.ch->io_outstanding++; 5031 shared_resource->io_outstanding++; 5032 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 5033 bdev_io->internal.error.nvme.cdw0 = 0; 5034 bdev_io->num_retries++; 5035 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 5036 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 5037 break; 5038 } 5039 } 5040 } 5041 5042 static inline void 5043 bdev_io_complete(void *ctx) 5044 { 5045 struct spdk_bdev_io *bdev_io = ctx; 5046 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 5047 uint64_t tsc, tsc_diff; 5048 5049 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 5050 /* 5051 * Send the completion to the thread that originally submitted the I/O, 5052 * which may not be the current thread in the case of QoS. 5053 */ 5054 if (bdev_io->internal.io_submit_ch) { 5055 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 5056 bdev_io->internal.io_submit_ch = NULL; 5057 } 5058 5059 /* 5060 * Defer completion to avoid potential infinite recursion if the 5061 * user's completion callback issues a new I/O. 5062 */ 5063 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 5064 bdev_io_complete, bdev_io); 5065 return; 5066 } 5067 5068 tsc = spdk_get_ticks(); 5069 tsc_diff = tsc - bdev_io->internal.submit_tsc; 5070 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 5071 5072 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link); 5073 5074 if (bdev_io->internal.ch->histogram) { 5075 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 5076 } 5077 5078 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5079 switch (bdev_io->type) { 5080 case SPDK_BDEV_IO_TYPE_READ: 5081 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 5082 bdev_io->internal.ch->stat.num_read_ops++; 5083 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 5084 break; 5085 case SPDK_BDEV_IO_TYPE_WRITE: 5086 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 5087 bdev_io->internal.ch->stat.num_write_ops++; 5088 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 5089 break; 5090 case SPDK_BDEV_IO_TYPE_UNMAP: 5091 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 5092 bdev_io->internal.ch->stat.num_unmap_ops++; 5093 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 5094 break; 5095 case SPDK_BDEV_IO_TYPE_ZCOPY: 5096 /* Track the data in the start phase only */ 5097 if (bdev_io->u.bdev.zcopy.start) { 5098 if (bdev_io->u.bdev.zcopy.populate) { 5099 bdev_io->internal.ch->stat.bytes_read += 5100 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 5101 bdev_io->internal.ch->stat.num_read_ops++; 5102 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 5103 } else { 5104 bdev_io->internal.ch->stat.bytes_written += 5105 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 5106 bdev_io->internal.ch->stat.num_write_ops++; 5107 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 5108 } 5109 } 5110 break; 5111 default: 5112 break; 5113 } 5114 } 5115 5116 #ifdef SPDK_CONFIG_VTUNE 5117 uint64_t now_tsc = spdk_get_ticks(); 5118 if (now_tsc > 
(bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 5119 uint64_t data[5]; 5120 5121 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 5122 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 5123 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 5124 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 5125 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 5126 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 5127 5128 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 5129 __itt_metadata_u64, 5, data); 5130 5131 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 5132 bdev_io->internal.ch->start_tsc = now_tsc; 5133 } 5134 #endif 5135 5136 assert(bdev_io->internal.cb != NULL); 5137 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 5138 5139 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 5140 bdev_io->internal.caller_ctx); 5141 } 5142 5143 static void 5144 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 5145 { 5146 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 5147 5148 if (bdev_io->u.reset.ch_ref != NULL) { 5149 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 5150 bdev_io->u.reset.ch_ref = NULL; 5151 } 5152 5153 bdev_io_complete(bdev_io); 5154 } 5155 5156 static void 5157 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 5158 { 5159 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 5160 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 5161 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 5162 struct spdk_bdev_io *queued_reset; 5163 5164 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 5165 while (!TAILQ_EMPTY(&ch->queued_resets)) { 5166 queued_reset = TAILQ_FIRST(&ch->queued_resets); 5167 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 5168 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 5169 } 5170 5171 spdk_for_each_channel_continue(i, 0); 5172 } 5173 5174 void 5175 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 5176 { 5177 struct spdk_bdev *bdev = bdev_io->bdev; 5178 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 5179 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 5180 5181 bdev_io->internal.status = status; 5182 5183 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 5184 bool unlock_channels = false; 5185 5186 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 5187 SPDK_ERRLOG("NOMEM returned for reset\n"); 5188 } 5189 pthread_mutex_lock(&bdev->internal.mutex); 5190 if (bdev_io == bdev->internal.reset_in_progress) { 5191 bdev->internal.reset_in_progress = NULL; 5192 unlock_channels = true; 5193 } 5194 pthread_mutex_unlock(&bdev->internal.mutex); 5195 5196 if (unlock_channels) { 5197 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 5198 bdev_io, bdev_reset_complete); 5199 return; 5200 } 5201 } else { 5202 _bdev_io_unset_bounce_buf(bdev_io); 5203 5204 assert(bdev_ch->io_outstanding > 0); 5205 assert(shared_resource->io_outstanding > 0); 5206 bdev_ch->io_outstanding--; 5207 shared_resource->io_outstanding--; 5208 5209 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 5210 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 5211 
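			/* The I/O is parked at the head of the nomem_io queue, so
			 * bdev_ch_retry_io() will pick it up first once enough outstanding
			 * I/O has drained.  As a worked example of the threshold computed
			 * below: with NOMEM_THRESHOLD_COUNT of 8, a channel with 64 I/O
			 * outstanding resumes retries once it drains to 56, while a channel
			 * with only 10 outstanding resumes at 5 (half). */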
/* 5212 * Wait for some of the outstanding I/O to complete before we 5213 * retry any of the nomem_io. Normally we will wait for 5214 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 5215 * depth channels we will instead wait for half to complete. 5216 */ 5217 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 5218 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 5219 return; 5220 } 5221 5222 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 5223 bdev_ch_retry_io(bdev_ch); 5224 } 5225 } 5226 5227 bdev_io_complete(bdev_io); 5228 } 5229 5230 void 5231 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 5232 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 5233 { 5234 if (sc == SPDK_SCSI_STATUS_GOOD) { 5235 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5236 } else { 5237 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 5238 bdev_io->internal.error.scsi.sc = sc; 5239 bdev_io->internal.error.scsi.sk = sk; 5240 bdev_io->internal.error.scsi.asc = asc; 5241 bdev_io->internal.error.scsi.ascq = ascq; 5242 } 5243 5244 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5245 } 5246 5247 void 5248 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 5249 int *sc, int *sk, int *asc, int *ascq) 5250 { 5251 assert(sc != NULL); 5252 assert(sk != NULL); 5253 assert(asc != NULL); 5254 assert(ascq != NULL); 5255 5256 switch (bdev_io->internal.status) { 5257 case SPDK_BDEV_IO_STATUS_SUCCESS: 5258 *sc = SPDK_SCSI_STATUS_GOOD; 5259 *sk = SPDK_SCSI_SENSE_NO_SENSE; 5260 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5261 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5262 break; 5263 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 5264 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 5265 break; 5266 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 5267 *sc = bdev_io->internal.error.scsi.sc; 5268 *sk = bdev_io->internal.error.scsi.sk; 5269 *asc = bdev_io->internal.error.scsi.asc; 5270 *ascq = bdev_io->internal.error.scsi.ascq; 5271 break; 5272 default: 5273 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 5274 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 5275 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 5276 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 5277 break; 5278 } 5279 } 5280 5281 void 5282 spdk_bdev_io_complete_aio_status(struct spdk_bdev_io *bdev_io, int aio_result) 5283 { 5284 if (aio_result == 0) { 5285 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5286 } else { 5287 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_AIO_ERROR; 5288 } 5289 5290 bdev_io->internal.error.aio_result = aio_result; 5291 5292 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5293 } 5294 5295 void 5296 spdk_bdev_io_get_aio_status(const struct spdk_bdev_io *bdev_io, int *aio_result) 5297 { 5298 assert(aio_result != NULL); 5299 5300 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_AIO_ERROR) { 5301 *aio_result = bdev_io->internal.error.aio_result; 5302 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5303 *aio_result = 0; 5304 } else { 5305 *aio_result = -EIO; 5306 } 5307 } 5308 5309 void 5310 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 5311 { 5312 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 5313 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 5314 } else { 5315 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 5316 } 5317 5318 bdev_io->internal.error.nvme.cdw0 = cdw0; 5319 
bdev_io->internal.error.nvme.sct = sct; 5320 bdev_io->internal.error.nvme.sc = sc; 5321 5322 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 5323 } 5324 5325 void 5326 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 5327 { 5328 assert(sct != NULL); 5329 assert(sc != NULL); 5330 assert(cdw0 != NULL); 5331 5332 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5333 *sct = bdev_io->internal.error.nvme.sct; 5334 *sc = bdev_io->internal.error.nvme.sc; 5335 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5336 *sct = SPDK_NVME_SCT_GENERIC; 5337 *sc = SPDK_NVME_SC_SUCCESS; 5338 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_ABORTED) { 5339 *sct = SPDK_NVME_SCT_GENERIC; 5340 *sc = SPDK_NVME_SC_ABORTED_BY_REQUEST; 5341 } else { 5342 *sct = SPDK_NVME_SCT_GENERIC; 5343 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5344 } 5345 5346 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5347 } 5348 5349 void 5350 spdk_bdev_io_get_nvme_fused_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, 5351 int *first_sct, int *first_sc, int *second_sct, int *second_sc) 5352 { 5353 assert(first_sct != NULL); 5354 assert(first_sc != NULL); 5355 assert(second_sct != NULL); 5356 assert(second_sc != NULL); 5357 assert(cdw0 != NULL); 5358 5359 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 5360 if (bdev_io->internal.error.nvme.sct == SPDK_NVME_SCT_MEDIA_ERROR && 5361 bdev_io->internal.error.nvme.sc == SPDK_NVME_SC_COMPARE_FAILURE) { 5362 *first_sct = bdev_io->internal.error.nvme.sct; 5363 *first_sc = bdev_io->internal.error.nvme.sc; 5364 *second_sct = SPDK_NVME_SCT_GENERIC; 5365 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5366 } else { 5367 *first_sct = SPDK_NVME_SCT_GENERIC; 5368 *first_sc = SPDK_NVME_SC_SUCCESS; 5369 *second_sct = bdev_io->internal.error.nvme.sct; 5370 *second_sc = bdev_io->internal.error.nvme.sc; 5371 } 5372 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 5373 *first_sct = SPDK_NVME_SCT_GENERIC; 5374 *first_sc = SPDK_NVME_SC_SUCCESS; 5375 *second_sct = SPDK_NVME_SCT_GENERIC; 5376 *second_sc = SPDK_NVME_SC_SUCCESS; 5377 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_FIRST_FUSED_FAILED) { 5378 *first_sct = SPDK_NVME_SCT_GENERIC; 5379 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5380 *second_sct = SPDK_NVME_SCT_GENERIC; 5381 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5382 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_MISCOMPARE) { 5383 *first_sct = SPDK_NVME_SCT_MEDIA_ERROR; 5384 *first_sc = SPDK_NVME_SC_COMPARE_FAILURE; 5385 *second_sct = SPDK_NVME_SCT_GENERIC; 5386 *second_sc = SPDK_NVME_SC_ABORTED_FAILED_FUSED; 5387 } else { 5388 *first_sct = SPDK_NVME_SCT_GENERIC; 5389 *first_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5390 *second_sct = SPDK_NVME_SCT_GENERIC; 5391 *second_sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 5392 } 5393 5394 *cdw0 = bdev_io->internal.error.nvme.cdw0; 5395 } 5396 5397 struct spdk_thread * 5398 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 5399 { 5400 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 5401 } 5402 5403 struct spdk_io_channel * 5404 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 5405 { 5406 return bdev_io->internal.ch->channel; 5407 } 5408 5409 static int 5410 bdev_init(struct spdk_bdev *bdev) 5411 { 5412 char *bdev_name; 5413 5414 assert(bdev->module != NULL); 5415 5416 if (!bdev->name) { 5417 SPDK_ERRLOG("Bdev name is NULL\n"); 5418 return -EINVAL; 5419 
} 5420 5421 if (!strlen(bdev->name)) { 5422 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 5423 return -EINVAL; 5424 } 5425 5426 if (spdk_bdev_get_by_name(bdev->name)) { 5427 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 5428 return -EEXIST; 5429 } 5430 5431 /* Users often register their own I/O devices using the bdev name. In 5432 * order to avoid conflicts, prepend bdev_. */ 5433 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 5434 if (!bdev_name) { 5435 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 5436 return -ENOMEM; 5437 } 5438 5439 bdev->internal.status = SPDK_BDEV_STATUS_READY; 5440 bdev->internal.measured_queue_depth = UINT64_MAX; 5441 bdev->internal.claim_module = NULL; 5442 bdev->internal.qd_poller = NULL; 5443 bdev->internal.qos = NULL; 5444 5445 /* If the user didn't specify a uuid, generate one. */ 5446 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 5447 spdk_uuid_generate(&bdev->uuid); 5448 } 5449 5450 if (spdk_bdev_get_buf_align(bdev) > 1) { 5451 if (bdev->split_on_optimal_io_boundary) { 5452 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 5453 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 5454 } else { 5455 bdev->split_on_optimal_io_boundary = true; 5456 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 5457 } 5458 } 5459 5460 /* If the user didn't specify a write unit size, set it to one. */ 5461 if (bdev->write_unit_size == 0) { 5462 bdev->write_unit_size = 1; 5463 } 5464 5465 /* Set ACWU value to 1 if bdev module did not set it (does not support it natively) */ 5466 if (bdev->acwu == 0) { 5467 bdev->acwu = 1; 5468 } 5469 5470 TAILQ_INIT(&bdev->internal.open_descs); 5471 TAILQ_INIT(&bdev->internal.locked_ranges); 5472 TAILQ_INIT(&bdev->internal.pending_locked_ranges); 5473 5474 TAILQ_INIT(&bdev->aliases); 5475 5476 bdev->internal.reset_in_progress = NULL; 5477 5478 spdk_io_device_register(__bdev_to_io_dev(bdev), 5479 bdev_channel_create, bdev_channel_destroy, 5480 sizeof(struct spdk_bdev_channel), 5481 bdev_name); 5482 5483 free(bdev_name); 5484 5485 pthread_mutex_init(&bdev->internal.mutex, NULL); 5486 return 0; 5487 } 5488 5489 static void 5490 bdev_destroy_cb(void *io_device) 5491 { 5492 int rc; 5493 struct spdk_bdev *bdev; 5494 spdk_bdev_unregister_cb cb_fn; 5495 void *cb_arg; 5496 5497 bdev = __bdev_from_io_dev(io_device); 5498 cb_fn = bdev->internal.unregister_cb; 5499 cb_arg = bdev->internal.unregister_ctx; 5500 5501 rc = bdev->fn_table->destruct(bdev->ctxt); 5502 if (rc < 0) { 5503 SPDK_ERRLOG("destruct failed\n"); 5504 } 5505 if (rc <= 0 && cb_fn != NULL) { 5506 cb_fn(cb_arg, rc); 5507 } 5508 } 5509 5510 5511 static void 5512 bdev_fini(struct spdk_bdev *bdev) 5513 { 5514 pthread_mutex_destroy(&bdev->internal.mutex); 5515 5516 free(bdev->internal.qos); 5517 5518 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 5519 } 5520 5521 static void 5522 bdev_start_finished(void *arg) 5523 { 5524 struct spdk_bdev *bdev = arg; 5525 5526 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 5527 } 5528 5529 static void 5530 bdev_start(struct spdk_bdev *bdev) 5531 { 5532 SPDK_DEBUGLOG(bdev, "Inserting bdev %s into list\n", bdev->name); 5533 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 5534 5535 /* Examine configuration before initializing I/O */ 5536 bdev_examine(bdev); 5537 5538 spdk_bdev_wait_for_examine(bdev_start_finished, bdev); 5539 } 5540 5541 int 5542 spdk_bdev_register(struct spdk_bdev *bdev) 5543 { 5544 int rc = 
bdev_init(bdev); 5545 5546 if (rc == 0) { 5547 bdev_start(bdev); 5548 } 5549 5550 return rc; 5551 } 5552 5553 int 5554 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 5555 { 5556 SPDK_ERRLOG("This function is deprecated. Use spdk_bdev_register() instead.\n"); 5557 return spdk_bdev_register(vbdev); 5558 } 5559 5560 void 5561 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 5562 { 5563 if (bdev->internal.unregister_cb != NULL) { 5564 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 5565 } 5566 } 5567 5568 static void 5569 _remove_notify(void *arg) 5570 { 5571 struct spdk_bdev_desc *desc = arg; 5572 5573 pthread_mutex_lock(&desc->mutex); 5574 desc->refs--; 5575 5576 if (!desc->closed) { 5577 pthread_mutex_unlock(&desc->mutex); 5578 if (desc->callback.open_with_ext) { 5579 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 5580 } else { 5581 desc->callback.remove_fn(desc->callback.ctx); 5582 } 5583 return; 5584 } else if (0 == desc->refs) { 5585 /* This descriptor was closed after this remove_notify message was sent. 5586 * spdk_bdev_close() could not free the descriptor since this message was 5587 * in flight, so we free it now using bdev_desc_free(). 5588 */ 5589 pthread_mutex_unlock(&desc->mutex); 5590 bdev_desc_free(desc); 5591 return; 5592 } 5593 pthread_mutex_unlock(&desc->mutex); 5594 } 5595 5596 /* Must be called while holding bdev->internal.mutex. 5597 * returns: 0 - bdev removed and ready to be destructed. 5598 * -EBUSY - bdev can't be destructed yet. */ 5599 static int 5600 bdev_unregister_unsafe(struct spdk_bdev *bdev) 5601 { 5602 struct spdk_bdev_desc *desc, *tmp; 5603 int rc = 0; 5604 5605 /* Notify each descriptor about hotremoval */ 5606 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 5607 rc = -EBUSY; 5608 pthread_mutex_lock(&desc->mutex); 5609 /* 5610 * Defer invocation of the event_cb to a separate message that will 5611 * run later on its thread. This ensures this context unwinds and 5612 * we don't recursively unregister this bdev again if the event_cb 5613 * immediately closes its descriptor. 5614 */ 5615 desc->refs++; 5616 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 5617 pthread_mutex_unlock(&desc->mutex); 5618 } 5619 5620 /* If there are no descriptors, proceed removing the bdev */ 5621 if (rc == 0) { 5622 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 5623 SPDK_DEBUGLOG(bdev, "Removing bdev %s from list done\n", bdev->name); 5624 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 5625 } 5626 5627 return rc; 5628 } 5629 5630 void 5631 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 5632 { 5633 struct spdk_thread *thread; 5634 int rc; 5635 5636 SPDK_DEBUGLOG(bdev, "Removing bdev %s from list\n", bdev->name); 5637 5638 thread = spdk_get_thread(); 5639 if (!thread) { 5640 /* The user called this from a non-SPDK thread. 
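	 * Unregistration has to run on an SPDK thread, so report -ENOTSUP to the
	 * caller instead of proceeding.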

void
spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
{
	struct spdk_thread *thread;
	int rc;

	SPDK_DEBUGLOG(bdev, "Removing bdev %s from list\n", bdev->name);

	thread = spdk_get_thread();
	if (!thread) {
		/* The user called this from a non-SPDK thread. */
		if (cb_fn != NULL) {
			cb_fn(cb_arg, -ENOTSUP);
		}
		return;
	}

	pthread_mutex_lock(&g_bdev_mgr.mutex);
	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		pthread_mutex_unlock(&g_bdev_mgr.mutex);
		if (cb_fn) {
			cb_fn(cb_arg, -EBUSY);
		}
		return;
	}

	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
	bdev->internal.unregister_cb = cb_fn;
	bdev->internal.unregister_ctx = cb_arg;

	/* Call under lock. */
	rc = bdev_unregister_unsafe(bdev);
	pthread_mutex_unlock(&bdev->internal.mutex);
	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	if (rc == 0) {
		bdev_fini(bdev);
	}
}

static void
bdev_dummy_event_cb(void *remove_ctx)
{
	SPDK_DEBUGLOG(bdev, "Bdev remove event received with no remove callback specified");
}

static int
bdev_start_qos(struct spdk_bdev *bdev)
{
	struct set_qos_limit_ctx *ctx;

	/* Enable QoS */
	if (bdev->internal.qos && bdev->internal.qos->thread == NULL) {
		ctx = calloc(1, sizeof(*ctx));
		if (ctx == NULL) {
			SPDK_ERRLOG("Failed to allocate memory for QoS context\n");
			return -ENOMEM;
		}
		ctx->bdev = bdev;
		spdk_for_each_channel(__bdev_to_io_dev(bdev),
				      bdev_enable_qos_msg, ctx,
				      bdev_enable_qos_done);
	}

	return 0;
}

static int
bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc)
{
	struct spdk_thread *thread;
	int rc = 0;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n");
		return -ENOTSUP;
	}

	SPDK_DEBUGLOG(bdev, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
		      spdk_get_thread());

	desc->bdev = bdev;
	desc->thread = thread;
	desc->write = write;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		return -ENODEV;
	}

	if (write && bdev->internal.claim_module) {
		SPDK_ERRLOG("Could not open %s - %s module already claimed it\n",
			    bdev->name, bdev->internal.claim_module->name);
		pthread_mutex_unlock(&bdev->internal.mutex);
		return -EPERM;
	}

	rc = bdev_start_qos(bdev);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to start QoS on bdev %s\n", bdev->name);
		pthread_mutex_unlock(&bdev->internal.mutex);
		return rc;
	}

	TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);

	pthread_mutex_unlock(&bdev->internal.mutex);

	return 0;
}

int
spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
	       void *remove_ctx, struct spdk_bdev_desc **_desc)
{
	struct spdk_bdev_desc *desc;
	int rc;

	desc = calloc(1, sizeof(*desc));
	if (desc == NULL) {
		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
		return -ENOMEM;
	}

	if (remove_cb == NULL) {
		remove_cb = bdev_dummy_event_cb;
	}

	TAILQ_INIT(&desc->pending_media_events);
	TAILQ_INIT(&desc->free_media_events);

	desc->callback.open_with_ext = false;
	desc->callback.remove_fn = remove_cb;
	desc->callback.ctx = remove_ctx;
	pthread_mutex_init(&desc->mutex, NULL);

	pthread_mutex_lock(&g_bdev_mgr.mutex);

	rc = bdev_open(bdev, write, desc);
	if (rc != 0) {
		bdev_desc_free(desc);
		desc = NULL;
	}

	*_desc = desc;

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	return rc;
}

int
spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb,
		   void *event_ctx, struct spdk_bdev_desc **_desc)
{
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	unsigned int event_id;
	int rc;

	if (event_cb == NULL) {
		SPDK_ERRLOG("Missing event callback function\n");
		return -EINVAL;
	}

	pthread_mutex_lock(&g_bdev_mgr.mutex);

	bdev = spdk_bdev_get_by_name(bdev_name);

	if (bdev == NULL) {
		SPDK_NOTICELOG("Currently unable to find bdev with name: %s\n", bdev_name);
		pthread_mutex_unlock(&g_bdev_mgr.mutex);
		return -ENODEV;
	}

	desc = calloc(1, sizeof(*desc));
	if (desc == NULL) {
		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
		pthread_mutex_unlock(&g_bdev_mgr.mutex);
		return -ENOMEM;
	}

	TAILQ_INIT(&desc->pending_media_events);
	TAILQ_INIT(&desc->free_media_events);

	desc->callback.open_with_ext = true;
	desc->callback.event_fn = event_cb;
	desc->callback.ctx = event_ctx;
	pthread_mutex_init(&desc->mutex, NULL);

	if (bdev->media_events) {
		desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE,
						   sizeof(*desc->media_events_buffer));
		if (desc->media_events_buffer == NULL) {
			SPDK_ERRLOG("Failed to initialize media event pool\n");
			bdev_desc_free(desc);
			pthread_mutex_unlock(&g_bdev_mgr.mutex);
			return -ENOMEM;
		}

		for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) {
			TAILQ_INSERT_TAIL(&desc->free_media_events,
					  &desc->media_events_buffer[event_id], tailq);
		}
	}

	rc = bdev_open(bdev, write, desc);
	if (rc != 0) {
		bdev_desc_free(desc);
		desc = NULL;
	}

	*_desc = desc;

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	return rc;
}

void
spdk_bdev_close(struct spdk_bdev_desc *desc)
{
	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc);
	int rc;

	SPDK_DEBUGLOG(bdev, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
		      spdk_get_thread());

	assert(desc->thread == spdk_get_thread());

	spdk_poller_unregister(&desc->io_timeout_poller);

	pthread_mutex_lock(&bdev->internal.mutex);
	pthread_mutex_lock(&desc->mutex);

	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);

	desc->closed = true;

	if (0 == desc->refs) {
		pthread_mutex_unlock(&desc->mutex);
		bdev_desc_free(desc);
	} else {
		pthread_mutex_unlock(&desc->mutex);
	}

	/* If no more descriptors, kill QoS channel */
	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
		SPDK_DEBUGLOG(bdev, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
			      bdev->name, spdk_get_thread());

		if (bdev_qos_destroy(bdev)) {
			/* There isn't anything we can do to recover here. Just let the
			 * old QoS poller keep running. The QoS handling won't change
			 * cores when the user allocates a new channel, but it won't break. */
			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
		}
	}

	spdk_bdev_set_qd_sampling_period(bdev, 0);

	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
		rc = bdev_unregister_unsafe(bdev);
		pthread_mutex_unlock(&bdev->internal.mutex);

		if (rc == 0) {
			bdev_fini(bdev);
		}
	} else {
		pthread_mutex_unlock(&bdev->internal.mutex);
	}
}
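
/*
 * Illustrative sketch (editor's addition): a typical consumer-side pairing of
 * spdk_bdev_open_ext() and spdk_bdev_close().  The event callback should react
 * at least to SPDK_BDEV_EVENT_REMOVE by closing the descriptor on the thread
 * that opened it.  The names my_event_cb and my_open are hypothetical.
 *
 *	static void
 *	my_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
 *	{
 *		struct spdk_bdev_desc **desc = ctx;
 *
 *		if (type == SPDK_BDEV_EVENT_REMOVE && *desc != NULL) {
 *			spdk_bdev_close(*desc);
 *			*desc = NULL;
 *		}
 *	}
 *
 *	static int
 *	my_open(const char *name, struct spdk_bdev_desc **desc)
 *	{
 *		return spdk_bdev_open_ext(name, true, my_event_cb, desc, desc);
 *	}
 */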

int
spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
			    struct spdk_bdev_module *module)
{
	if (bdev->internal.claim_module != NULL) {
		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
			    bdev->internal.claim_module->name);
		return -EPERM;
	}

	if (desc && !desc->write) {
		desc->write = true;
	}

	bdev->internal.claim_module = module;
	return 0;
}

void
spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
{
	assert(bdev->internal.claim_module != NULL);
	bdev->internal.claim_module = NULL;
}

struct spdk_bdev *
spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
{
	assert(desc != NULL);
	return desc->bdev;
}

void
spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
{
	struct iovec *iovs;
	int iovcnt;

	if (bdev_io == NULL) {
		return;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_ZCOPY:
		iovs = bdev_io->u.bdev.iovs;
		iovcnt = bdev_io->u.bdev.iovcnt;
		break;
	default:
		iovs = NULL;
		iovcnt = 0;
		break;
	}

	if (iovp) {
		*iovp = iovs;
	}
	if (iovcntp) {
		*iovcntp = iovcnt;
	}
}

void *
spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io)
{
	if (bdev_io == NULL) {
		return NULL;
	}

	if (!spdk_bdev_is_md_separate(bdev_io->bdev)) {
		return NULL;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ ||
	    bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		return bdev_io->u.bdev.md_buf;
	}

	return NULL;
}

void *
spdk_bdev_io_get_cb_arg(struct spdk_bdev_io *bdev_io)
{
	if (bdev_io == NULL) {
		assert(false);
		return NULL;
	}

	return bdev_io->internal.caller_ctx;
}

void
spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
{
	if (spdk_bdev_module_list_find(bdev_module->name)) {
		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
		assert(false);
	}

	/*
	 * Modules with examine callbacks must be initialized first, so they are
	 * ready to handle examine callbacks from later modules that will
	 * register physical bdevs.
	 */
	if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
	} else {
		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
	}
}
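
/*
 * Illustrative sketch (editor's addition): bdev modules normally do not call
 * spdk_bdev_module_list_add() directly; they declare a struct spdk_bdev_module
 * and register it with the SPDK_BDEV_MODULE_REGISTER() macro from
 * spdk/bdev_module.h, which adds it to this list at startup.  The names
 * my_module_init and my_module below are hypothetical.
 *
 *	static struct spdk_bdev_module my_module = {
 *		.name		= "my_module",
 *		.module_init	= my_module_init,
 *	};
 *
 *	SPDK_BDEV_MODULE_REGISTER(my_module, &my_module)
 */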

struct spdk_bdev_module *
spdk_bdev_module_list_find(const char *name)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (strcmp(name, bdev_module->name) == 0) {
			break;
		}
	}

	return bdev_module;
}

static void
bdev_write_zero_buffer_next(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t num_bytes, num_blocks;
	void *md_buf = NULL;
	int rc;

	num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) *
			     bdev_io->u.bdev.split_remaining_num_blocks,
			     ZERO_BUFFER_SIZE);
	num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev);

	if (spdk_bdev_is_md_separate(bdev_io->bdev)) {
		md_buf = (char *)g_bdev_mgr.zero_buffer +
			 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks;
	}

	rc = bdev_write_blocks_with_md(bdev_io->internal.desc,
				       spdk_io_channel_from_ctx(bdev_io->internal.ch),
				       g_bdev_mgr.zero_buffer, md_buf,
				       bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
				       bdev_write_zero_buffer_done, bdev_io);
	if (rc == 0) {
		bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
		bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
	} else if (rc == -ENOMEM) {
		bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next);
	} else {
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

static void
bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
		return;
	}

	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
		parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
		return;
	}

	bdev_write_zero_buffer_next(parent_io);
}

static void
bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
{
	pthread_mutex_lock(&ctx->bdev->internal.mutex);
	ctx->bdev->internal.qos_mod_in_progress = false;
	pthread_mutex_unlock(&ctx->bdev->internal.mutex);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, status);
	}
	free(ctx);
}

static void
bdev_disable_qos_done(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_qos *qos;

	pthread_mutex_lock(&bdev->internal.mutex);
	qos = bdev->internal.qos;
	bdev->internal.qos = NULL;
	pthread_mutex_unlock(&bdev->internal.mutex);

	while (!TAILQ_EMPTY(&qos->queued)) {
		/* Send queued I/O back to their original thread for resubmission. */
		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);

		if (bdev_io->internal.io_submit_ch) {
			/*
			 * Channel was changed when sending it to the QoS thread - change it back
			 * before sending it back to the original thread.
			 */
			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
			bdev_io->internal.io_submit_ch = NULL;
		}

		spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io),
				     _bdev_io_submit, bdev_io);
	}

	if (qos->thread != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
		spdk_poller_unregister(&qos->poller);
	}

	free(qos);

	bdev_set_qos_limit_done(ctx, 0);
}

static void
bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_thread *thread;

	pthread_mutex_lock(&bdev->internal.mutex);
	thread = bdev->internal.qos->thread;
	pthread_mutex_unlock(&bdev->internal.mutex);

	if (thread != NULL) {
		spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx);
	} else {
		bdev_disable_qos_done(ctx);
	}
}

static void
bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;

	spdk_for_each_channel_continue(i, 0);
}

static void
bdev_update_qos_rate_limit_msg(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
	pthread_mutex_unlock(&bdev->internal.mutex);

	bdev_set_qos_limit_done(ctx, 0);
}

static void
bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	pthread_mutex_lock(&bdev->internal.mutex);
	bdev_enable_qos(bdev, bdev_ch);
	pthread_mutex_unlock(&bdev->internal.mutex);
	spdk_for_each_channel_continue(i, 0);
}

static void
bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
{
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	bdev_set_qos_limit_done(ctx, status);
}

static void
bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
{
	int i;

	assert(bdev->internal.qos != NULL);

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			bdev->internal.qos->rate_limits[i].limit = limits[i];

			if (limits[i] == 0) {
				bdev->internal.qos->rate_limits[i].limit =
					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
			}
		}
	}
}

void
spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
{
	struct set_qos_limit_ctx *ctx;
	uint32_t limit_set_complement;
	uint64_t min_limit_per_sec;
	int i;
	bool disable_rate_limit = true;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			continue;
		}

		if (limits[i] > 0) {
			disable_rate_limit = false;
		}

		if (bdev_qos_is_iops_rate_limit(i) == true) {
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
		} else {
			/* Change from megabyte to byte rate limit */
			limits[i] = limits[i] * 1024 * 1024;
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
		}

		limit_set_complement = limits[i] % min_limit_per_sec;
		if (limit_set_complement) {
			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
				    limits[i], min_limit_per_sec);
			limits[i] += min_limit_per_sec - limit_set_complement;
			SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]);
		}
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->bdev = bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos_mod_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}
	bdev->internal.qos_mod_in_progress = true;

	if (disable_rate_limit == true && bdev->internal.qos) {
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
			     bdev->internal.qos->rate_limits[i].limit !=
			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
				disable_rate_limit = false;
				break;
			}
		}
	}

	if (disable_rate_limit == false) {
		if (bdev->internal.qos == NULL) {
			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
			if (!bdev->internal.qos) {
				pthread_mutex_unlock(&bdev->internal.mutex);
				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
				bdev_set_qos_limit_done(ctx, -ENOMEM);
				return;
			}
		}

		if (bdev->internal.qos->thread == NULL) {
			/* Enabling */
			bdev_set_qos_rate_limits(bdev, limits);

			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      bdev_enable_qos_msg, ctx,
					      bdev_enable_qos_done);
		} else {
			/* Updating */
			bdev_set_qos_rate_limits(bdev, limits);

			spdk_thread_send_msg(bdev->internal.qos->thread,
					     bdev_update_qos_rate_limit_msg, ctx);
		}
	} else {
		if (bdev->internal.qos != NULL) {
			bdev_set_qos_rate_limits(bdev, limits);

			/* Disabling */
			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      bdev_disable_qos_msg, ctx,
					      bdev_disable_qos_msg_done);
		} else {
			pthread_mutex_unlock(&bdev->internal.mutex);
			bdev_set_qos_limit_done(ctx, 0);
			return;
		}
	}

	pthread_mutex_unlock(&bdev->internal.mutex);
}
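
/*
 * Illustrative sketch (editor's addition): capping a bdev at 10k read/write
 * IO/s and 100 MB/s aggregate bandwidth.  Byte limits are passed in MB/s (the
 * function above converts them to bytes); entries left at
 * SPDK_BDEV_QOS_LIMIT_NOT_DEFINED are not modified, and a value of 0 disables
 * that limit.  The names my_qos_done and my_set_qos are hypothetical.
 *
 *	static void
 *	my_qos_done(void *cb_arg, int status)
 *	{
 *		if (status != 0) {
 *			SPDK_ERRLOG("Setting QoS limits failed: %d\n", status);
 *		}
 *	}
 *
 *	static void
 *	my_set_qos(struct spdk_bdev *bdev)
 *	{
 *		uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
 *		int i;
 *
 *		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
 *			limits[i] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
 *		}
 *		limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000;
 *		limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;
 *		spdk_bdev_set_qos_rate_limits(bdev, limits, my_qos_done, NULL);
 *	}
 */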

struct spdk_bdev_histogram_ctx {
	spdk_bdev_histogram_status_cb cb_fn;
	void *cb_arg;
	struct spdk_bdev *bdev;
	int status;
};

static void
bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	pthread_mutex_lock(&ctx->bdev->internal.mutex);
	ctx->bdev->internal.histogram_in_progress = false;
	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
	ctx->cb_fn(ctx->cb_arg, ctx->status);
	free(ctx);
}

static void
bdev_histogram_disable_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);

	if (ch->histogram != NULL) {
		spdk_histogram_data_free(ch->histogram);
		ch->histogram = NULL;
	}
	spdk_for_each_channel_continue(i, 0);
}

static void
bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	if (status != 0) {
		ctx->status = status;
		ctx->bdev->internal.histogram_enabled = false;
		spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx,
				      bdev_histogram_disable_channel_cb);
	} else {
		pthread_mutex_lock(&ctx->bdev->internal.mutex);
		ctx->bdev->internal.histogram_in_progress = false;
		pthread_mutex_unlock(&ctx->bdev->internal.mutex);
		ctx->cb_fn(ctx->cb_arg, ctx->status);
		free(ctx);
	}
}

static void
bdev_histogram_enable_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	int status = 0;

	if (ch->histogram == NULL) {
		ch->histogram = spdk_histogram_data_alloc();
		if (ch->histogram == NULL) {
			status = -ENOMEM;
		}
	}

	spdk_for_each_channel_continue(i, status);
}

void
spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn,
			   void *cb_arg, bool enable)
{
	struct spdk_bdev_histogram_ctx *ctx;

	ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bdev = bdev;
	ctx->status = 0;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.histogram_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}

	bdev->internal.histogram_in_progress = true;
	pthread_mutex_unlock(&bdev->internal.mutex);

	bdev->internal.histogram_enabled = enable;

	if (enable) {
		/* Allocate histogram for each channel */
		spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx,
				      bdev_histogram_enable_channel_cb);
	} else {
		spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx,
				      bdev_histogram_disable_channel_cb);
	}
}

struct spdk_bdev_histogram_data_ctx {
	spdk_bdev_histogram_data_cb cb_fn;
	void *cb_arg;
	struct spdk_bdev *bdev;
	/** merged histogram data from all channels */
	struct spdk_histogram_data *histogram;
};

static void
bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cb_fn(ctx->cb_arg, status, ctx->histogram);
	free(ctx);
}

static void
bdev_histogram_get_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	int status = 0;

	if (ch->histogram == NULL) {
		status = -EFAULT;
	} else {
		spdk_histogram_data_merge(ctx->histogram, ch->histogram);
	}

	spdk_for_each_channel_continue(i, status);
}

void
spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram,
			spdk_bdev_histogram_data_cb cb_fn,
			void *cb_arg)
{
	struct spdk_bdev_histogram_data_ctx *ctx;

	ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM, NULL);
		return;
	}

	ctx->bdev = bdev;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	ctx->histogram = histogram;

	spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx,
			      bdev_histogram_get_channel_cb);
}
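
/*
 * Illustrative sketch (editor's addition): enabling latency histograms and
 * later collecting the merged per-channel data.  The caller allocates the
 * spdk_histogram_data, which is not freed by the library.  The names
 * my_histogram_enable_cb and my_histogram_data_cb are hypothetical.
 *
 *	static void
 *	my_histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
 *	{
 *		if (status == 0) {
 *			... inspect the merged histogram ...
 *		}
 *		spdk_histogram_data_free(histogram);
 *	}
 *
 *	static void
 *	my_histogram_enable_cb(void *cb_arg, int status)
 *	{
 *		struct spdk_bdev *bdev = cb_arg;
 *		struct spdk_histogram_data *histogram;
 *
 *		if (status == 0) {
 *			histogram = spdk_histogram_data_alloc();
 *			spdk_bdev_histogram_get(bdev, histogram, my_histogram_data_cb, NULL);
 *		}
 *	}
 *
 *	spdk_bdev_histogram_enable(bdev, my_histogram_enable_cb, bdev, true);
 */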

size_t
spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events,
			   size_t max_events)
{
	struct media_event_entry *entry;
	size_t num_events = 0;

	for (; num_events < max_events; ++num_events) {
		entry = TAILQ_FIRST(&desc->pending_media_events);
		if (entry == NULL) {
			break;
		}

		events[num_events] = entry->event;
		TAILQ_REMOVE(&desc->pending_media_events, entry, tailq);
		TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq);
	}

	return num_events;
}

int
spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events,
			    size_t num_events)
{
	struct spdk_bdev_desc *desc;
	struct media_event_entry *entry;
	size_t event_id;
	int rc = 0;

	assert(bdev->media_events);

	pthread_mutex_lock(&bdev->internal.mutex);
	TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) {
		if (desc->write) {
			break;
		}
	}

	if (desc == NULL || desc->media_events_buffer == NULL) {
		rc = -ENODEV;
		goto out;
	}

	for (event_id = 0; event_id < num_events; ++event_id) {
		entry = TAILQ_FIRST(&desc->free_media_events);
		if (entry == NULL) {
			break;
		}

		TAILQ_REMOVE(&desc->free_media_events, entry, tailq);
		TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq);
		entry->event = events[event_id];
	}

	rc = event_id;
out:
	pthread_mutex_unlock(&bdev->internal.mutex);
	return rc;
}

void
spdk_bdev_notify_media_management(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *desc;

	pthread_mutex_lock(&bdev->internal.mutex);
	TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) {
		if (!TAILQ_EMPTY(&desc->pending_media_events)) {
			desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev,
						desc->callback.ctx);
		}
	}
	pthread_mutex_unlock(&bdev->internal.mutex);
}
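
/*
 * Illustrative sketch (editor's addition): a consumer that opened the bdev
 * with spdk_bdev_open_ext() can drain pending media events from its event
 * callback when it receives SPDK_BDEV_EVENT_MEDIA_MANAGEMENT.  The name
 * my_media_event_cb is hypothetical, and event_ctx is assumed to be the
 * descriptor pointer.
 *
 *	static void
 *	my_media_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
 *	{
 *		struct spdk_bdev_desc *desc = ctx;
 *		struct spdk_bdev_media_event events[8];
 *		size_t i, num;
 *
 *		if (type != SPDK_BDEV_EVENT_MEDIA_MANAGEMENT) {
 *			return;
 *		}
 *
 *		do {
 *			num = spdk_bdev_get_media_events(desc, events, SPDK_COUNTOF(events));
 *			for (i = 0; i < num; i++) {
 *				... handle events[i] ...
 *			}
 *		} while (num > 0);
 *	}
 */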

struct locked_lba_range_ctx {
	struct lba_range range;
	struct spdk_bdev *bdev;
	struct lba_range *current_range;
	struct lba_range *owner_range;
	struct spdk_poller *poller;
	lock_range_cb cb_fn;
	void *cb_arg;
};

static void
bdev_lock_error_cleanup_cb(struct spdk_io_channel_iter *i, int status)
{
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cb_fn(ctx->cb_arg, -ENOMEM);
	free(ctx);
}

static void
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i);

static void
bdev_lock_lba_range_cb(struct spdk_io_channel_iter *i, int status)
{
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_bdev *bdev = ctx->bdev;

	if (status == -ENOMEM) {
		/* One of the channels could not allocate a range object.
		 * So we have to go back and clean up any ranges that were
		 * allocated successfully before we return error status to
		 * the caller.  We can reuse the unlock function to do that
		 * clean up.
		 */
		spdk_for_each_channel(__bdev_to_io_dev(bdev),
				      bdev_unlock_lba_range_get_channel, ctx,
				      bdev_lock_error_cleanup_cb);
		return;
	}

	/* All channels have locked this range and no I/O overlapping the range
	 * is outstanding!  Set the owner_ch for the range object for the
	 * locking channel, so that this channel will know that it is allowed
	 * to write to this range.
	 */
	ctx->owner_range->owner_ch = ctx->range.owner_ch;
	ctx->cb_fn(ctx->cb_arg, status);

	/* Don't free the ctx here.  Its range is in the bdev's global list of
	 * locked ranges still, and will be removed and freed when this range
	 * is later unlocked.
	 */
}

static int
bdev_lock_lba_range_check_io(void *_i)
{
	struct spdk_io_channel_iter *i = _i;
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct lba_range *range = ctx->current_range;
	struct spdk_bdev_io *bdev_io;

	spdk_poller_unregister(&ctx->poller);

	/* The range is now in the locked_ranges, so no new IO can be submitted to this
	 * range.  But we need to wait until any outstanding IO overlapping with this range
	 * are completed.
	 */
	TAILQ_FOREACH(bdev_io, &ch->io_submitted, internal.ch_link) {
		if (bdev_io_range_is_locked(bdev_io, range)) {
			ctx->poller = SPDK_POLLER_REGISTER(bdev_lock_lba_range_check_io, i, 100);
			return SPDK_POLLER_BUSY;
		}
	}

	spdk_for_each_channel_continue(i, 0);
	return SPDK_POLLER_BUSY;
}

static void
bdev_lock_lba_range_get_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct lba_range *range;

	TAILQ_FOREACH(range, &ch->locked_ranges, tailq) {
		if (range->length == ctx->range.length &&
		    range->offset == ctx->range.offset &&
		    range->locked_ctx == ctx->range.locked_ctx) {
			/* This range already exists on this channel, so don't add
			 * it again.  This can happen when a new channel is created
			 * while the for_each_channel operation is in progress.
			 * Do not check for outstanding I/O in that case, since the
			 * range was locked before any I/O could be submitted to the
			 * new channel.
			 */
			spdk_for_each_channel_continue(i, 0);
			return;
		}
	}

	range = calloc(1, sizeof(*range));
	if (range == NULL) {
		spdk_for_each_channel_continue(i, -ENOMEM);
		return;
	}

	range->length = ctx->range.length;
	range->offset = ctx->range.offset;
	range->locked_ctx = ctx->range.locked_ctx;
	ctx->current_range = range;
	if (ctx->range.owner_ch == ch) {
		/* This is the range object for the channel that will hold
		 * the lock.  Store it in the ctx object so that we can easily
		 * set its owner_ch after the lock is finally acquired.
		 */
		ctx->owner_range = range;
	}
	TAILQ_INSERT_TAIL(&ch->locked_ranges, range, tailq);
	bdev_lock_lba_range_check_io(i);
}

static void
bdev_lock_lba_range_ctx(struct spdk_bdev *bdev, struct locked_lba_range_ctx *ctx)
{
	assert(spdk_get_thread() == ctx->range.owner_ch->channel->thread);

	/* We will add a copy of this range to each channel now. */
	spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_lock_lba_range_get_channel, ctx,
			      bdev_lock_lba_range_cb);
}

static bool
bdev_lba_range_overlaps_tailq(struct lba_range *range, lba_range_tailq_t *tailq)
{
	struct lba_range *r;

	TAILQ_FOREACH(r, tailq, tailq) {
		if (bdev_lba_range_overlapped(range, r)) {
			return true;
		}
	}
	return false;
}

static int
bdev_lock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		    uint64_t offset, uint64_t length,
		    lock_range_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct locked_lba_range_ctx *ctx;

	if (cb_arg == NULL) {
		SPDK_ERRLOG("cb_arg must not be NULL\n");
		return -EINVAL;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	ctx->range.offset = offset;
	ctx->range.length = length;
	ctx->range.owner_ch = ch;
	ctx->range.locked_ctx = cb_arg;
	ctx->bdev = bdev;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev_lba_range_overlaps_tailq(&ctx->range, &bdev->internal.locked_ranges)) {
		/* There is an active lock overlapping with this range.
		 * Put it on the pending list until this range no
		 * longer overlaps with another.
		 */
		TAILQ_INSERT_TAIL(&bdev->internal.pending_locked_ranges, &ctx->range, tailq);
	} else {
		TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, &ctx->range, tailq);
		bdev_lock_lba_range_ctx(bdev, ctx);
	}
	pthread_mutex_unlock(&bdev->internal.mutex);
	return 0;
}

static void
bdev_lock_lba_range_ctx_msg(void *_ctx)
{
	struct locked_lba_range_ctx *ctx = _ctx;

	bdev_lock_lba_range_ctx(ctx->bdev, ctx);
}

static void
bdev_unlock_lba_range_cb(struct spdk_io_channel_iter *i, int status)
{
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct locked_lba_range_ctx *pending_ctx;
	struct spdk_bdev_channel *ch = ctx->range.owner_ch;
	struct spdk_bdev *bdev = ch->bdev;
	struct lba_range *range, *tmp;

	pthread_mutex_lock(&bdev->internal.mutex);
	/* Check if there are any pending locked ranges that overlap with this range
	 * that was just unlocked.  If there are, check that it doesn't overlap with any
	 * other locked ranges before calling bdev_lock_lba_range_ctx which will start
	 * the lock process.
	 */
	TAILQ_FOREACH_SAFE(range, &bdev->internal.pending_locked_ranges, tailq, tmp) {
		if (bdev_lba_range_overlapped(range, &ctx->range) &&
		    !bdev_lba_range_overlaps_tailq(range, &bdev->internal.locked_ranges)) {
			TAILQ_REMOVE(&bdev->internal.pending_locked_ranges, range, tailq);
			pending_ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range);
			TAILQ_INSERT_TAIL(&bdev->internal.locked_ranges, range, tailq);
			spdk_thread_send_msg(pending_ctx->range.owner_ch->channel->thread,
					     bdev_lock_lba_range_ctx_msg, pending_ctx);
		}
	}
	pthread_mutex_unlock(&bdev->internal.mutex);

	ctx->cb_fn(ctx->cb_arg, status);
	free(ctx);
}

static void
bdev_unlock_lba_range_get_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct locked_lba_range_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	TAILQ_HEAD(, spdk_bdev_io) io_locked;
	struct spdk_bdev_io *bdev_io;
	struct lba_range *range;

	TAILQ_FOREACH(range, &ch->locked_ranges, tailq) {
		if (ctx->range.offset == range->offset &&
		    ctx->range.length == range->length &&
		    ctx->range.locked_ctx == range->locked_ctx) {
			TAILQ_REMOVE(&ch->locked_ranges, range, tailq);
			free(range);
			break;
		}
	}

	/* Note: we should almost always be able to assert that the range specified
	 * was found.  But there are some very rare corner cases where a new channel
	 * gets created simultaneously with a range unlock, where this function
	 * would execute on that new channel and wouldn't have the range.
	 * We also use this to clean up range allocations when a later allocation
	 * fails in the locking path.
	 * So we can't actually assert() here.
	 */

	/* Swap the locked IO into a temporary list, and then try to submit them again.
	 * We could hyper-optimize this to only resubmit locked I/O that overlap
	 * with the range that was just unlocked, but this isn't a performance path so
	 * we go for simplicity here.
	 */
	TAILQ_INIT(&io_locked);
	TAILQ_SWAP(&ch->io_locked, &io_locked, spdk_bdev_io, internal.ch_link);
	while (!TAILQ_EMPTY(&io_locked)) {
		bdev_io = TAILQ_FIRST(&io_locked);
		TAILQ_REMOVE(&io_locked, bdev_io, internal.ch_link);
		bdev_io_submit(bdev_io);
	}

	spdk_for_each_channel_continue(i, 0);
}

static int
bdev_unlock_lba_range(struct spdk_bdev_desc *desc, struct spdk_io_channel *_ch,
		      uint64_t offset, uint64_t length,
		      lock_range_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct locked_lba_range_ctx *ctx;
	struct lba_range *range;
	bool range_found = false;

	/* Let's make sure the specified channel actually has a lock on
	 * the specified range.  Note that the range must match exactly.
	 */
	TAILQ_FOREACH(range, &ch->locked_ranges, tailq) {
		if (range->offset == offset && range->length == length &&
		    range->owner_ch == ch && range->locked_ctx == cb_arg) {
			range_found = true;
			break;
		}
	}

	if (!range_found) {
		return -EINVAL;
	}

	pthread_mutex_lock(&bdev->internal.mutex);
	/* We confirmed that this channel has locked the specified range.  To
	 * start the unlock process, we find the range in the bdev's locked_ranges
	 * and remove it.  This ensures new channels don't inherit the locked range.
	 * Then we will send a message to each channel (including the one specified
	 * here) to remove the range from its per-channel list.
	 */
	TAILQ_FOREACH(range, &bdev->internal.locked_ranges, tailq) {
		if (range->offset == offset && range->length == length &&
		    range->locked_ctx == cb_arg) {
			break;
		}
	}
	if (range == NULL) {
		assert(false);
		pthread_mutex_unlock(&bdev->internal.mutex);
		return -EINVAL;
	}
	TAILQ_REMOVE(&bdev->internal.locked_ranges, range, tailq);
	ctx = SPDK_CONTAINEROF(range, struct locked_lba_range_ctx, range);
	pthread_mutex_unlock(&bdev->internal.mutex);

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unlock_lba_range_get_channel, ctx,
			      bdev_unlock_lba_range_cb);
	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(bdev)

SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
{
	spdk_trace_register_owner(OWNER_BDEV, 'b');
	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
	spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV,
					OBJECT_BDEV_IO, 1, 0, "type: ");
	spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV,
					OBJECT_BDEV_IO, 0, 0, "");
}