1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. All rights reserved. 5 * Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/notify.h" 48 #include "spdk/util.h" 49 #include "spdk/trace.h" 50 51 #include "spdk/bdev_module.h" 52 #include "spdk_internal/log.h" 53 #include "spdk/string.h" 54 55 #include "bdev_internal.h" 56 57 #ifdef SPDK_CONFIG_VTUNE 58 #include "ittnotify.h" 59 #include "ittnotify_types.h" 60 int __itt_init_ittlib(const char *, __itt_group_id); 61 #endif 62 63 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024 - 1) 64 #define SPDK_BDEV_IO_CACHE_SIZE 256 65 #define BUF_SMALL_POOL_SIZE 8191 66 #define BUF_LARGE_POOL_SIZE 1023 67 #define NOMEM_THRESHOLD_COUNT 8 68 #define ZERO_BUFFER_SIZE 0x100000 69 70 #define OWNER_BDEV 0x2 71 72 #define OBJECT_BDEV_IO 0x2 73 74 #define TRACE_GROUP_BDEV 0x3 75 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 76 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 77 78 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 79 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 80 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 81 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 1000 82 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (1024 * 1024) 83 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 84 #define SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC 1000 85 86 #define SPDK_BDEV_POOL_ALIGNMENT 512 87 88 static const char *qos_conf_type[] = {"Limit_IOPS", 89 "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS" 90 }; 91 static const char *qos_rpc_type[] = {"rw_ios_per_sec", 92 "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec" 93 }; 94 95 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 96 97 
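/*
 * A per-second QoS limit is enforced in timeslices of
 * SPDK_BDEV_QOS_TIMESLICE_IN_USEC microseconds.  As a rough sketch, the
 * per-timeslice quota for a configured limit works out to:
 *
 *	max_per_timeslice = limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC;
 *	max_per_timeslice = spdk_max(max_per_timeslice, min_per_timeslice);
 *
 * e.g. a 10000 IOPS limit with the 1000 usec timeslice above allows 10 I/O per
 * timeslice, and never less than SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE.
 */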
struct spdk_bdev_mgr { 98 struct spdk_mempool *bdev_io_pool; 99 100 struct spdk_mempool *buf_small_pool; 101 struct spdk_mempool *buf_large_pool; 102 103 void *zero_buffer; 104 105 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 106 107 struct spdk_bdev_list bdevs; 108 109 bool init_complete; 110 bool module_init_complete; 111 112 pthread_mutex_t mutex; 113 114 #ifdef SPDK_CONFIG_VTUNE 115 __itt_domain *domain; 116 #endif 117 }; 118 119 static struct spdk_bdev_mgr g_bdev_mgr = { 120 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 121 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 122 .init_complete = false, 123 .module_init_complete = false, 124 .mutex = PTHREAD_MUTEX_INITIALIZER, 125 }; 126 127 128 static struct spdk_bdev_opts g_bdev_opts = { 129 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 130 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 131 }; 132 133 static spdk_bdev_init_cb g_init_cb_fn = NULL; 134 static void *g_init_cb_arg = NULL; 135 136 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 137 static void *g_fini_cb_arg = NULL; 138 static struct spdk_thread *g_fini_thread = NULL; 139 140 struct spdk_bdev_qos_limit { 141 /** IOs or bytes allowed per second (i.e., 1s). */ 142 uint64_t limit; 143 144 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 145 * For remaining bytes, allowed to run negative if an I/O is submitted when 146 * some bytes are remaining, but the I/O is bigger than that amount. The 147 * excess will be deducted from the next timeslice. 148 */ 149 int64_t remaining_this_timeslice; 150 151 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 152 uint32_t min_per_timeslice; 153 154 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 155 uint32_t max_per_timeslice; 156 157 /** Function to check whether to queue the IO. */ 158 bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 159 160 /** Function to update for the submitted IO. */ 161 void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 162 }; 163 164 struct spdk_bdev_qos { 165 /** Types of structure of rate limits. */ 166 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 167 168 /** The channel that all I/O are funneled through. */ 169 struct spdk_bdev_channel *ch; 170 171 /** The thread on which the poller is running. */ 172 struct spdk_thread *thread; 173 174 /** Queue of I/O waiting to be issued. */ 175 bdev_io_tailq_t queued; 176 177 /** Size of a timeslice in tsc ticks. */ 178 uint64_t timeslice_size; 179 180 /** Timestamp of start of last timeslice. */ 181 uint64_t last_timeslice; 182 183 /** Poller that processes queued I/O commands each time slice. */ 184 struct spdk_poller *poller; 185 }; 186 187 struct spdk_bdev_mgmt_channel { 188 bdev_io_stailq_t need_buf_small; 189 bdev_io_stailq_t need_buf_large; 190 191 /* 192 * Each thread keeps a cache of bdev_io - this allows 193 * bdev threads which are *not* DPDK threads to still 194 * benefit from a per-thread bdev_io cache. Without 195 * this, non-DPDK threads fetching from the mempool 196 * incur a cmpxchg on get and put. 197 */ 198 bdev_io_stailq_t per_thread_cache; 199 uint32_t per_thread_cache_count; 200 uint32_t bdev_io_cache_size; 201 202 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 203 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 204 }; 205 206 /* 207 * Per-module (or per-io_device) data. 
Multiple bdevs built on the same io_device 208 * will queue here their IO that awaits retry. It makes it possible to retry sending 209 * IO to one bdev after IO from other bdev completes. 210 */ 211 struct spdk_bdev_shared_resource { 212 /* The bdev management channel */ 213 struct spdk_bdev_mgmt_channel *mgmt_ch; 214 215 /* 216 * Count of I/O submitted to bdev module and waiting for completion. 217 * Incremented before submit_request() is called on an spdk_bdev_io. 218 */ 219 uint64_t io_outstanding; 220 221 /* 222 * Queue of IO awaiting retry because of a previous NOMEM status returned 223 * on this channel. 224 */ 225 bdev_io_tailq_t nomem_io; 226 227 /* 228 * Threshold which io_outstanding must drop to before retrying nomem_io. 229 */ 230 uint64_t nomem_threshold; 231 232 /* I/O channel allocated by a bdev module */ 233 struct spdk_io_channel *shared_ch; 234 235 /* Refcount of bdev channels using this resource */ 236 uint32_t ref; 237 238 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 239 }; 240 241 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 242 #define BDEV_CH_QOS_ENABLED (1 << 1) 243 244 struct spdk_bdev_channel { 245 struct spdk_bdev *bdev; 246 247 /* The channel for the underlying device */ 248 struct spdk_io_channel *channel; 249 250 /* Per io_device per thread data */ 251 struct spdk_bdev_shared_resource *shared_resource; 252 253 struct spdk_bdev_io_stat stat; 254 255 /* 256 * Count of I/O submitted to the underlying dev module through this channel 257 * and waiting for completion. 258 */ 259 uint64_t io_outstanding; 260 261 /* 262 * List of spdk_bdev_io directly associated with a call to the public bdev API. 263 * It does not include any spdk_bdev_io that are generated via splitting. 264 */ 265 bdev_io_tailq_t io_submitted; 266 267 uint32_t flags; 268 269 struct spdk_histogram_data *histogram; 270 271 #ifdef SPDK_CONFIG_VTUNE 272 uint64_t start_tsc; 273 uint64_t interval_tsc; 274 __itt_string_handle *handle; 275 struct spdk_bdev_io_stat prev_stat; 276 #endif 277 278 bdev_io_tailq_t queued_resets; 279 }; 280 281 struct media_event_entry { 282 struct spdk_bdev_media_event event; 283 TAILQ_ENTRY(media_event_entry) tailq; 284 }; 285 286 #define MEDIA_EVENT_POOL_SIZE 64 287 288 struct spdk_bdev_desc { 289 struct spdk_bdev *bdev; 290 struct spdk_thread *thread; 291 struct { 292 bool open_with_ext; 293 union { 294 spdk_bdev_remove_cb_t remove_fn; 295 spdk_bdev_event_cb_t event_fn; 296 }; 297 void *ctx; 298 } callback; 299 bool closed; 300 bool write; 301 pthread_mutex_t mutex; 302 uint32_t refs; 303 TAILQ_HEAD(, media_event_entry) pending_media_events; 304 TAILQ_HEAD(, media_event_entry) free_media_events; 305 struct media_event_entry *media_events_buffer; 306 TAILQ_ENTRY(spdk_bdev_desc) link; 307 308 uint64_t timeout_in_sec; 309 spdk_bdev_io_timeout_cb cb_fn; 310 void *cb_arg; 311 struct spdk_poller *io_timeout_poller; 312 }; 313 314 struct spdk_bdev_iostat_ctx { 315 struct spdk_bdev_io_stat *stat; 316 spdk_bdev_get_device_stat_cb cb; 317 void *cb_arg; 318 }; 319 320 struct set_qos_limit_ctx { 321 void (*cb_fn)(void *cb_arg, int status); 322 void *cb_arg; 323 struct spdk_bdev *bdev; 324 }; 325 326 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 327 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 328 329 static void bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 330 static void bdev_write_zero_buffer_next(void *_bdev_io); 331 332 static void bdev_enable_qos_msg(struct spdk_io_channel_iter *i); 333 static void 
bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status); 334 335 static int 336 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 337 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 338 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg); 339 static int 340 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 341 struct iovec *iov, int iovcnt, void *md_buf, 342 uint64_t offset_blocks, uint64_t num_blocks, 343 spdk_bdev_io_completion_cb cb, void *cb_arg); 344 345 void 346 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 347 { 348 *opts = g_bdev_opts; 349 } 350 351 int 352 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 353 { 354 uint32_t min_pool_size; 355 356 /* 357 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 358 * initialization. A second mgmt_ch will be created on the same thread when the application starts 359 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 360 */ 361 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 362 if (opts->bdev_io_pool_size < min_pool_size) { 363 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 364 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 365 spdk_thread_get_count()); 366 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 367 return -1; 368 } 369 370 g_bdev_opts = *opts; 371 return 0; 372 } 373 374 struct spdk_bdev * 375 spdk_bdev_first(void) 376 { 377 struct spdk_bdev *bdev; 378 379 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 380 if (bdev) { 381 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 382 } 383 384 return bdev; 385 } 386 387 struct spdk_bdev * 388 spdk_bdev_next(struct spdk_bdev *prev) 389 { 390 struct spdk_bdev *bdev; 391 392 bdev = TAILQ_NEXT(prev, internal.link); 393 if (bdev) { 394 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 395 } 396 397 return bdev; 398 } 399 400 static struct spdk_bdev * 401 _bdev_next_leaf(struct spdk_bdev *bdev) 402 { 403 while (bdev != NULL) { 404 if (bdev->internal.claim_module == NULL) { 405 return bdev; 406 } else { 407 bdev = TAILQ_NEXT(bdev, internal.link); 408 } 409 } 410 411 return bdev; 412 } 413 414 struct spdk_bdev * 415 spdk_bdev_first_leaf(void) 416 { 417 struct spdk_bdev *bdev; 418 419 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 420 421 if (bdev) { 422 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 423 } 424 425 return bdev; 426 } 427 428 struct spdk_bdev * 429 spdk_bdev_next_leaf(struct spdk_bdev *prev) 430 { 431 struct spdk_bdev *bdev; 432 433 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 434 435 if (bdev) { 436 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 437 } 438 439 return bdev; 440 } 441 442 struct spdk_bdev * 443 spdk_bdev_get_by_name(const char *bdev_name) 444 { 445 struct spdk_bdev_alias *tmp; 446 struct spdk_bdev *bdev = spdk_bdev_first(); 447 448 while (bdev != NULL) { 449 if (strcmp(bdev_name, bdev->name) == 0) { 450 return bdev; 451 } 452 453 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 454 if (strcmp(bdev_name, tmp->alias) == 0) { 455 return bdev; 456 } 457 } 458 459 bdev = spdk_bdev_next(bdev); 460 } 461 462 return NULL; 463 } 464 465 void 466 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 467 { 468 
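	/*
	 * Attach a single data buffer to this I/O.  If the caller did not supply
	 * an iovec, fall back to the single-element iovec embedded in the
	 * spdk_bdev_io so that iovs[0] can describe the buffer.
	 */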
struct iovec *iovs; 469 470 if (bdev_io->u.bdev.iovs == NULL) { 471 bdev_io->u.bdev.iovs = &bdev_io->iov; 472 bdev_io->u.bdev.iovcnt = 1; 473 } 474 475 iovs = bdev_io->u.bdev.iovs; 476 477 assert(iovs != NULL); 478 assert(bdev_io->u.bdev.iovcnt >= 1); 479 480 iovs[0].iov_base = buf; 481 iovs[0].iov_len = len; 482 } 483 484 void 485 spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len) 486 { 487 assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks); 488 bdev_io->u.bdev.md_buf = md_buf; 489 } 490 491 static bool 492 _is_buf_allocated(const struct iovec *iovs) 493 { 494 if (iovs == NULL) { 495 return false; 496 } 497 498 return iovs[0].iov_base != NULL; 499 } 500 501 static bool 502 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 503 { 504 int i; 505 uintptr_t iov_base; 506 507 if (spdk_likely(alignment == 1)) { 508 return true; 509 } 510 511 for (i = 0; i < iovcnt; i++) { 512 iov_base = (uintptr_t)iovs[i].iov_base; 513 if ((iov_base & (alignment - 1)) != 0) { 514 return false; 515 } 516 } 517 518 return true; 519 } 520 521 static void 522 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 523 { 524 int i; 525 size_t len; 526 527 for (i = 0; i < iovcnt; i++) { 528 len = spdk_min(iovs[i].iov_len, buf_len); 529 memcpy(buf, iovs[i].iov_base, len); 530 buf += len; 531 buf_len -= len; 532 } 533 } 534 535 static void 536 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 537 { 538 int i; 539 size_t len; 540 541 for (i = 0; i < iovcnt; i++) { 542 len = spdk_min(iovs[i].iov_len, buf_len); 543 memcpy(iovs[i].iov_base, buf, len); 544 buf += len; 545 buf_len -= len; 546 } 547 } 548 549 static void 550 _bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 551 { 552 /* save original iovec */ 553 bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs; 554 bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt; 555 /* set bounce iov */ 556 bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov; 557 bdev_io->u.bdev.iovcnt = 1; 558 /* set bounce buffer for this operation */ 559 bdev_io->u.bdev.iovs[0].iov_base = buf; 560 bdev_io->u.bdev.iovs[0].iov_len = len; 561 /* if this is write path, copy data from original buffer to bounce buffer */ 562 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 563 _copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt); 564 } 565 } 566 567 static void 568 _bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len) 569 { 570 /* save original md_buf */ 571 bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf; 572 /* set bounce md_buf */ 573 bdev_io->u.bdev.md_buf = md_buf; 574 575 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 576 memcpy(md_buf, bdev_io->internal.orig_md_buf, len); 577 } 578 } 579 580 static void 581 _bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len) 582 { 583 struct spdk_bdev *bdev = bdev_io->bdev; 584 bool buf_allocated; 585 uint64_t md_len, alignment; 586 void *aligned_buf; 587 588 alignment = spdk_bdev_get_buf_align(bdev); 589 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 590 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 591 592 if (buf_allocated) { 593 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 594 } else { 595 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 596 } 597 598 if (spdk_bdev_is_md_separate(bdev)) { 599 aligned_buf = (char *)aligned_buf + len; 600 md_len = 
bdev_io->u.bdev.num_blocks * bdev->md_len;

		assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);

		if (bdev_io->u.bdev.md_buf != NULL) {
			_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
		} else {
			spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
		}
	}

	bdev_io->internal.buf = buf;
	bdev_io->internal.get_buf_cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
}

static void
bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len, md_len, alignment;
	void *buf;

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
	alignment = spdk_bdev_get_buf_align(bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		_bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
	}
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
		assert(bdev_io->internal.orig_md_buf == NULL);
		return;
	}

	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs,
				  bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base,
				  bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bounce buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;

	/* do the same for metadata buffer */
	if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
		assert(spdk_bdev_is_md_separate(bdev_io->bdev));

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
		    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
			       bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
		}

		bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
		bdev_io->internal.orig_md_buf = NULL;
	}

	bdev_io_put_buf(bdev_io);
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment, md_len;
	void *buf;

	assert(cb != NULL);

	alignment = spdk_bdev_get_buf_align(bdev);
	md_len = spdk_bdev_is_md_separate(bdev) ?
bdev_io->u.bdev.num_blocks * bdev->md_len : 0; 706 707 if (_is_buf_allocated(bdev_io->u.bdev.iovs) && 708 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 709 /* Buffer already present and aligned */ 710 cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true); 711 return; 712 } 713 714 if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 715 SPDK_BDEV_POOL_ALIGNMENT) { 716 SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n", 717 len + alignment); 718 cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, false); 719 return; 720 } 721 722 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 723 724 bdev_io->internal.buf_len = len; 725 bdev_io->internal.get_buf_cb = cb; 726 727 if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 728 SPDK_BDEV_POOL_ALIGNMENT) { 729 pool = g_bdev_mgr.buf_small_pool; 730 stailq = &mgmt_ch->need_buf_small; 731 } else { 732 pool = g_bdev_mgr.buf_large_pool; 733 stailq = &mgmt_ch->need_buf_large; 734 } 735 736 buf = spdk_mempool_get(pool); 737 if (!buf) { 738 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 739 } else { 740 _bdev_io_set_buf(bdev_io, buf, len); 741 } 742 } 743 744 static int 745 bdev_module_get_max_ctx_size(void) 746 { 747 struct spdk_bdev_module *bdev_module; 748 int max_bdev_module_size = 0; 749 750 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 751 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 752 max_bdev_module_size = bdev_module->get_ctx_size(); 753 } 754 } 755 756 return max_bdev_module_size; 757 } 758 759 void 760 spdk_bdev_config_text(FILE *fp) 761 { 762 struct spdk_bdev_module *bdev_module; 763 764 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 765 if (bdev_module->config_text) { 766 bdev_module->config_text(fp); 767 } 768 } 769 } 770 771 static void 772 bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 773 { 774 int i; 775 struct spdk_bdev_qos *qos = bdev->internal.qos; 776 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 777 778 if (!qos) { 779 return; 780 } 781 782 spdk_bdev_get_qos_rate_limits(bdev, limits); 783 784 spdk_json_write_object_begin(w); 785 spdk_json_write_named_string(w, "method", "bdev_set_qos_limit"); 786 787 spdk_json_write_named_object_begin(w, "params"); 788 spdk_json_write_named_string(w, "name", bdev->name); 789 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 790 if (limits[i] > 0) { 791 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 792 } 793 } 794 spdk_json_write_object_end(w); 795 796 spdk_json_write_object_end(w); 797 } 798 799 void 800 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 801 { 802 struct spdk_bdev_module *bdev_module; 803 struct spdk_bdev *bdev; 804 805 assert(w != NULL); 806 807 spdk_json_write_array_begin(w); 808 809 spdk_json_write_object_begin(w); 810 spdk_json_write_named_string(w, "method", "bdev_set_options"); 811 spdk_json_write_named_object_begin(w, "params"); 812 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 813 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 814 spdk_json_write_object_end(w); 815 spdk_json_write_object_end(w); 816 817 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 818 if (bdev_module->config_json) { 819 bdev_module->config_json(w); 820 } 821 } 822 823 
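	/*
	 * Walk the global bdev list under the manager mutex so the list cannot
	 * change underneath us; each bdev writes its own config, followed by any
	 * QoS limits configured on it.
	 */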
	pthread_mutex_lock(&g_bdev_mgr.mutex);

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		bdev_qos_config_json(bdev, w);
	}

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	spdk_json_write_array_end(w);
}

static int
bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	bdev_init_complete(0);
}

static void
bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	bdev_init_complete(-1);
}

static int
bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to prevent other modules from completing modules_init.
			 * Send a message to defer application shutdown until resources are cleaned up. */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
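	 *
	 * For example, with 4 threads each local cache gets BUF_SMALL_POOL_SIZE /
	 * (2 * 4) = 8191 / 8 = 1023 small buffers, so the caches together hold at
	 * most 4092 buffers, roughly half of the pool.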
1065 */ 1066 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 1067 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 1068 1069 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 1070 BUF_SMALL_POOL_SIZE, 1071 SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 1072 SPDK_BDEV_POOL_ALIGNMENT, 1073 cache_size, 1074 SPDK_ENV_SOCKET_ID_ANY); 1075 if (!g_bdev_mgr.buf_small_pool) { 1076 SPDK_ERRLOG("create rbuf small pool failed\n"); 1077 bdev_init_complete(-1); 1078 return; 1079 } 1080 1081 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 1082 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 1083 1084 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 1085 BUF_LARGE_POOL_SIZE, 1086 SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 1087 SPDK_BDEV_POOL_ALIGNMENT, 1088 cache_size, 1089 SPDK_ENV_SOCKET_ID_ANY); 1090 if (!g_bdev_mgr.buf_large_pool) { 1091 SPDK_ERRLOG("create rbuf large pool failed\n"); 1092 bdev_init_complete(-1); 1093 return; 1094 } 1095 1096 g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 1097 NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 1098 if (!g_bdev_mgr.zero_buffer) { 1099 SPDK_ERRLOG("create bdev zero buffer failed\n"); 1100 bdev_init_complete(-1); 1101 return; 1102 } 1103 1104 #ifdef SPDK_CONFIG_VTUNE 1105 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 1106 #endif 1107 1108 spdk_io_device_register(&g_bdev_mgr, bdev_mgmt_channel_create, 1109 bdev_mgmt_channel_destroy, 1110 sizeof(struct spdk_bdev_mgmt_channel), 1111 "bdev_mgr"); 1112 1113 rc = bdev_modules_init(); 1114 g_bdev_mgr.module_init_complete = true; 1115 if (rc != 0) { 1116 SPDK_ERRLOG("bdev modules init failed\n"); 1117 return; 1118 } 1119 1120 bdev_module_action_complete(); 1121 } 1122 1123 static void 1124 bdev_mgr_unregister_cb(void *io_device) 1125 { 1126 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 1127 1128 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 1129 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 1130 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 1131 g_bdev_opts.bdev_io_pool_size); 1132 } 1133 1134 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 1135 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 1136 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 1137 BUF_SMALL_POOL_SIZE); 1138 assert(false); 1139 } 1140 1141 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 1142 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 1143 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 1144 BUF_LARGE_POOL_SIZE); 1145 assert(false); 1146 } 1147 1148 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1149 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1150 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1151 spdk_free(g_bdev_mgr.zero_buffer); 1152 1153 cb_fn(g_fini_cb_arg); 1154 g_fini_cb_fn = NULL; 1155 g_fini_cb_arg = NULL; 1156 g_bdev_mgr.init_complete = false; 1157 g_bdev_mgr.module_init_complete = false; 1158 pthread_mutex_destroy(&g_bdev_mgr.mutex); 1159 } 1160 1161 static void 1162 bdev_module_finish_iter(void *arg) 1163 { 1164 struct spdk_bdev_module *bdev_module; 1165 1166 /* FIXME: Handling initialization failures is broken now, 1167 * so we won't even try cleaning up after successfully 1168 * initialized modules. 
If module_init_complete is false,
	 * just call bdev_mgr_unregister_cb.
	 */
	if (!g_bdev_mgr.module_init_complete) {
		bdev_mgr_unregister_cb(NULL);
		return;
	}

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, bdev_module_finish_iter, NULL);
	} else {
		bdev_module_finish_iter(NULL);
	}
}

static void
bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and moving
		 * on to the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in the reverse order.
1255 */ 1256 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1257 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1258 if (bdev->internal.claim_module != NULL) { 1259 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n", 1260 bdev->name, bdev->internal.claim_module->name); 1261 continue; 1262 } 1263 1264 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name); 1265 spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev); 1266 return; 1267 } 1268 1269 /* 1270 * If any bdev fails to unclaim underlying bdev properly, we may face the 1271 * case of bdev list consisting of claimed bdevs only (if claims are managed 1272 * correctly, this would mean there's a loop in the claims graph which is 1273 * clearly impossible). Warn and unregister last bdev on the list then. 1274 */ 1275 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1276 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1277 SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1278 spdk_bdev_unregister(bdev, bdev_finish_unregister_bdevs_iter, bdev); 1279 return; 1280 } 1281 } 1282 1283 void 1284 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1285 { 1286 struct spdk_bdev_module *m; 1287 1288 assert(cb_fn != NULL); 1289 1290 g_fini_thread = spdk_get_thread(); 1291 1292 g_fini_cb_fn = cb_fn; 1293 g_fini_cb_arg = cb_arg; 1294 1295 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1296 if (m->fini_start) { 1297 m->fini_start(); 1298 } 1299 } 1300 1301 bdev_finish_unregister_bdevs_iter(NULL, 0); 1302 } 1303 1304 struct spdk_bdev_io * 1305 bdev_channel_get_io(struct spdk_bdev_channel *channel) 1306 { 1307 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1308 struct spdk_bdev_io *bdev_io; 1309 1310 if (ch->per_thread_cache_count > 0) { 1311 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1312 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1313 ch->per_thread_cache_count--; 1314 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1315 /* 1316 * Don't try to look for bdev_ios in the global pool if there are 1317 * waiters on bdev_ios - we don't want this caller to jump the line. 1318 */ 1319 bdev_io = NULL; 1320 } else { 1321 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1322 } 1323 1324 return bdev_io; 1325 } 1326 1327 void 1328 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1329 { 1330 struct spdk_bdev_mgmt_channel *ch; 1331 1332 assert(bdev_io != NULL); 1333 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1334 1335 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1336 1337 if (bdev_io->internal.buf != NULL) { 1338 bdev_io_put_buf(bdev_io); 1339 } 1340 1341 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1342 ch->per_thread_cache_count++; 1343 STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link); 1344 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1345 struct spdk_bdev_io_wait_entry *entry; 1346 1347 entry = TAILQ_FIRST(&ch->io_wait_queue); 1348 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1349 entry->cb_fn(entry->cb_arg); 1350 } 1351 } else { 1352 /* We should never have a full cache with entries on the io wait queue. 
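		 * Whenever the wait queue is non-empty, each freed bdev_io is
		 * immediately used to service a waiter, so the cache cannot fill
		 * up while waiters remain.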
*/ 1353 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1354 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1355 } 1356 } 1357 1358 static bool 1359 bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1360 { 1361 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1362 1363 switch (limit) { 1364 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1365 return true; 1366 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1367 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1368 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1369 return false; 1370 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1371 default: 1372 return false; 1373 } 1374 } 1375 1376 static bool 1377 bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1378 { 1379 switch (bdev_io->type) { 1380 case SPDK_BDEV_IO_TYPE_NVME_IO: 1381 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1382 case SPDK_BDEV_IO_TYPE_READ: 1383 case SPDK_BDEV_IO_TYPE_WRITE: 1384 return true; 1385 default: 1386 return false; 1387 } 1388 } 1389 1390 static bool 1391 bdev_is_read_io(struct spdk_bdev_io *bdev_io) 1392 { 1393 switch (bdev_io->type) { 1394 case SPDK_BDEV_IO_TYPE_NVME_IO: 1395 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1396 /* Bit 1 (0x2) set for read operation */ 1397 if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) { 1398 return true; 1399 } else { 1400 return false; 1401 } 1402 case SPDK_BDEV_IO_TYPE_READ: 1403 return true; 1404 default: 1405 return false; 1406 } 1407 } 1408 1409 static uint64_t 1410 bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1411 { 1412 struct spdk_bdev *bdev = bdev_io->bdev; 1413 1414 switch (bdev_io->type) { 1415 case SPDK_BDEV_IO_TYPE_NVME_IO: 1416 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1417 return bdev_io->u.nvme_passthru.nbytes; 1418 case SPDK_BDEV_IO_TYPE_READ: 1419 case SPDK_BDEV_IO_TYPE_WRITE: 1420 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1421 default: 1422 return 0; 1423 } 1424 } 1425 1426 static bool 1427 bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1428 { 1429 if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) { 1430 return true; 1431 } else { 1432 return false; 1433 } 1434 } 1435 1436 static bool 1437 bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1438 { 1439 if (bdev_is_read_io(io) == false) { 1440 return false; 1441 } 1442 1443 return bdev_qos_rw_queue_io(limit, io); 1444 } 1445 1446 static bool 1447 bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1448 { 1449 if (bdev_is_read_io(io) == true) { 1450 return false; 1451 } 1452 1453 return bdev_qos_rw_queue_io(limit, io); 1454 } 1455 1456 static void 1457 bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1458 { 1459 limit->remaining_this_timeslice--; 1460 } 1461 1462 static void 1463 bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1464 { 1465 limit->remaining_this_timeslice -= bdev_get_io_size_in_byte(io); 1466 } 1467 1468 static void 1469 bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1470 { 1471 if (bdev_is_read_io(io) == false) { 1472 return; 1473 } 1474 1475 return bdev_qos_rw_bps_update_quota(limit, io); 1476 } 1477 1478 static void 1479 bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1480 { 1481 if (bdev_is_read_io(io) == true) { 1482 return; 1483 } 1484 1485 return bdev_qos_rw_bps_update_quota(limit, io); 1486 } 1487 1488 static void 1489 bdev_qos_set_ops(struct spdk_bdev_qos *qos) 
1490 { 1491 int i; 1492 1493 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1494 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1495 qos->rate_limits[i].queue_io = NULL; 1496 qos->rate_limits[i].update_quota = NULL; 1497 continue; 1498 } 1499 1500 switch (i) { 1501 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1502 qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io; 1503 qos->rate_limits[i].update_quota = bdev_qos_rw_iops_update_quota; 1504 break; 1505 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1506 qos->rate_limits[i].queue_io = bdev_qos_rw_queue_io; 1507 qos->rate_limits[i].update_quota = bdev_qos_rw_bps_update_quota; 1508 break; 1509 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1510 qos->rate_limits[i].queue_io = bdev_qos_r_queue_io; 1511 qos->rate_limits[i].update_quota = bdev_qos_r_bps_update_quota; 1512 break; 1513 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1514 qos->rate_limits[i].queue_io = bdev_qos_w_queue_io; 1515 qos->rate_limits[i].update_quota = bdev_qos_w_bps_update_quota; 1516 break; 1517 default: 1518 break; 1519 } 1520 } 1521 } 1522 1523 static inline void 1524 bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io) 1525 { 1526 struct spdk_bdev *bdev = bdev_io->bdev; 1527 struct spdk_io_channel *ch = bdev_ch->channel; 1528 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1529 1530 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1531 bdev_ch->io_outstanding++; 1532 shared_resource->io_outstanding++; 1533 bdev_io->internal.in_submit_request = true; 1534 bdev->fn_table->submit_request(ch, bdev_io); 1535 bdev_io->internal.in_submit_request = false; 1536 } else { 1537 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1538 } 1539 } 1540 1541 static int 1542 bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1543 { 1544 struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL; 1545 int i, submitted_ios = 0; 1546 1547 TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) { 1548 if (bdev_qos_io_to_limit(bdev_io) == true) { 1549 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1550 if (!qos->rate_limits[i].queue_io) { 1551 continue; 1552 } 1553 1554 if (qos->rate_limits[i].queue_io(&qos->rate_limits[i], 1555 bdev_io) == true) { 1556 return submitted_ios; 1557 } 1558 } 1559 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1560 if (!qos->rate_limits[i].update_quota) { 1561 continue; 1562 } 1563 1564 qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io); 1565 } 1566 } 1567 1568 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1569 bdev_io_do_submit(ch, bdev_io); 1570 submitted_ios++; 1571 } 1572 1573 return submitted_ios; 1574 } 1575 1576 static void 1577 bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1578 { 1579 int rc; 1580 1581 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1582 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1583 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1584 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1585 &bdev_io->internal.waitq_entry); 1586 if (rc != 0) { 1587 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1588 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1589 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1590 } 1591 } 1592 1593 static bool 1594 bdev_io_type_can_split(uint8_t type) 1595 { 1596 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1597 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1598 
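	/* Even for splittable types, an I/O is only actually split when the bdev
	 * enables split_on_optimal_io_boundary and the request crosses an
	 * optimal_io_boundary stripe; see bdev_io_should_split() and
	 * bdev_io_submit().
	 */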
1599 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1600 * UNMAP could be split, but these types of I/O are typically much larger 1601 * in size (sometimes the size of the entire block device), and the bdev 1602 * module can more efficiently split these types of I/O. Plus those types 1603 * of I/O do not have a payload, which makes the splitting process simpler. 1604 */ 1605 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1606 return true; 1607 } else { 1608 return false; 1609 } 1610 } 1611 1612 static bool 1613 bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1614 { 1615 uint64_t start_stripe, end_stripe; 1616 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1617 1618 if (io_boundary == 0) { 1619 return false; 1620 } 1621 1622 if (!bdev_io_type_can_split(bdev_io->type)) { 1623 return false; 1624 } 1625 1626 start_stripe = bdev_io->u.bdev.offset_blocks; 1627 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1628 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1629 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1630 start_stripe >>= spdk_u32log2(io_boundary); 1631 end_stripe >>= spdk_u32log2(io_boundary); 1632 } else { 1633 start_stripe /= io_boundary; 1634 end_stripe /= io_boundary; 1635 } 1636 return (start_stripe != end_stripe); 1637 } 1638 1639 static uint32_t 1640 _to_next_boundary(uint64_t offset, uint32_t boundary) 1641 { 1642 return (boundary - (offset % boundary)); 1643 } 1644 1645 static void 1646 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1647 1648 static void 1649 _bdev_io_split(void *_bdev_io) 1650 { 1651 struct spdk_bdev_io *bdev_io = _bdev_io; 1652 uint64_t current_offset, remaining; 1653 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes; 1654 struct iovec *parent_iov, *iov; 1655 uint64_t parent_iov_offset, iov_len; 1656 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1657 void *md_buf = NULL; 1658 int rc; 1659 1660 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1661 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1662 blocklen = bdev_io->bdev->blocklen; 1663 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1664 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1665 1666 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1667 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1668 if (parent_iov_offset < parent_iov->iov_len) { 1669 break; 1670 } 1671 parent_iov_offset -= parent_iov->iov_len; 1672 } 1673 1674 child_iovcnt = 0; 1675 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1676 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1677 to_next_boundary = spdk_min(remaining, to_next_boundary); 1678 to_next_boundary_bytes = to_next_boundary * blocklen; 1679 iov = &bdev_io->child_iov[child_iovcnt]; 1680 iovcnt = 0; 1681 1682 if (bdev_io->u.bdev.md_buf) { 1683 assert((parent_iov_offset % blocklen) > 0); 1684 md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) * 1685 spdk_bdev_get_md_size(bdev_io->bdev); 1686 } 1687 1688 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1689 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1690 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1691 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1692 to_next_boundary_bytes -= 
iov_len; 1693 1694 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1695 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1696 1697 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1698 parent_iov_offset += iov_len; 1699 } else { 1700 parent_iovpos++; 1701 parent_iov_offset = 0; 1702 } 1703 child_iovcnt++; 1704 iovcnt++; 1705 } 1706 1707 if (to_next_boundary_bytes > 0) { 1708 /* We had to stop this child I/O early because we ran out of 1709 * child_iov space. Ensure the iovs to be aligned with block 1710 * size and then adjust to_next_boundary before starting the 1711 * child I/O. 1712 */ 1713 assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV); 1714 to_last_block_bytes = to_next_boundary_bytes % blocklen; 1715 if (to_last_block_bytes != 0) { 1716 uint32_t child_iovpos = child_iovcnt - 1; 1717 /* don't decrease child_iovcnt so the loop will naturally end */ 1718 1719 to_last_block_bytes = blocklen - to_last_block_bytes; 1720 to_next_boundary_bytes += to_last_block_bytes; 1721 while (to_last_block_bytes > 0 && iovcnt > 0) { 1722 iov_len = spdk_min(to_last_block_bytes, 1723 bdev_io->child_iov[child_iovpos].iov_len); 1724 bdev_io->child_iov[child_iovpos].iov_len -= iov_len; 1725 if (bdev_io->child_iov[child_iovpos].iov_len == 0) { 1726 child_iovpos--; 1727 if (--iovcnt == 0) { 1728 return; 1729 } 1730 } 1731 to_last_block_bytes -= iov_len; 1732 } 1733 1734 assert(to_last_block_bytes == 0); 1735 } 1736 to_next_boundary -= to_next_boundary_bytes / blocklen; 1737 } 1738 1739 bdev_io->u.bdev.split_outstanding++; 1740 1741 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1742 rc = bdev_readv_blocks_with_md(bdev_io->internal.desc, 1743 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1744 iov, iovcnt, md_buf, current_offset, 1745 to_next_boundary, 1746 bdev_io_split_done, bdev_io); 1747 } else { 1748 rc = bdev_writev_blocks_with_md(bdev_io->internal.desc, 1749 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1750 iov, iovcnt, md_buf, current_offset, 1751 to_next_boundary, 1752 bdev_io_split_done, bdev_io); 1753 } 1754 1755 if (rc == 0) { 1756 current_offset += to_next_boundary; 1757 remaining -= to_next_boundary; 1758 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1759 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1760 } else { 1761 bdev_io->u.bdev.split_outstanding--; 1762 if (rc == -ENOMEM) { 1763 if (bdev_io->u.bdev.split_outstanding == 0) { 1764 /* No I/O is outstanding. Hence we should wait here. */ 1765 bdev_queue_io_wait_with_cb(bdev_io, _bdev_io_split); 1766 } 1767 } else { 1768 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1769 if (bdev_io->u.bdev.split_outstanding == 0) { 1770 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 1771 (uintptr_t)bdev_io, 0); 1772 TAILQ_REMOVE(&bdev_io->internal.ch->io_submitted, bdev_io, internal.ch_link); 1773 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1774 } 1775 } 1776 1777 return; 1778 } 1779 } 1780 } 1781 1782 static void 1783 bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1784 { 1785 struct spdk_bdev_io *parent_io = cb_arg; 1786 1787 spdk_bdev_free_io(bdev_io); 1788 1789 if (!success) { 1790 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1791 } 1792 parent_io->u.bdev.split_outstanding--; 1793 if (parent_io->u.bdev.split_outstanding != 0) { 1794 return; 1795 } 1796 1797 /* 1798 * Parent I/O finishes when all blocks are consumed. 
1799 */ 1800 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 1801 assert(parent_io->internal.cb != bdev_io_split_done); 1802 spdk_trace_record_tsc(spdk_get_ticks(), TRACE_BDEV_IO_DONE, 0, 0, 1803 (uintptr_t)parent_io, 0); 1804 TAILQ_REMOVE(&parent_io->internal.ch->io_submitted, parent_io, internal.ch_link); 1805 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1806 parent_io->internal.caller_ctx); 1807 return; 1808 } 1809 1810 /* 1811 * Continue with the splitting process. This function will complete the parent I/O if the 1812 * splitting is done. 1813 */ 1814 _bdev_io_split(parent_io); 1815 } 1816 1817 static void 1818 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success); 1819 1820 static void 1821 bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1822 { 1823 assert(bdev_io_type_can_split(bdev_io->type)); 1824 1825 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1826 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1827 bdev_io->u.bdev.split_outstanding = 0; 1828 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1829 1830 if (_is_buf_allocated(bdev_io->u.bdev.iovs)) { 1831 _bdev_io_split(bdev_io); 1832 } else { 1833 assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ); 1834 spdk_bdev_io_get_buf(bdev_io, bdev_io_split_get_buf_cb, 1835 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1836 } 1837 } 1838 1839 static void 1840 bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 1841 { 1842 if (!success) { 1843 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1844 return; 1845 } 1846 1847 bdev_io_split(ch, bdev_io); 1848 } 1849 1850 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't 1851 * be inlined, at least on some compilers. 
1852 */ 1853 static inline void 1854 _bdev_io_submit(void *ctx) 1855 { 1856 struct spdk_bdev_io *bdev_io = ctx; 1857 struct spdk_bdev *bdev = bdev_io->bdev; 1858 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1859 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1860 uint64_t tsc; 1861 1862 tsc = spdk_get_ticks(); 1863 bdev_io->internal.submit_tsc = tsc; 1864 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1865 1866 if (spdk_likely(bdev_ch->flags == 0)) { 1867 bdev_io_do_submit(bdev_ch, bdev_io); 1868 return; 1869 } 1870 1871 bdev_ch->io_outstanding++; 1872 shared_resource->io_outstanding++; 1873 bdev_io->internal.in_submit_request = true; 1874 if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1875 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1876 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1877 bdev_ch->io_outstanding--; 1878 shared_resource->io_outstanding--; 1879 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1880 bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 1881 } else { 1882 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1883 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1884 } 1885 bdev_io->internal.in_submit_request = false; 1886 } 1887 1888 void 1889 bdev_io_submit(struct spdk_bdev_io *bdev_io) 1890 { 1891 struct spdk_bdev *bdev = bdev_io->bdev; 1892 struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io); 1893 struct spdk_bdev_channel *ch = bdev_io->internal.ch; 1894 1895 assert(thread != NULL); 1896 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1897 1898 /* Add the bdev_io to io_submitted only if it is the original 1899 * submission from the bdev user. When a bdev_io is split, 1900 * it comes back through this code path, so we need to make sure 1901 * we don't try to add it a second time. 
1902 */ 1903 if (bdev_io->internal.cb != bdev_io_split_done) { 1904 TAILQ_INSERT_TAIL(&ch->io_submitted, bdev_io, internal.ch_link); 1905 } 1906 1907 if (bdev->split_on_optimal_io_boundary && bdev_io_should_split(bdev_io)) { 1908 bdev_io->internal.submit_tsc = spdk_get_ticks(); 1909 spdk_trace_record_tsc(bdev_io->internal.submit_tsc, TRACE_BDEV_IO_START, 0, 0, 1910 (uintptr_t)bdev_io, bdev_io->type); 1911 bdev_io_split(NULL, bdev_io); 1912 return; 1913 } 1914 1915 if (ch->flags & BDEV_CH_QOS_ENABLED) { 1916 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1917 _bdev_io_submit(bdev_io); 1918 } else { 1919 bdev_io->internal.io_submit_ch = ch; 1920 bdev_io->internal.ch = bdev->internal.qos->ch; 1921 spdk_thread_send_msg(bdev->internal.qos->thread, _bdev_io_submit, bdev_io); 1922 } 1923 } else { 1924 _bdev_io_submit(bdev_io); 1925 } 1926 } 1927 1928 static void 1929 bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1930 { 1931 struct spdk_bdev *bdev = bdev_io->bdev; 1932 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1933 struct spdk_io_channel *ch = bdev_ch->channel; 1934 1935 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1936 1937 bdev_io->internal.in_submit_request = true; 1938 bdev->fn_table->submit_request(ch, bdev_io); 1939 bdev_io->internal.in_submit_request = false; 1940 } 1941 1942 void 1943 bdev_io_init(struct spdk_bdev_io *bdev_io, 1944 struct spdk_bdev *bdev, void *cb_arg, 1945 spdk_bdev_io_completion_cb cb) 1946 { 1947 bdev_io->bdev = bdev; 1948 bdev_io->internal.caller_ctx = cb_arg; 1949 bdev_io->internal.cb = cb; 1950 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1951 bdev_io->internal.in_submit_request = false; 1952 bdev_io->internal.buf = NULL; 1953 bdev_io->internal.io_submit_ch = NULL; 1954 bdev_io->internal.orig_iovs = NULL; 1955 bdev_io->internal.orig_iovcnt = 0; 1956 bdev_io->internal.orig_md_buf = NULL; 1957 bdev_io->internal.error.nvme.cdw0 = 0; 1958 } 1959 1960 static bool 1961 bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1962 { 1963 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1964 } 1965 1966 bool 1967 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1968 { 1969 bool supported; 1970 1971 supported = bdev_io_type_supported(bdev, io_type); 1972 1973 if (!supported) { 1974 switch (io_type) { 1975 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1976 /* The bdev layer will emulate write zeroes as long as write is supported. 
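 * For example, a caller only needs plain WRITE support before relying on
 * spdk_bdev_write_zeroes_blocks() (sketch; desc and ch are an already open
 * descriptor and its I/O channel, zeroes_done is a caller-provided callback):
 *
 *   struct spdk_bdev *target = spdk_bdev_desc_get_bdev(desc);
 *
 *   if (spdk_bdev_io_type_supported(target, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
 *           // true for native support or when WRITE can be used for emulation
 *           rc = spdk_bdev_write_zeroes_blocks(desc, ch, 0, 8, zeroes_done, NULL);
 *   }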
*/ 1977 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1978 break; 1979 case SPDK_BDEV_IO_TYPE_ZCOPY: 1980 /* Zero copy can be emulated with regular read and write */ 1981 supported = bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 1982 bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1983 break; 1984 default: 1985 break; 1986 } 1987 } 1988 1989 return supported; 1990 } 1991 1992 int 1993 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1994 { 1995 if (bdev->fn_table->dump_info_json) { 1996 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1997 } 1998 1999 return 0; 2000 } 2001 2002 static void 2003 bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 2004 { 2005 uint32_t max_per_timeslice = 0; 2006 int i; 2007 2008 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2009 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2010 qos->rate_limits[i].max_per_timeslice = 0; 2011 continue; 2012 } 2013 2014 max_per_timeslice = qos->rate_limits[i].limit * 2015 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 2016 2017 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 2018 qos->rate_limits[i].min_per_timeslice); 2019 2020 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 2021 } 2022 2023 bdev_qos_set_ops(qos); 2024 } 2025 2026 static int 2027 bdev_channel_poll_qos(void *arg) 2028 { 2029 struct spdk_bdev_qos *qos = arg; 2030 uint64_t now = spdk_get_ticks(); 2031 int i; 2032 2033 if (now < (qos->last_timeslice + qos->timeslice_size)) { 2034 /* We received our callback earlier than expected - return 2035 * immediately and wait to do accounting until at least one 2036 * timeslice has actually expired. This should never happen 2037 * with a well-behaved timer implementation. 2038 */ 2039 return 0; 2040 } 2041 2042 /* Reset for next round of rate limiting */ 2043 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2044 /* We may have allowed the IOs or bytes to slightly overrun in the last 2045 * timeslice. remaining_this_timeslice is signed, so if it's negative 2046 * here, we'll account for the overrun so that the next timeslice will 2047 * be appropriately reduced. 2048 */ 2049 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2050 qos->rate_limits[i].remaining_this_timeslice = 0; 2051 } 2052 } 2053 2054 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2055 qos->last_timeslice += qos->timeslice_size; 2056 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2057 qos->rate_limits[i].remaining_this_timeslice += 2058 qos->rate_limits[i].max_per_timeslice; 2059 } 2060 } 2061 2062 return bdev_qos_io_submit(qos->ch, qos); 2063 } 2064 2065 static void 2066 bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2067 { 2068 struct spdk_bdev_shared_resource *shared_resource; 2069 2070 spdk_put_io_channel(ch->channel); 2071 2072 shared_resource = ch->shared_resource; 2073 2074 assert(TAILQ_EMPTY(&ch->io_submitted)); 2075 assert(ch->io_outstanding == 0); 2076 assert(shared_resource->ref > 0); 2077 shared_resource->ref--; 2078 if (shared_resource->ref == 0) { 2079 assert(shared_resource->io_outstanding == 0); 2080 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2081 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2082 free(shared_resource); 2083 } 2084 } 2085 2086 /* Caller must hold bdev->internal.mutex. 
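 * For example, bdev_channel_create() below calls it as:
 *
 *   pthread_mutex_lock(&bdev->internal.mutex);
 *   bdev_enable_qos(bdev, ch);
 *   pthread_mutex_unlock(&bdev->internal.mutex);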
*/ 2087 static void 2088 bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2089 { 2090 struct spdk_bdev_qos *qos = bdev->internal.qos; 2091 int i; 2092 2093 /* Rate limiting on this bdev enabled */ 2094 if (qos) { 2095 if (qos->ch == NULL) { 2096 struct spdk_io_channel *io_ch; 2097 2098 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2099 bdev->name, spdk_get_thread()); 2100 2101 /* No qos channel has been selected, so set one up */ 2102 2103 /* Take another reference to ch */ 2104 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2105 assert(io_ch != NULL); 2106 qos->ch = ch; 2107 2108 qos->thread = spdk_io_channel_get_thread(io_ch); 2109 2110 TAILQ_INIT(&qos->queued); 2111 2112 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2113 if (bdev_qos_is_iops_rate_limit(i) == true) { 2114 qos->rate_limits[i].min_per_timeslice = 2115 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2116 } else { 2117 qos->rate_limits[i].min_per_timeslice = 2118 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2119 } 2120 2121 if (qos->rate_limits[i].limit == 0) { 2122 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2123 } 2124 } 2125 bdev_qos_update_max_quota_per_timeslice(qos); 2126 qos->timeslice_size = 2127 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2128 qos->last_timeslice = spdk_get_ticks(); 2129 qos->poller = spdk_poller_register(bdev_channel_poll_qos, 2130 qos, 2131 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2132 } 2133 2134 ch->flags |= BDEV_CH_QOS_ENABLED; 2135 } 2136 } 2137 2138 struct poll_timeout_ctx { 2139 struct spdk_bdev_desc *desc; 2140 uint64_t timeout_in_sec; 2141 spdk_bdev_io_timeout_cb cb_fn; 2142 void *cb_arg; 2143 }; 2144 2145 static void 2146 bdev_desc_free(struct spdk_bdev_desc *desc) 2147 { 2148 pthread_mutex_destroy(&desc->mutex); 2149 free(desc->media_events_buffer); 2150 free(desc); 2151 } 2152 2153 static void 2154 bdev_channel_poll_timeout_io_done(struct spdk_io_channel_iter *i, int status) 2155 { 2156 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2157 struct spdk_bdev_desc *desc = ctx->desc; 2158 2159 free(ctx); 2160 2161 pthread_mutex_lock(&desc->mutex); 2162 desc->refs--; 2163 if (desc->closed == true && desc->refs == 0) { 2164 pthread_mutex_unlock(&desc->mutex); 2165 bdev_desc_free(desc); 2166 return; 2167 } 2168 pthread_mutex_unlock(&desc->mutex); 2169 } 2170 2171 static void 2172 bdev_channel_poll_timeout_io(struct spdk_io_channel_iter *i) 2173 { 2174 struct poll_timeout_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 2175 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2176 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(io_ch); 2177 struct spdk_bdev_desc *desc = ctx->desc; 2178 struct spdk_bdev_io *bdev_io; 2179 uint64_t now; 2180 2181 pthread_mutex_lock(&desc->mutex); 2182 if (desc->closed == true) { 2183 pthread_mutex_unlock(&desc->mutex); 2184 spdk_for_each_channel_continue(i, -1); 2185 return; 2186 } 2187 pthread_mutex_unlock(&desc->mutex); 2188 2189 now = spdk_get_ticks(); 2190 TAILQ_FOREACH(bdev_io, &bdev_ch->io_submitted, internal.ch_link) { 2191 /* I/O are added to this TAILQ as they are submitted. 2192 * So once we find an I/O that has not timed out, we can immediately exit the loop. 
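 * This scan is what backs spdk_bdev_set_timeout(); a typical registration,
 * with a hypothetical callback name, looks like:
 *
 *   static void
 *   io_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
 *   {
 *           SPDK_ERRLOG("bdev_io %p exceeded its timeout\n", bdev_io);
 *   }
 *
 *   spdk_bdev_set_timeout(desc, 30, io_timeout_cb, NULL);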
*/ 2193 if (now < (bdev_io->internal.submit_tsc + 2194 ctx->timeout_in_sec * spdk_get_ticks_hz())) { 2195 goto end; 2196 } 2197 2198 if (bdev_io->internal.desc == desc) { 2199 ctx->cb_fn(ctx->cb_arg, bdev_io); 2200 } 2201 } 2202 2203 end: 2204 spdk_for_each_channel_continue(i, 0); 2205 } 2206 2207 static int 2208 bdev_poll_timeout_io(void *arg) 2209 { 2210 struct spdk_bdev_desc *desc = arg; 2211 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2212 struct poll_timeout_ctx *ctx; 2213 2214 ctx = calloc(1, sizeof(struct poll_timeout_ctx)); 2215 if (!ctx) { 2216 SPDK_ERRLOG("failed to allocate memory\n"); 2217 return 1; 2218 } 2219 ctx->desc = desc; 2220 ctx->cb_arg = desc->cb_arg; 2221 ctx->cb_fn = desc->cb_fn; 2222 ctx->timeout_in_sec = desc->timeout_in_sec; 2223 2224 /* Take a ref on the descriptor in case it gets closed while we are checking 2225 * all of the channels. 2226 */ 2227 pthread_mutex_lock(&desc->mutex); 2228 desc->refs++; 2229 pthread_mutex_unlock(&desc->mutex); 2230 2231 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2232 bdev_channel_poll_timeout_io, 2233 ctx, 2234 bdev_channel_poll_timeout_io_done); 2235 2236 return 1; 2237 } 2238 2239 int 2240 spdk_bdev_set_timeout(struct spdk_bdev_desc *desc, uint64_t timeout_in_sec, 2241 spdk_bdev_io_timeout_cb cb_fn, void *cb_arg) 2242 { 2243 assert(desc->thread == spdk_get_thread()); 2244 2245 spdk_poller_unregister(&desc->io_timeout_poller); 2246 2247 if (timeout_in_sec) { 2248 assert(cb_fn != NULL); 2249 desc->io_timeout_poller = spdk_poller_register(bdev_poll_timeout_io, 2250 desc, 2251 SPDK_BDEV_IO_POLL_INTERVAL_IN_MSEC * SPDK_SEC_TO_USEC / 2252 1000); 2253 if (desc->io_timeout_poller == NULL) { 2254 SPDK_ERRLOG("can not register the desc timeout IO poller\n"); 2255 return -1; 2256 } 2257 } 2258 2259 desc->cb_fn = cb_fn; 2260 desc->cb_arg = cb_arg; 2261 desc->timeout_in_sec = timeout_in_sec; 2262 2263 return 0; 2264 } 2265 2266 static int 2267 bdev_channel_create(void *io_device, void *ctx_buf) 2268 { 2269 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2270 struct spdk_bdev_channel *ch = ctx_buf; 2271 struct spdk_io_channel *mgmt_io_ch; 2272 struct spdk_bdev_mgmt_channel *mgmt_ch; 2273 struct spdk_bdev_shared_resource *shared_resource; 2274 2275 ch->bdev = bdev; 2276 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2277 if (!ch->channel) { 2278 return -1; 2279 } 2280 2281 assert(ch->histogram == NULL); 2282 if (bdev->internal.histogram_enabled) { 2283 ch->histogram = spdk_histogram_data_alloc(); 2284 if (ch->histogram == NULL) { 2285 SPDK_ERRLOG("Could not allocate histogram\n"); 2286 } 2287 } 2288 2289 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2290 if (!mgmt_io_ch) { 2291 spdk_put_io_channel(ch->channel); 2292 return -1; 2293 } 2294 2295 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2296 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2297 if (shared_resource->shared_ch == ch->channel) { 2298 spdk_put_io_channel(mgmt_io_ch); 2299 shared_resource->ref++; 2300 break; 2301 } 2302 } 2303 2304 if (shared_resource == NULL) { 2305 shared_resource = calloc(1, sizeof(*shared_resource)); 2306 if (shared_resource == NULL) { 2307 spdk_put_io_channel(ch->channel); 2308 spdk_put_io_channel(mgmt_io_ch); 2309 return -1; 2310 } 2311 2312 shared_resource->mgmt_ch = mgmt_ch; 2313 shared_resource->io_outstanding = 0; 2314 TAILQ_INIT(&shared_resource->nomem_io); 2315 shared_resource->nomem_threshold = 0; 2316 shared_resource->shared_ch = ch->channel; 2317 shared_resource->ref = 1; 2318 
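/* Publish the newly created shared resource so that other bdev channels on this
 * thread that resolve to the same module I/O channel can find and reuse it in
 * the lookup loop above.
 */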
TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2319 } 2320 2321 memset(&ch->stat, 0, sizeof(ch->stat)); 2322 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2323 ch->io_outstanding = 0; 2324 TAILQ_INIT(&ch->queued_resets); 2325 ch->flags = 0; 2326 ch->shared_resource = shared_resource; 2327 2328 TAILQ_INIT(&ch->io_submitted); 2329 2330 #ifdef SPDK_CONFIG_VTUNE 2331 { 2332 char *name; 2333 __itt_init_ittlib(NULL, 0); 2334 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2335 if (!name) { 2336 bdev_channel_destroy_resource(ch); 2337 return -1; 2338 } 2339 ch->handle = __itt_string_handle_create(name); 2340 free(name); 2341 ch->start_tsc = spdk_get_ticks(); 2342 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2343 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 2344 } 2345 #endif 2346 2347 pthread_mutex_lock(&bdev->internal.mutex); 2348 bdev_enable_qos(bdev, ch); 2349 pthread_mutex_unlock(&bdev->internal.mutex); 2350 2351 return 0; 2352 } 2353 2354 /* 2355 * Abort I/O that are waiting on a data buffer. These types of I/O are 2356 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2357 */ 2358 static void 2359 bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2360 { 2361 bdev_io_stailq_t tmp; 2362 struct spdk_bdev_io *bdev_io; 2363 2364 STAILQ_INIT(&tmp); 2365 2366 while (!STAILQ_EMPTY(queue)) { 2367 bdev_io = STAILQ_FIRST(queue); 2368 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2369 if (bdev_io->internal.ch == ch) { 2370 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2371 } else { 2372 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2373 } 2374 } 2375 2376 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2377 } 2378 2379 /* 2380 * Abort I/O that are queued waiting for submission. These types of I/O are 2381 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2382 */ 2383 static void 2384 bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2385 { 2386 struct spdk_bdev_io *bdev_io, *tmp; 2387 2388 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2389 if (bdev_io->internal.ch == ch) { 2390 TAILQ_REMOVE(queue, bdev_io, internal.link); 2391 /* 2392 * spdk_bdev_io_complete() assumes that the completed I/O had 2393 * been submitted to the bdev module. Since in this case it 2394 * hadn't, bump io_outstanding to account for the decrement 2395 * that spdk_bdev_io_complete() will do. 2396 */ 2397 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2398 ch->io_outstanding++; 2399 ch->shared_resource->io_outstanding++; 2400 } 2401 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2402 } 2403 } 2404 } 2405 2406 static void 2407 bdev_qos_channel_destroy(void *cb_arg) 2408 { 2409 struct spdk_bdev_qos *qos = cb_arg; 2410 2411 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2412 spdk_poller_unregister(&qos->poller); 2413 2414 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2415 2416 free(qos); 2417 } 2418 2419 static int 2420 bdev_qos_destroy(struct spdk_bdev *bdev) 2421 { 2422 int i; 2423 2424 /* 2425 * Cleanly shutting down the QoS poller is tricky, because 2426 * during the asynchronous operation the user could open 2427 * a new descriptor and create a new channel, spawning 2428 * a new QoS poller. 2429 * 2430 * The strategy is to create a new QoS structure here and swap it 2431 * in. The shutdown path then continues to refer to the old one 2432 * until it completes and then releases it. 
2433 */ 2434 struct spdk_bdev_qos *new_qos, *old_qos; 2435 2436 old_qos = bdev->internal.qos; 2437 2438 new_qos = calloc(1, sizeof(*new_qos)); 2439 if (!new_qos) { 2440 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2441 return -ENOMEM; 2442 } 2443 2444 /* Copy the old QoS data into the newly allocated structure */ 2445 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2446 2447 /* Zero out the key parts of the QoS structure */ 2448 new_qos->ch = NULL; 2449 new_qos->thread = NULL; 2450 new_qos->poller = NULL; 2451 TAILQ_INIT(&new_qos->queued); 2452 /* 2453 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2454 * It will be used later for the new QoS structure. 2455 */ 2456 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2457 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2458 new_qos->rate_limits[i].min_per_timeslice = 0; 2459 new_qos->rate_limits[i].max_per_timeslice = 0; 2460 } 2461 2462 bdev->internal.qos = new_qos; 2463 2464 if (old_qos->thread == NULL) { 2465 free(old_qos); 2466 } else { 2467 spdk_thread_send_msg(old_qos->thread, bdev_qos_channel_destroy, old_qos); 2468 } 2469 2470 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2471 * been destroyed yet. The destruction path will end up waiting for the final 2472 * channel to be put before it releases resources. */ 2473 2474 return 0; 2475 } 2476 2477 static void 2478 bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2479 { 2480 total->bytes_read += add->bytes_read; 2481 total->num_read_ops += add->num_read_ops; 2482 total->bytes_written += add->bytes_written; 2483 total->num_write_ops += add->num_write_ops; 2484 total->bytes_unmapped += add->bytes_unmapped; 2485 total->num_unmap_ops += add->num_unmap_ops; 2486 total->read_latency_ticks += add->read_latency_ticks; 2487 total->write_latency_ticks += add->write_latency_ticks; 2488 total->unmap_latency_ticks += add->unmap_latency_ticks; 2489 } 2490 2491 static void 2492 bdev_channel_destroy(void *io_device, void *ctx_buf) 2493 { 2494 struct spdk_bdev_channel *ch = ctx_buf; 2495 struct spdk_bdev_mgmt_channel *mgmt_ch; 2496 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2497 2498 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2499 spdk_get_thread()); 2500 2501 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
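 * A live snapshot of a single channel is still available through
 * spdk_bdev_get_io_stat() (io_ch being the spdk_io_channel this context
 * belongs to):
 *
 *   struct spdk_bdev_io_stat stat;
 *
 *   spdk_bdev_get_io_stat(bdev, io_ch, &stat);
 *
 * while spdk_bdev_get_device_stat(), later in this file, folds these retired
 * counters back in together with every channel that still exists.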
*/ 2502 pthread_mutex_lock(&ch->bdev->internal.mutex); 2503 bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2504 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2505 2506 mgmt_ch = shared_resource->mgmt_ch; 2507 2508 bdev_abort_queued_io(&ch->queued_resets, ch); 2509 bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2510 bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2511 bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2512 2513 if (ch->histogram) { 2514 spdk_histogram_data_free(ch->histogram); 2515 } 2516 2517 bdev_channel_destroy_resource(ch); 2518 } 2519 2520 int 2521 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2522 { 2523 struct spdk_bdev_alias *tmp; 2524 2525 if (alias == NULL) { 2526 SPDK_ERRLOG("Empty alias passed\n"); 2527 return -EINVAL; 2528 } 2529 2530 if (spdk_bdev_get_by_name(alias)) { 2531 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2532 return -EEXIST; 2533 } 2534 2535 tmp = calloc(1, sizeof(*tmp)); 2536 if (tmp == NULL) { 2537 SPDK_ERRLOG("Unable to allocate alias\n"); 2538 return -ENOMEM; 2539 } 2540 2541 tmp->alias = strdup(alias); 2542 if (tmp->alias == NULL) { 2543 free(tmp); 2544 SPDK_ERRLOG("Unable to allocate alias\n"); 2545 return -ENOMEM; 2546 } 2547 2548 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2549 2550 return 0; 2551 } 2552 2553 int 2554 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2555 { 2556 struct spdk_bdev_alias *tmp; 2557 2558 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2559 if (strcmp(alias, tmp->alias) == 0) { 2560 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2561 free(tmp->alias); 2562 free(tmp); 2563 return 0; 2564 } 2565 } 2566 2567 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2568 2569 return -ENOENT; 2570 } 2571 2572 void 2573 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2574 { 2575 struct spdk_bdev_alias *p, *tmp; 2576 2577 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2578 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2579 free(p->alias); 2580 free(p); 2581 } 2582 } 2583 2584 struct spdk_io_channel * 2585 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2586 { 2587 return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc))); 2588 } 2589 2590 const char * 2591 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2592 { 2593 return bdev->name; 2594 } 2595 2596 const char * 2597 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2598 { 2599 return bdev->product_name; 2600 } 2601 2602 const struct spdk_bdev_aliases_list * 2603 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2604 { 2605 return &bdev->aliases; 2606 } 2607 2608 uint32_t 2609 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2610 { 2611 return bdev->blocklen; 2612 } 2613 2614 uint32_t 2615 spdk_bdev_get_write_unit_size(const struct spdk_bdev *bdev) 2616 { 2617 return bdev->write_unit_size; 2618 } 2619 2620 uint64_t 2621 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2622 { 2623 return bdev->blockcnt; 2624 } 2625 2626 const char * 2627 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2628 { 2629 return qos_rpc_type[type]; 2630 } 2631 2632 void 2633 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2634 { 2635 int i; 2636 2637 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2638 2639 pthread_mutex_lock(&bdev->internal.mutex); 2640 if (bdev->internal.qos) { 2641 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2642 if (bdev->internal.qos->rate_limits[i].limit != 2643 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2644 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2645 if (bdev_qos_is_iops_rate_limit(i) == false) { 2646 /* Change from Byte to Megabyte which is user visible. */ 2647 limits[i] = limits[i] / 1024 / 1024; 2648 } 2649 } 2650 } 2651 } 2652 pthread_mutex_unlock(&bdev->internal.mutex); 2653 } 2654 2655 size_t 2656 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2657 { 2658 return 1 << bdev->required_alignment; 2659 } 2660 2661 uint32_t 2662 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2663 { 2664 return bdev->optimal_io_boundary; 2665 } 2666 2667 bool 2668 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2669 { 2670 return bdev->write_cache; 2671 } 2672 2673 const struct spdk_uuid * 2674 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2675 { 2676 return &bdev->uuid; 2677 } 2678 2679 uint32_t 2680 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 2681 { 2682 return bdev->md_len; 2683 } 2684 2685 bool 2686 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 2687 { 2688 return (bdev->md_len != 0) && bdev->md_interleave; 2689 } 2690 2691 bool 2692 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 2693 { 2694 return (bdev->md_len != 0) && !bdev->md_interleave; 2695 } 2696 2697 bool 2698 spdk_bdev_is_zoned(const struct spdk_bdev *bdev) 2699 { 2700 return bdev->zoned; 2701 } 2702 2703 uint32_t 2704 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 2705 { 2706 if (spdk_bdev_is_md_interleaved(bdev)) { 2707 return bdev->blocklen - bdev->md_len; 2708 } else { 2709 return bdev->blocklen; 2710 } 2711 } 2712 2713 static uint32_t 2714 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 2715 { 2716 if (!spdk_bdev_is_md_interleaved(bdev)) { 2717 return bdev->blocklen + bdev->md_len; 2718 } else { 2719 return bdev->blocklen; 2720 } 2721 } 2722 2723 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 2724 { 2725 if (bdev->md_len != 0) { 2726 return bdev->dif_type; 2727 } else { 2728 return SPDK_DIF_DISABLE; 2729 } 2730 } 2731 2732 bool 2733 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 2734 { 2735 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 2736 return bdev->dif_is_head_of_md; 2737 } else { 2738 return false; 2739 } 2740 } 2741 2742 bool 2743 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 2744 enum spdk_dif_check_type check_type) 2745 { 2746 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 2747 return false; 2748 } 2749 2750 switch (check_type) { 2751 case SPDK_DIF_CHECK_TYPE_REFTAG: 2752 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 2753 case SPDK_DIF_CHECK_TYPE_APPTAG: 2754 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 2755 case SPDK_DIF_CHECK_TYPE_GUARD: 2756 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 2757 default: 2758 return false; 2759 } 2760 } 2761 2762 uint64_t 2763 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2764 { 2765 return bdev->internal.measured_queue_depth; 2766 } 2767 2768 uint64_t 2769 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2770 { 2771 return bdev->internal.period; 2772 } 2773 2774 uint64_t 2775 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2776 { 2777 return bdev->internal.weighted_io_time; 2778 } 2779 2780 uint64_t 2781 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2782 { 2783 return bdev->internal.io_time; 2784 } 2785 2786 static void 2787 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 
2788 { 2789 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2790 2791 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2792 2793 if (bdev->internal.measured_queue_depth) { 2794 bdev->internal.io_time += bdev->internal.period; 2795 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2796 } 2797 } 2798 2799 static void 2800 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2801 { 2802 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2803 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2804 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2805 2806 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2807 spdk_for_each_channel_continue(i, 0); 2808 } 2809 2810 static int 2811 bdev_calculate_measured_queue_depth(void *ctx) 2812 { 2813 struct spdk_bdev *bdev = ctx; 2814 bdev->internal.temporary_queue_depth = 0; 2815 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2816 _calculate_measured_qd_cpl); 2817 return 0; 2818 } 2819 2820 void 2821 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2822 { 2823 bdev->internal.period = period; 2824 2825 if (bdev->internal.qd_poller != NULL) { 2826 spdk_poller_unregister(&bdev->internal.qd_poller); 2827 bdev->internal.measured_queue_depth = UINT64_MAX; 2828 } 2829 2830 if (period != 0) { 2831 bdev->internal.qd_poller = spdk_poller_register(bdev_calculate_measured_queue_depth, bdev, 2832 period); 2833 } 2834 } 2835 2836 static void 2837 _resize_notify(void *arg) 2838 { 2839 struct spdk_bdev_desc *desc = arg; 2840 2841 pthread_mutex_lock(&desc->mutex); 2842 desc->refs--; 2843 if (!desc->closed) { 2844 pthread_mutex_unlock(&desc->mutex); 2845 desc->callback.event_fn(SPDK_BDEV_EVENT_RESIZE, 2846 desc->bdev, 2847 desc->callback.ctx); 2848 return; 2849 } else if (0 == desc->refs) { 2850 /* This descriptor was closed after this resize_notify message was sent. 2851 * spdk_bdev_close() could not free the descriptor since this message was 2852 * in flight, so we free it now using bdev_desc_free(). 2853 */ 2854 pthread_mutex_unlock(&desc->mutex); 2855 bdev_desc_free(desc); 2856 return; 2857 } 2858 pthread_mutex_unlock(&desc->mutex); 2859 } 2860 2861 int 2862 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2863 { 2864 struct spdk_bdev_desc *desc; 2865 int ret; 2866 2867 pthread_mutex_lock(&bdev->internal.mutex); 2868 2869 /* bdev has open descriptors */ 2870 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2871 bdev->blockcnt > size) { 2872 ret = -EBUSY; 2873 } else { 2874 bdev->blockcnt = size; 2875 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 2876 pthread_mutex_lock(&desc->mutex); 2877 if (desc->callback.open_with_ext && !desc->closed) { 2878 desc->refs++; 2879 spdk_thread_send_msg(desc->thread, _resize_notify, desc); 2880 } 2881 pthread_mutex_unlock(&desc->mutex); 2882 } 2883 ret = 0; 2884 } 2885 2886 pthread_mutex_unlock(&bdev->internal.mutex); 2887 2888 return ret; 2889 } 2890 2891 /* 2892 * Convert I/O offset and length from bytes to blocks. 2893 * 2894 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 2895 */ 2896 static uint64_t 2897 bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2898 uint64_t num_bytes, uint64_t *num_blocks) 2899 { 2900 uint32_t block_size = bdev->blocklen; 2901 uint8_t shift_cnt; 2902 2903 /* Avoid expensive div operations if possible. 
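For example, with a 512-byte block size spdk_u32log2() returns 9, so a 1 MiB offset becomes offset_blocks = 0x100000 >> 9 = 2048, and the value returned below is non-zero only when offset_bytes or num_bytes is not a multiple of the block size.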
These spdk_u32 functions are very cheap. */ 2904 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 2905 shift_cnt = spdk_u32log2(block_size); 2906 *offset_blocks = offset_bytes >> shift_cnt; 2907 *num_blocks = num_bytes >> shift_cnt; 2908 return (offset_bytes - (*offset_blocks << shift_cnt)) | 2909 (num_bytes - (*num_blocks << shift_cnt)); 2910 } else { 2911 *offset_blocks = offset_bytes / block_size; 2912 *num_blocks = num_bytes / block_size; 2913 return (offset_bytes % block_size) | (num_bytes % block_size); 2914 } 2915 } 2916 2917 static bool 2918 bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2919 { 2920 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2921 * has been an overflow and hence the offset has been wrapped around */ 2922 if (offset_blocks + num_blocks < offset_blocks) { 2923 return false; 2924 } 2925 2926 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2927 if (offset_blocks + num_blocks > bdev->blockcnt) { 2928 return false; 2929 } 2930 2931 return true; 2932 } 2933 2934 static bool 2935 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 2936 { 2937 return _is_buf_allocated(iovs) == (md_buf != NULL); 2938 } 2939 2940 static int 2941 bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 2942 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 2943 spdk_bdev_io_completion_cb cb, void *cb_arg) 2944 { 2945 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2946 struct spdk_bdev_io *bdev_io; 2947 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2948 2949 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2950 return -EINVAL; 2951 } 2952 2953 bdev_io = bdev_channel_get_io(channel); 2954 if (!bdev_io) { 2955 return -ENOMEM; 2956 } 2957 2958 bdev_io->internal.ch = channel; 2959 bdev_io->internal.desc = desc; 2960 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2961 bdev_io->u.bdev.iovs = &bdev_io->iov; 2962 bdev_io->u.bdev.iovs[0].iov_base = buf; 2963 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2964 bdev_io->u.bdev.iovcnt = 1; 2965 bdev_io->u.bdev.md_buf = md_buf; 2966 bdev_io->u.bdev.num_blocks = num_blocks; 2967 bdev_io->u.bdev.offset_blocks = offset_blocks; 2968 bdev_io_init(bdev_io, bdev, cb_arg, cb); 2969 2970 bdev_io_submit(bdev_io); 2971 return 0; 2972 } 2973 2974 int 2975 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2976 void *buf, uint64_t offset, uint64_t nbytes, 2977 spdk_bdev_io_completion_cb cb, void *cb_arg) 2978 { 2979 uint64_t offset_blocks, num_blocks; 2980 2981 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2982 nbytes, &num_blocks) != 0) { 2983 return -EINVAL; 2984 } 2985 2986 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2987 } 2988 2989 int 2990 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2991 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2992 spdk_bdev_io_completion_cb cb, void *cb_arg) 2993 { 2994 return bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, cb, cb_arg); 2995 } 2996 2997 int 2998 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2999 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 3000 spdk_bdev_io_completion_cb cb, void *cb_arg) 3001 { 3002 struct iovec iov = { 3003 .iov_base = buf, 3004 }; 3005 3006 if 
(!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3007 return -EINVAL; 3008 } 3009 3010 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3011 return -EINVAL; 3012 } 3013 3014 return bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3015 cb, cb_arg); 3016 } 3017 3018 int 3019 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3020 struct iovec *iov, int iovcnt, 3021 uint64_t offset, uint64_t nbytes, 3022 spdk_bdev_io_completion_cb cb, void *cb_arg) 3023 { 3024 uint64_t offset_blocks, num_blocks; 3025 3026 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3027 nbytes, &num_blocks) != 0) { 3028 return -EINVAL; 3029 } 3030 3031 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3032 } 3033 3034 static int 3035 bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3036 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 3037 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 3038 { 3039 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3040 struct spdk_bdev_io *bdev_io; 3041 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3042 3043 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3044 return -EINVAL; 3045 } 3046 3047 bdev_io = bdev_channel_get_io(channel); 3048 if (!bdev_io) { 3049 return -ENOMEM; 3050 } 3051 3052 bdev_io->internal.ch = channel; 3053 bdev_io->internal.desc = desc; 3054 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3055 bdev_io->u.bdev.iovs = iov; 3056 bdev_io->u.bdev.iovcnt = iovcnt; 3057 bdev_io->u.bdev.md_buf = md_buf; 3058 bdev_io->u.bdev.num_blocks = num_blocks; 3059 bdev_io->u.bdev.offset_blocks = offset_blocks; 3060 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3061 3062 bdev_io_submit(bdev_io); 3063 return 0; 3064 } 3065 3066 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3067 struct iovec *iov, int iovcnt, 3068 uint64_t offset_blocks, uint64_t num_blocks, 3069 spdk_bdev_io_completion_cb cb, void *cb_arg) 3070 { 3071 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3072 num_blocks, cb, cb_arg); 3073 } 3074 3075 int 3076 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3077 struct iovec *iov, int iovcnt, void *md_buf, 3078 uint64_t offset_blocks, uint64_t num_blocks, 3079 spdk_bdev_io_completion_cb cb, void *cb_arg) 3080 { 3081 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3082 return -EINVAL; 3083 } 3084 3085 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3086 return -EINVAL; 3087 } 3088 3089 return bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3090 num_blocks, cb, cb_arg); 3091 } 3092 3093 static int 3094 bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3095 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3096 spdk_bdev_io_completion_cb cb, void *cb_arg) 3097 { 3098 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3099 struct spdk_bdev_io *bdev_io; 3100 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3101 3102 if (!desc->write) { 3103 return -EBADF; 3104 } 3105 3106 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3107 return -EINVAL; 3108 } 3109 3110 bdev_io = bdev_channel_get_io(channel); 3111 if (!bdev_io) { 3112 return -ENOMEM; 3113 } 3114 3115 bdev_io->internal.ch = channel; 3116 bdev_io->internal.desc = desc; 3117 
bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3118 bdev_io->u.bdev.iovs = &bdev_io->iov; 3119 bdev_io->u.bdev.iovs[0].iov_base = buf; 3120 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 3121 bdev_io->u.bdev.iovcnt = 1; 3122 bdev_io->u.bdev.md_buf = md_buf; 3123 bdev_io->u.bdev.num_blocks = num_blocks; 3124 bdev_io->u.bdev.offset_blocks = offset_blocks; 3125 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3126 3127 bdev_io_submit(bdev_io); 3128 return 0; 3129 } 3130 3131 int 3132 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3133 void *buf, uint64_t offset, uint64_t nbytes, 3134 spdk_bdev_io_completion_cb cb, void *cb_arg) 3135 { 3136 uint64_t offset_blocks, num_blocks; 3137 3138 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3139 nbytes, &num_blocks) != 0) { 3140 return -EINVAL; 3141 } 3142 3143 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 3144 } 3145 3146 int 3147 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3148 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 3149 spdk_bdev_io_completion_cb cb, void *cb_arg) 3150 { 3151 return bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 3152 cb, cb_arg); 3153 } 3154 3155 int 3156 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3157 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 3158 spdk_bdev_io_completion_cb cb, void *cb_arg) 3159 { 3160 struct iovec iov = { 3161 .iov_base = buf, 3162 }; 3163 3164 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3165 return -EINVAL; 3166 } 3167 3168 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 3169 return -EINVAL; 3170 } 3171 3172 return bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 3173 cb, cb_arg); 3174 } 3175 3176 static int 3177 bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3178 struct iovec *iov, int iovcnt, void *md_buf, 3179 uint64_t offset_blocks, uint64_t num_blocks, 3180 spdk_bdev_io_completion_cb cb, void *cb_arg) 3181 { 3182 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3183 struct spdk_bdev_io *bdev_io; 3184 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3185 3186 if (!desc->write) { 3187 return -EBADF; 3188 } 3189 3190 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3191 return -EINVAL; 3192 } 3193 3194 bdev_io = bdev_channel_get_io(channel); 3195 if (!bdev_io) { 3196 return -ENOMEM; 3197 } 3198 3199 bdev_io->internal.ch = channel; 3200 bdev_io->internal.desc = desc; 3201 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3202 bdev_io->u.bdev.iovs = iov; 3203 bdev_io->u.bdev.iovcnt = iovcnt; 3204 bdev_io->u.bdev.md_buf = md_buf; 3205 bdev_io->u.bdev.num_blocks = num_blocks; 3206 bdev_io->u.bdev.offset_blocks = offset_blocks; 3207 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3208 3209 bdev_io_submit(bdev_io); 3210 return 0; 3211 } 3212 3213 int 3214 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3215 struct iovec *iov, int iovcnt, 3216 uint64_t offset, uint64_t len, 3217 spdk_bdev_io_completion_cb cb, void *cb_arg) 3218 { 3219 uint64_t offset_blocks, num_blocks; 3220 3221 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3222 len, &num_blocks) != 0) { 3223 return -EINVAL; 3224 } 3225 3226 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3227 } 3228 3229 int 3230 
spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3231 struct iovec *iov, int iovcnt, 3232 uint64_t offset_blocks, uint64_t num_blocks, 3233 spdk_bdev_io_completion_cb cb, void *cb_arg) 3234 { 3235 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3236 num_blocks, cb, cb_arg); 3237 } 3238 3239 int 3240 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3241 struct iovec *iov, int iovcnt, void *md_buf, 3242 uint64_t offset_blocks, uint64_t num_blocks, 3243 spdk_bdev_io_completion_cb cb, void *cb_arg) 3244 { 3245 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3246 return -EINVAL; 3247 } 3248 3249 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3250 return -EINVAL; 3251 } 3252 3253 return bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3254 num_blocks, cb, cb_arg); 3255 } 3256 3257 static void 3258 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 3259 { 3260 if (!success) { 3261 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3262 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 3263 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3264 return; 3265 } 3266 3267 if (bdev_io->u.bdev.zcopy.populate) { 3268 /* Read the real data into the buffer */ 3269 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3270 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3271 bdev_io_submit(bdev_io); 3272 return; 3273 } 3274 3275 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3276 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3277 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3278 } 3279 3280 int 3281 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3282 uint64_t offset_blocks, uint64_t num_blocks, 3283 bool populate, 3284 spdk_bdev_io_completion_cb cb, void *cb_arg) 3285 { 3286 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3287 struct spdk_bdev_io *bdev_io; 3288 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3289 3290 if (!desc->write) { 3291 return -EBADF; 3292 } 3293 3294 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3295 return -EINVAL; 3296 } 3297 3298 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3299 return -ENOTSUP; 3300 } 3301 3302 bdev_io = bdev_channel_get_io(channel); 3303 if (!bdev_io) { 3304 return -ENOMEM; 3305 } 3306 3307 bdev_io->internal.ch = channel; 3308 bdev_io->internal.desc = desc; 3309 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3310 bdev_io->u.bdev.num_blocks = num_blocks; 3311 bdev_io->u.bdev.offset_blocks = offset_blocks; 3312 bdev_io->u.bdev.iovs = NULL; 3313 bdev_io->u.bdev.iovcnt = 0; 3314 bdev_io->u.bdev.md_buf = NULL; 3315 bdev_io->u.bdev.zcopy.populate = populate ? 
1 : 0; 3316 bdev_io->u.bdev.zcopy.commit = 0; 3317 bdev_io->u.bdev.zcopy.start = 1; 3318 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3319 3320 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3321 bdev_io_submit(bdev_io); 3322 } else { 3323 /* Emulate zcopy by allocating a buffer */ 3324 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 3325 bdev_io->u.bdev.num_blocks * bdev->blocklen); 3326 } 3327 3328 return 0; 3329 } 3330 3331 int 3332 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 3333 spdk_bdev_io_completion_cb cb, void *cb_arg) 3334 { 3335 struct spdk_bdev *bdev = bdev_io->bdev; 3336 3337 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 3338 /* This can happen if the zcopy was emulated in start */ 3339 if (bdev_io->u.bdev.zcopy.start != 1) { 3340 return -EINVAL; 3341 } 3342 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3343 } 3344 3345 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 3346 return -EINVAL; 3347 } 3348 3349 bdev_io->u.bdev.zcopy.commit = commit ? 1 : 0; 3350 bdev_io->u.bdev.zcopy.start = 0; 3351 bdev_io->internal.caller_ctx = cb_arg; 3352 bdev_io->internal.cb = cb; 3353 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3354 3355 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3356 bdev_io_submit(bdev_io); 3357 return 0; 3358 } 3359 3360 if (!bdev_io->u.bdev.zcopy.commit) { 3361 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3362 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3363 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 3364 return 0; 3365 } 3366 3367 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3368 bdev_io_submit(bdev_io); 3369 3370 return 0; 3371 } 3372 3373 int 3374 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3375 uint64_t offset, uint64_t len, 3376 spdk_bdev_io_completion_cb cb, void *cb_arg) 3377 { 3378 uint64_t offset_blocks, num_blocks; 3379 3380 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3381 len, &num_blocks) != 0) { 3382 return -EINVAL; 3383 } 3384 3385 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3386 } 3387 3388 int 3389 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3390 uint64_t offset_blocks, uint64_t num_blocks, 3391 spdk_bdev_io_completion_cb cb, void *cb_arg) 3392 { 3393 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3394 struct spdk_bdev_io *bdev_io; 3395 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3396 3397 if (!desc->write) { 3398 return -EBADF; 3399 } 3400 3401 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3402 return -EINVAL; 3403 } 3404 3405 if (!bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 3406 !bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 3407 return -ENOTSUP; 3408 } 3409 3410 bdev_io = bdev_channel_get_io(channel); 3411 3412 if (!bdev_io) { 3413 return -ENOMEM; 3414 } 3415 3416 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 3417 bdev_io->internal.ch = channel; 3418 bdev_io->internal.desc = desc; 3419 bdev_io->u.bdev.offset_blocks = offset_blocks; 3420 bdev_io->u.bdev.num_blocks = num_blocks; 3421 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3422 3423 if (bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 3424 bdev_io_submit(bdev_io); 3425 return 0; 3426 } 3427 3428 assert(bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 3429 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 3430 
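/* No native WRITE_ZEROES support, so emulate it with regular writes of zeroed
 * data.  The split_* fields set below are reused by bdev_write_zero_buffer_next()
 * to track how much of the requested range is still left to cover.
 */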
bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 3431 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 3432 bdev_write_zero_buffer_next(bdev_io); 3433 3434 return 0; 3435 } 3436 3437 int 3438 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3439 uint64_t offset, uint64_t nbytes, 3440 spdk_bdev_io_completion_cb cb, void *cb_arg) 3441 { 3442 uint64_t offset_blocks, num_blocks; 3443 3444 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3445 nbytes, &num_blocks) != 0) { 3446 return -EINVAL; 3447 } 3448 3449 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3450 } 3451 3452 int 3453 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3454 uint64_t offset_blocks, uint64_t num_blocks, 3455 spdk_bdev_io_completion_cb cb, void *cb_arg) 3456 { 3457 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3458 struct spdk_bdev_io *bdev_io; 3459 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3460 3461 if (!desc->write) { 3462 return -EBADF; 3463 } 3464 3465 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3466 return -EINVAL; 3467 } 3468 3469 if (num_blocks == 0) { 3470 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 3471 return -EINVAL; 3472 } 3473 3474 bdev_io = bdev_channel_get_io(channel); 3475 if (!bdev_io) { 3476 return -ENOMEM; 3477 } 3478 3479 bdev_io->internal.ch = channel; 3480 bdev_io->internal.desc = desc; 3481 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 3482 3483 bdev_io->u.bdev.iovs = &bdev_io->iov; 3484 bdev_io->u.bdev.iovs[0].iov_base = NULL; 3485 bdev_io->u.bdev.iovs[0].iov_len = 0; 3486 bdev_io->u.bdev.iovcnt = 1; 3487 3488 bdev_io->u.bdev.offset_blocks = offset_blocks; 3489 bdev_io->u.bdev.num_blocks = num_blocks; 3490 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3491 3492 bdev_io_submit(bdev_io); 3493 return 0; 3494 } 3495 3496 int 3497 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3498 uint64_t offset, uint64_t length, 3499 spdk_bdev_io_completion_cb cb, void *cb_arg) 3500 { 3501 uint64_t offset_blocks, num_blocks; 3502 3503 if (bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3504 length, &num_blocks) != 0) { 3505 return -EINVAL; 3506 } 3507 3508 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3509 } 3510 3511 int 3512 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3513 uint64_t offset_blocks, uint64_t num_blocks, 3514 spdk_bdev_io_completion_cb cb, void *cb_arg) 3515 { 3516 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3517 struct spdk_bdev_io *bdev_io; 3518 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3519 3520 if (!desc->write) { 3521 return -EBADF; 3522 } 3523 3524 if (!bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3525 return -EINVAL; 3526 } 3527 3528 bdev_io = bdev_channel_get_io(channel); 3529 if (!bdev_io) { 3530 return -ENOMEM; 3531 } 3532 3533 bdev_io->internal.ch = channel; 3534 bdev_io->internal.desc = desc; 3535 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 3536 bdev_io->u.bdev.iovs = NULL; 3537 bdev_io->u.bdev.iovcnt = 0; 3538 bdev_io->u.bdev.offset_blocks = offset_blocks; 3539 bdev_io->u.bdev.num_blocks = num_blocks; 3540 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3541 3542 bdev_io_submit(bdev_io); 3543 return 0; 3544 } 3545 3546 static void 3547 bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 3548 { 3549 struct spdk_bdev_channel *ch = 
spdk_io_channel_iter_get_ctx(i); 3550 struct spdk_bdev_io *bdev_io; 3551 3552 bdev_io = TAILQ_FIRST(&ch->queued_resets); 3553 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 3554 bdev_io_submit_reset(bdev_io); 3555 } 3556 3557 static void 3558 bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 3559 { 3560 struct spdk_io_channel *ch; 3561 struct spdk_bdev_channel *channel; 3562 struct spdk_bdev_mgmt_channel *mgmt_channel; 3563 struct spdk_bdev_shared_resource *shared_resource; 3564 bdev_io_tailq_t tmp_queued; 3565 3566 TAILQ_INIT(&tmp_queued); 3567 3568 ch = spdk_io_channel_iter_get_channel(i); 3569 channel = spdk_io_channel_get_ctx(ch); 3570 shared_resource = channel->shared_resource; 3571 mgmt_channel = shared_resource->mgmt_ch; 3572 3573 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 3574 3575 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 3576 /* The QoS object is always valid and readable while 3577 * the channel flag is set, so the lock here should not 3578 * be necessary. We're not in the fast path though, so 3579 * just take it anyway. */ 3580 pthread_mutex_lock(&channel->bdev->internal.mutex); 3581 if (channel->bdev->internal.qos->ch == channel) { 3582 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 3583 } 3584 pthread_mutex_unlock(&channel->bdev->internal.mutex); 3585 } 3586 3587 bdev_abort_queued_io(&shared_resource->nomem_io, channel); 3588 bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 3589 bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 3590 bdev_abort_queued_io(&tmp_queued, channel); 3591 3592 spdk_for_each_channel_continue(i, 0); 3593 } 3594 3595 static void 3596 bdev_start_reset(void *ctx) 3597 { 3598 struct spdk_bdev_channel *ch = ctx; 3599 3600 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), bdev_reset_freeze_channel, 3601 ch, bdev_reset_dev); 3602 } 3603 3604 static void 3605 bdev_channel_start_reset(struct spdk_bdev_channel *ch) 3606 { 3607 struct spdk_bdev *bdev = ch->bdev; 3608 3609 assert(!TAILQ_EMPTY(&ch->queued_resets)); 3610 3611 pthread_mutex_lock(&bdev->internal.mutex); 3612 if (bdev->internal.reset_in_progress == NULL) { 3613 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 3614 /* 3615 * Take a channel reference for the target bdev for the life of this 3616 * reset. This guards against the channel getting destroyed while 3617 * spdk_for_each_channel() calls related to this reset IO are in 3618 * progress. We will release the reference when this reset is 3619 * completed. 
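 *
 * From the user's point of view the whole sequence starts with a single call
 * (sketch; reset_done is a caller-provided spdk_bdev_io_completion_cb):
 *
 *   rc = spdk_bdev_reset(desc, ch, reset_done, NULL);
 *
 * Each channel of the bdev is then frozen and its queued I/O aborted before
 * the reset itself is submitted to the module.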
3620 */ 3621 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 3622 bdev_start_reset(ch); 3623 } 3624 pthread_mutex_unlock(&bdev->internal.mutex); 3625 } 3626 3627 int 3628 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3629 spdk_bdev_io_completion_cb cb, void *cb_arg) 3630 { 3631 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3632 struct spdk_bdev_io *bdev_io; 3633 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3634 3635 bdev_io = bdev_channel_get_io(channel); 3636 if (!bdev_io) { 3637 return -ENOMEM; 3638 } 3639 3640 bdev_io->internal.ch = channel; 3641 bdev_io->internal.desc = desc; 3642 bdev_io->internal.submit_tsc = spdk_get_ticks(); 3643 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 3644 bdev_io->u.reset.ch_ref = NULL; 3645 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3646 3647 pthread_mutex_lock(&bdev->internal.mutex); 3648 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 3649 pthread_mutex_unlock(&bdev->internal.mutex); 3650 3651 TAILQ_INSERT_TAIL(&bdev_io->internal.ch->io_submitted, bdev_io, 3652 internal.ch_link); 3653 3654 bdev_channel_start_reset(channel); 3655 3656 return 0; 3657 } 3658 3659 void 3660 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3661 struct spdk_bdev_io_stat *stat) 3662 { 3663 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3664 3665 *stat = channel->stat; 3666 } 3667 3668 static void 3669 bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 3670 { 3671 void *io_device = spdk_io_channel_iter_get_io_device(i); 3672 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3673 3674 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 3675 bdev_iostat_ctx->cb_arg, 0); 3676 free(bdev_iostat_ctx); 3677 } 3678 3679 static void 3680 bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 3681 { 3682 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3683 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3684 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3685 3686 bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 3687 spdk_for_each_channel_continue(i, 0); 3688 } 3689 3690 void 3691 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 3692 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 3693 { 3694 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 3695 3696 assert(bdev != NULL); 3697 assert(stat != NULL); 3698 assert(cb != NULL); 3699 3700 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 3701 if (bdev_iostat_ctx == NULL) { 3702 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 3703 cb(bdev, stat, cb_arg, -ENOMEM); 3704 return; 3705 } 3706 3707 bdev_iostat_ctx->stat = stat; 3708 bdev_iostat_ctx->cb = cb; 3709 bdev_iostat_ctx->cb_arg = cb_arg; 3710 3711 /* Start with the statistics from previously deleted channels. */ 3712 pthread_mutex_lock(&bdev->internal.mutex); 3713 bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 3714 pthread_mutex_unlock(&bdev->internal.mutex); 3715 3716 /* Then iterate and add the statistics from each existing channel. 
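 * Once the iteration completes, the caller's callback receives the combined
 * totals, e.g. (hypothetical callback; assumes the caller heap-allocated stat):
 *
 *   static void
 *   stat_done_cb(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
 *                void *cb_arg, int rc)
 *   {
 *           if (rc == 0) {
 *                   printf("%s: %ju read ops, %ju write ops\n",
 *                          spdk_bdev_get_name(bdev),
 *                          (uintmax_t)stat->num_read_ops,
 *                          (uintmax_t)stat->num_write_ops);
 *           }
 *           free(stat);
 *   }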
*/ 3717 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3718 bdev_get_each_channel_stat, 3719 bdev_iostat_ctx, 3720 bdev_get_device_stat_done); 3721 } 3722 3723 int 3724 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3725 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3726 spdk_bdev_io_completion_cb cb, void *cb_arg) 3727 { 3728 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3729 struct spdk_bdev_io *bdev_io; 3730 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3731 3732 if (!desc->write) { 3733 return -EBADF; 3734 } 3735 3736 bdev_io = bdev_channel_get_io(channel); 3737 if (!bdev_io) { 3738 return -ENOMEM; 3739 } 3740 3741 bdev_io->internal.ch = channel; 3742 bdev_io->internal.desc = desc; 3743 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 3744 bdev_io->u.nvme_passthru.cmd = *cmd; 3745 bdev_io->u.nvme_passthru.buf = buf; 3746 bdev_io->u.nvme_passthru.nbytes = nbytes; 3747 bdev_io->u.nvme_passthru.md_buf = NULL; 3748 bdev_io->u.nvme_passthru.md_len = 0; 3749 3750 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3751 3752 bdev_io_submit(bdev_io); 3753 return 0; 3754 } 3755 3756 int 3757 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3758 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3759 spdk_bdev_io_completion_cb cb, void *cb_arg) 3760 { 3761 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3762 struct spdk_bdev_io *bdev_io; 3763 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3764 3765 if (!desc->write) { 3766 /* 3767 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3768 * to easily determine if the command is a read or write, but for now just 3769 * do not allow io_passthru with a read-only descriptor. 3770 */ 3771 return -EBADF; 3772 } 3773 3774 bdev_io = bdev_channel_get_io(channel); 3775 if (!bdev_io) { 3776 return -ENOMEM; 3777 } 3778 3779 bdev_io->internal.ch = channel; 3780 bdev_io->internal.desc = desc; 3781 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 3782 bdev_io->u.nvme_passthru.cmd = *cmd; 3783 bdev_io->u.nvme_passthru.buf = buf; 3784 bdev_io->u.nvme_passthru.nbytes = nbytes; 3785 bdev_io->u.nvme_passthru.md_buf = NULL; 3786 bdev_io->u.nvme_passthru.md_len = 0; 3787 3788 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3789 3790 bdev_io_submit(bdev_io); 3791 return 0; 3792 } 3793 3794 int 3795 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3796 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 3797 spdk_bdev_io_completion_cb cb, void *cb_arg) 3798 { 3799 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3800 struct spdk_bdev_io *bdev_io; 3801 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3802 3803 if (!desc->write) { 3804 /* 3805 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3806 * to easily determine if the command is a read or write, but for now just 3807 * do not allow io_passthru with a read-only descriptor. 
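 * (The same restriction is enforced by spdk_bdev_nvme_io_passthru() and
 * spdk_bdev_nvme_admin_passthru() above.)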
3808 */ 3809 return -EBADF; 3810 } 3811 3812 bdev_io = bdev_channel_get_io(channel); 3813 if (!bdev_io) { 3814 return -ENOMEM; 3815 } 3816 3817 bdev_io->internal.ch = channel; 3818 bdev_io->internal.desc = desc; 3819 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 3820 bdev_io->u.nvme_passthru.cmd = *cmd; 3821 bdev_io->u.nvme_passthru.buf = buf; 3822 bdev_io->u.nvme_passthru.nbytes = nbytes; 3823 bdev_io->u.nvme_passthru.md_buf = md_buf; 3824 bdev_io->u.nvme_passthru.md_len = md_len; 3825 3826 bdev_io_init(bdev_io, bdev, cb_arg, cb); 3827 3828 bdev_io_submit(bdev_io); 3829 return 0; 3830 } 3831 3832 int 3833 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3834 struct spdk_bdev_io_wait_entry *entry) 3835 { 3836 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3837 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 3838 3839 if (bdev != entry->bdev) { 3840 SPDK_ERRLOG("bdevs do not match\n"); 3841 return -EINVAL; 3842 } 3843 3844 if (mgmt_ch->per_thread_cache_count > 0) { 3845 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 3846 return -EINVAL; 3847 } 3848 3849 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 3850 return 0; 3851 } 3852 3853 static void 3854 bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 3855 { 3856 struct spdk_bdev *bdev = bdev_ch->bdev; 3857 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3858 struct spdk_bdev_io *bdev_io; 3859 3860 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3861 /* 3862 * Allow some more I/O to complete before retrying the nomem_io queue. 3863 * Some drivers (such as nvme) cannot immediately take a new I/O in 3864 * the context of a completion, because the resources for the I/O are 3865 * not released until control returns to the bdev poller. Also, we 3866 * may require several small I/O to complete before a larger I/O 3867 * (that requires splitting) can be submitted. 3868 */ 3869 return; 3870 } 3871 3872 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3873 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3874 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3875 bdev_io->internal.ch->io_outstanding++; 3876 shared_resource->io_outstanding++; 3877 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3878 bdev_io->internal.error.nvme.cdw0 = 0; 3879 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 3880 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3881 break; 3882 } 3883 } 3884 } 3885 3886 static inline void 3887 bdev_io_complete(void *ctx) 3888 { 3889 struct spdk_bdev_io *bdev_io = ctx; 3890 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3891 uint64_t tsc, tsc_diff; 3892 3893 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3894 /* 3895 * Send the completion to the thread that originally submitted the I/O, 3896 * which may not be the current thread in the case of QoS. 3897 */ 3898 if (bdev_io->internal.io_submit_ch) { 3899 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3900 bdev_io->internal.io_submit_ch = NULL; 3901 } 3902 3903 /* 3904 * Defer completion to avoid potential infinite recursion if the 3905 * user's completion callback issues a new I/O. 
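 * Instead, the completion is sent as a message to the I/O's owning thread via
 * spdk_thread_send_msg().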
3906 */
3907 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io),
3908 bdev_io_complete, bdev_io);
3909 return;
3910 }
3911
3912 tsc = spdk_get_ticks();
3913 tsc_diff = tsc - bdev_io->internal.submit_tsc;
3914 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0);
3915 /* When a bdev_io is split, the children bdev_io are not added
3916 * to the io_submitted list. So don't try to remove them in that
3917 * case.
3918 */
3919 if (bdev_io->internal.cb != bdev_io_split_done) {
3920 TAILQ_REMOVE(&bdev_ch->io_submitted, bdev_io, internal.ch_link);
3921 }
3922
3923 if (bdev_io->internal.ch->histogram) {
3924 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff);
3925 }
3926
3927 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
3928 switch (bdev_io->type) {
3929 case SPDK_BDEV_IO_TYPE_READ:
3930 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
3931 bdev_io->internal.ch->stat.num_read_ops++;
3932 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff;
3933 break;
3934 case SPDK_BDEV_IO_TYPE_WRITE:
3935 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
3936 bdev_io->internal.ch->stat.num_write_ops++;
3937 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff;
3938 break;
3939 case SPDK_BDEV_IO_TYPE_UNMAP:
3940 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
3941 bdev_io->internal.ch->stat.num_unmap_ops++;
3942 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff;
break;
3943 default:
3944 break;
3945 }
3946 }
3947
3948 #ifdef SPDK_CONFIG_VTUNE
3949 uint64_t now_tsc = spdk_get_ticks();
3950 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
3951 uint64_t data[5];
3952
3953 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
3954 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
3955 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
3956 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
3957 data[4] = bdev_io->bdev->fn_table->get_spin_time ?
3958 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 3959 3960 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3961 __itt_metadata_u64, 5, data); 3962 3963 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3964 bdev_io->internal.ch->start_tsc = now_tsc; 3965 } 3966 #endif 3967 3968 assert(bdev_io->internal.cb != NULL); 3969 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 3970 3971 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3972 bdev_io->internal.caller_ctx); 3973 } 3974 3975 static void 3976 bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3977 { 3978 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3979 3980 if (bdev_io->u.reset.ch_ref != NULL) { 3981 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3982 bdev_io->u.reset.ch_ref = NULL; 3983 } 3984 3985 bdev_io_complete(bdev_io); 3986 } 3987 3988 static void 3989 bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3990 { 3991 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3992 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3993 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3994 struct spdk_bdev_io *queued_reset; 3995 3996 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3997 while (!TAILQ_EMPTY(&ch->queued_resets)) { 3998 queued_reset = TAILQ_FIRST(&ch->queued_resets); 3999 TAILQ_REMOVE(&ch->queued_resets, queued_reset, internal.link); 4000 spdk_bdev_io_complete(queued_reset, bdev_io->internal.status); 4001 } 4002 4003 spdk_for_each_channel_continue(i, 0); 4004 } 4005 4006 void 4007 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 4008 { 4009 struct spdk_bdev *bdev = bdev_io->bdev; 4010 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 4011 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 4012 4013 bdev_io->internal.status = status; 4014 4015 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 4016 bool unlock_channels = false; 4017 4018 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 4019 SPDK_ERRLOG("NOMEM returned for reset\n"); 4020 } 4021 pthread_mutex_lock(&bdev->internal.mutex); 4022 if (bdev_io == bdev->internal.reset_in_progress) { 4023 bdev->internal.reset_in_progress = NULL; 4024 unlock_channels = true; 4025 } 4026 pthread_mutex_unlock(&bdev->internal.mutex); 4027 4028 if (unlock_channels) { 4029 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_unfreeze_channel, 4030 bdev_io, bdev_reset_complete); 4031 return; 4032 } 4033 } else { 4034 _bdev_io_unset_bounce_buf(bdev_io); 4035 4036 assert(bdev_ch->io_outstanding > 0); 4037 assert(shared_resource->io_outstanding > 0); 4038 bdev_ch->io_outstanding--; 4039 shared_resource->io_outstanding--; 4040 4041 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 4042 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 4043 /* 4044 * Wait for some of the outstanding I/O to complete before we 4045 * retry any of the nomem_io. Normally we will wait for 4046 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 4047 * depth channels we will instead wait for half to complete. 
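 * The threshold computed below is therefore the larger of (outstanding / 2) and
 * (outstanding - NOMEM_THRESHOLD_COUNT); bdev_ch_retry_io() resubmits the queued I/O
 * once io_outstanding has dropped to that level.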
4048 */ 4049 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 4050 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 4051 return; 4052 } 4053 4054 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 4055 bdev_ch_retry_io(bdev_ch); 4056 } 4057 } 4058 4059 bdev_io_complete(bdev_io); 4060 } 4061 4062 void 4063 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 4064 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 4065 { 4066 if (sc == SPDK_SCSI_STATUS_GOOD) { 4067 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4068 } else { 4069 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 4070 bdev_io->internal.error.scsi.sc = sc; 4071 bdev_io->internal.error.scsi.sk = sk; 4072 bdev_io->internal.error.scsi.asc = asc; 4073 bdev_io->internal.error.scsi.ascq = ascq; 4074 } 4075 4076 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 4077 } 4078 4079 void 4080 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 4081 int *sc, int *sk, int *asc, int *ascq) 4082 { 4083 assert(sc != NULL); 4084 assert(sk != NULL); 4085 assert(asc != NULL); 4086 assert(ascq != NULL); 4087 4088 switch (bdev_io->internal.status) { 4089 case SPDK_BDEV_IO_STATUS_SUCCESS: 4090 *sc = SPDK_SCSI_STATUS_GOOD; 4091 *sk = SPDK_SCSI_SENSE_NO_SENSE; 4092 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 4093 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 4094 break; 4095 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 4096 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 4097 break; 4098 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 4099 *sc = bdev_io->internal.error.scsi.sc; 4100 *sk = bdev_io->internal.error.scsi.sk; 4101 *asc = bdev_io->internal.error.scsi.asc; 4102 *ascq = bdev_io->internal.error.scsi.ascq; 4103 break; 4104 default: 4105 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 4106 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 4107 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 4108 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 4109 break; 4110 } 4111 } 4112 4113 void 4114 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, uint32_t cdw0, int sct, int sc) 4115 { 4116 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 4117 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4118 } else { 4119 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 4120 } 4121 4122 bdev_io->internal.error.nvme.cdw0 = cdw0; 4123 bdev_io->internal.error.nvme.sct = sct; 4124 bdev_io->internal.error.nvme.sc = sc; 4125 4126 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 4127 } 4128 4129 void 4130 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, uint32_t *cdw0, int *sct, int *sc) 4131 { 4132 assert(sct != NULL); 4133 assert(sc != NULL); 4134 assert(cdw0 != NULL); 4135 4136 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 4137 *sct = bdev_io->internal.error.nvme.sct; 4138 *sc = bdev_io->internal.error.nvme.sc; 4139 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 4140 *sct = SPDK_NVME_SCT_GENERIC; 4141 *sc = SPDK_NVME_SC_SUCCESS; 4142 } else { 4143 *sct = SPDK_NVME_SCT_GENERIC; 4144 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 4145 } 4146 4147 *cdw0 = bdev_io->internal.error.nvme.cdw0; 4148 } 4149 4150 struct spdk_thread * 4151 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 4152 { 4153 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 4154 } 4155 4156 struct spdk_io_channel * 4157 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 
4158 { 4159 return bdev_io->internal.ch->channel; 4160 } 4161 4162 static void 4163 bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 4164 { 4165 uint64_t min_qos_set; 4166 int i; 4167 4168 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4169 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4170 break; 4171 } 4172 } 4173 4174 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 4175 SPDK_ERRLOG("Invalid rate limits set.\n"); 4176 return; 4177 } 4178 4179 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4180 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4181 continue; 4182 } 4183 4184 if (bdev_qos_is_iops_rate_limit(i) == true) { 4185 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 4186 } else { 4187 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 4188 } 4189 4190 if (limits[i] == 0 || limits[i] % min_qos_set) { 4191 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 4192 limits[i], bdev->name, min_qos_set); 4193 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 4194 return; 4195 } 4196 } 4197 4198 if (!bdev->internal.qos) { 4199 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 4200 if (!bdev->internal.qos) { 4201 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 4202 return; 4203 } 4204 } 4205 4206 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4207 bdev->internal.qos->rate_limits[i].limit = limits[i]; 4208 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 4209 bdev->name, i, limits[i]); 4210 } 4211 4212 return; 4213 } 4214 4215 static void 4216 bdev_qos_config(struct spdk_bdev *bdev) 4217 { 4218 struct spdk_conf_section *sp = NULL; 4219 const char *val = NULL; 4220 int i = 0, j = 0; 4221 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 4222 bool config_qos = false; 4223 4224 sp = spdk_conf_find_section(NULL, "QoS"); 4225 if (!sp) { 4226 return; 4227 } 4228 4229 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 4230 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 4231 4232 i = 0; 4233 while (true) { 4234 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 4235 if (!val) { 4236 break; 4237 } 4238 4239 if (strcmp(bdev->name, val) != 0) { 4240 i++; 4241 continue; 4242 } 4243 4244 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 4245 if (val) { 4246 if (bdev_qos_is_iops_rate_limit(j) == true) { 4247 limits[j] = strtoull(val, NULL, 10); 4248 } else { 4249 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 4250 } 4251 config_qos = true; 4252 } 4253 4254 break; 4255 } 4256 4257 j++; 4258 } 4259 4260 if (config_qos == true) { 4261 bdev_qos_config_limit(bdev, limits); 4262 } 4263 4264 return; 4265 } 4266 4267 static int 4268 bdev_init(struct spdk_bdev *bdev) 4269 { 4270 char *bdev_name; 4271 4272 assert(bdev->module != NULL); 4273 4274 if (!bdev->name) { 4275 SPDK_ERRLOG("Bdev name is NULL\n"); 4276 return -EINVAL; 4277 } 4278 4279 if (!strlen(bdev->name)) { 4280 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 4281 return -EINVAL; 4282 } 4283 4284 if (spdk_bdev_get_by_name(bdev->name)) { 4285 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 4286 return -EEXIST; 4287 } 4288 4289 /* Users often register their own I/O devices using the bdev name. In 4290 * order to avoid conflicts, prepend bdev_. 
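 * The prefixed string is only used as the name passed to spdk_io_device_register()
 * and is freed right after registration.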
*/ 4291 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 4292 if (!bdev_name) { 4293 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 4294 return -ENOMEM; 4295 } 4296 4297 bdev->internal.status = SPDK_BDEV_STATUS_READY; 4298 bdev->internal.measured_queue_depth = UINT64_MAX; 4299 bdev->internal.claim_module = NULL; 4300 bdev->internal.qd_poller = NULL; 4301 bdev->internal.qos = NULL; 4302 4303 /* If the user didn't specify a uuid, generate one. */ 4304 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 4305 spdk_uuid_generate(&bdev->uuid); 4306 } 4307 4308 if (spdk_bdev_get_buf_align(bdev) > 1) { 4309 if (bdev->split_on_optimal_io_boundary) { 4310 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 4311 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 4312 } else { 4313 bdev->split_on_optimal_io_boundary = true; 4314 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 4315 } 4316 } 4317 4318 /* If the user didn't specify a write unit size, set it to one. */ 4319 if (bdev->write_unit_size == 0) { 4320 bdev->write_unit_size = 1; 4321 } 4322 4323 TAILQ_INIT(&bdev->internal.open_descs); 4324 4325 TAILQ_INIT(&bdev->aliases); 4326 4327 bdev->internal.reset_in_progress = NULL; 4328 4329 bdev_qos_config(bdev); 4330 4331 spdk_io_device_register(__bdev_to_io_dev(bdev), 4332 bdev_channel_create, bdev_channel_destroy, 4333 sizeof(struct spdk_bdev_channel), 4334 bdev_name); 4335 4336 free(bdev_name); 4337 4338 pthread_mutex_init(&bdev->internal.mutex, NULL); 4339 return 0; 4340 } 4341 4342 static void 4343 bdev_destroy_cb(void *io_device) 4344 { 4345 int rc; 4346 struct spdk_bdev *bdev; 4347 spdk_bdev_unregister_cb cb_fn; 4348 void *cb_arg; 4349 4350 bdev = __bdev_from_io_dev(io_device); 4351 cb_fn = bdev->internal.unregister_cb; 4352 cb_arg = bdev->internal.unregister_ctx; 4353 4354 rc = bdev->fn_table->destruct(bdev->ctxt); 4355 if (rc < 0) { 4356 SPDK_ERRLOG("destruct failed\n"); 4357 } 4358 if (rc <= 0 && cb_fn != NULL) { 4359 cb_fn(cb_arg, rc); 4360 } 4361 } 4362 4363 4364 static void 4365 bdev_fini(struct spdk_bdev *bdev) 4366 { 4367 pthread_mutex_destroy(&bdev->internal.mutex); 4368 4369 free(bdev->internal.qos); 4370 4371 spdk_io_device_unregister(__bdev_to_io_dev(bdev), bdev_destroy_cb); 4372 } 4373 4374 static void 4375 bdev_start(struct spdk_bdev *bdev) 4376 { 4377 struct spdk_bdev_module *module; 4378 uint32_t action; 4379 4380 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 4381 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 4382 4383 /* Examine configuration before initializing I/O */ 4384 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4385 if (module->examine_config) { 4386 action = module->internal.action_in_progress; 4387 module->internal.action_in_progress++; 4388 module->examine_config(bdev); 4389 if (action != module->internal.action_in_progress) { 4390 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 4391 module->name); 4392 } 4393 } 4394 } 4395 4396 if (bdev->internal.claim_module) { 4397 if (bdev->internal.claim_module->examine_disk) { 4398 bdev->internal.claim_module->internal.action_in_progress++; 4399 bdev->internal.claim_module->examine_disk(bdev); 4400 } 4401 return; 4402 } 4403 4404 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4405 if (module->examine_disk) { 4406 module->internal.action_in_progress++; 4407 module->examine_disk(bdev); 4408 } 4409 } 4410 } 4411 4412 int 4413 
spdk_bdev_register(struct spdk_bdev *bdev) 4414 { 4415 int rc = bdev_init(bdev); 4416 4417 if (rc == 0) { 4418 bdev_start(bdev); 4419 } 4420 4421 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 4422 return rc; 4423 } 4424 4425 int 4426 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 4427 { 4428 SPDK_ERRLOG("This function is deprecated. Use spdk_bdev_register() instead.\n"); 4429 return spdk_bdev_register(vbdev); 4430 } 4431 4432 void 4433 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 4434 { 4435 if (bdev->internal.unregister_cb != NULL) { 4436 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 4437 } 4438 } 4439 4440 static void 4441 _remove_notify(void *arg) 4442 { 4443 struct spdk_bdev_desc *desc = arg; 4444 4445 pthread_mutex_lock(&desc->mutex); 4446 desc->refs--; 4447 4448 if (!desc->closed) { 4449 pthread_mutex_unlock(&desc->mutex); 4450 if (desc->callback.open_with_ext) { 4451 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 4452 } else { 4453 desc->callback.remove_fn(desc->callback.ctx); 4454 } 4455 return; 4456 } else if (0 == desc->refs) { 4457 /* This descriptor was closed after this remove_notify message was sent. 4458 * spdk_bdev_close() could not free the descriptor since this message was 4459 * in flight, so we free it now using bdev_desc_free(). 4460 */ 4461 pthread_mutex_unlock(&desc->mutex); 4462 bdev_desc_free(desc); 4463 return; 4464 } 4465 pthread_mutex_unlock(&desc->mutex); 4466 } 4467 4468 /* Must be called while holding bdev->internal.mutex. 4469 * returns: 0 - bdev removed and ready to be destructed. 4470 * -EBUSY - bdev can't be destructed yet. */ 4471 static int 4472 bdev_unregister_unsafe(struct spdk_bdev *bdev) 4473 { 4474 struct spdk_bdev_desc *desc, *tmp; 4475 int rc = 0; 4476 4477 /* Notify each descriptor about hotremoval */ 4478 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 4479 rc = -EBUSY; 4480 pthread_mutex_lock(&desc->mutex); 4481 /* 4482 * Defer invocation of the event_cb to a separate message that will 4483 * run later on its thread. This ensures this context unwinds and 4484 * we don't recursively unregister this bdev again if the event_cb 4485 * immediately closes its descriptor. 4486 */ 4487 desc->refs++; 4488 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 4489 pthread_mutex_unlock(&desc->mutex); 4490 } 4491 4492 /* If there are no descriptors, proceed removing the bdev */ 4493 if (rc == 0) { 4494 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 4495 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 4496 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 4497 } 4498 4499 return rc; 4500 } 4501 4502 void 4503 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 4504 { 4505 struct spdk_thread *thread; 4506 int rc; 4507 4508 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 4509 4510 thread = spdk_get_thread(); 4511 if (!thread) { 4512 /* The user called this from a non-SPDK thread. 
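 * Unregistration has to run on an SPDK thread, so report -ENOTSUP through the
 * caller's callback instead.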
*/ 4513 if (cb_fn != NULL) { 4514 cb_fn(cb_arg, -ENOTSUP); 4515 } 4516 return; 4517 } 4518 4519 pthread_mutex_lock(&g_bdev_mgr.mutex); 4520 pthread_mutex_lock(&bdev->internal.mutex); 4521 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 4522 pthread_mutex_unlock(&bdev->internal.mutex); 4523 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4524 if (cb_fn) { 4525 cb_fn(cb_arg, -EBUSY); 4526 } 4527 return; 4528 } 4529 4530 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 4531 bdev->internal.unregister_cb = cb_fn; 4532 bdev->internal.unregister_ctx = cb_arg; 4533 4534 /* Call under lock. */ 4535 rc = bdev_unregister_unsafe(bdev); 4536 pthread_mutex_unlock(&bdev->internal.mutex); 4537 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4538 4539 if (rc == 0) { 4540 bdev_fini(bdev); 4541 } 4542 } 4543 4544 static void 4545 bdev_dummy_event_cb(void *remove_ctx) 4546 { 4547 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev remove event received with no remove callback specified"); 4548 } 4549 4550 static int 4551 bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 4552 { 4553 struct spdk_thread *thread; 4554 struct set_qos_limit_ctx *ctx; 4555 4556 thread = spdk_get_thread(); 4557 if (!thread) { 4558 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 4559 return -ENOTSUP; 4560 } 4561 4562 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 4563 spdk_get_thread()); 4564 4565 desc->bdev = bdev; 4566 desc->thread = thread; 4567 desc->write = write; 4568 4569 pthread_mutex_lock(&bdev->internal.mutex); 4570 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 4571 pthread_mutex_unlock(&bdev->internal.mutex); 4572 return -ENODEV; 4573 } 4574 4575 if (write && bdev->internal.claim_module) { 4576 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 4577 bdev->name, bdev->internal.claim_module->name); 4578 pthread_mutex_unlock(&bdev->internal.mutex); 4579 return -EPERM; 4580 } 4581 4582 /* Enable QoS */ 4583 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 4584 ctx = calloc(1, sizeof(*ctx)); 4585 if (ctx == NULL) { 4586 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 4587 pthread_mutex_unlock(&bdev->internal.mutex); 4588 return -ENOMEM; 4589 } 4590 ctx->bdev = bdev; 4591 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4592 bdev_enable_qos_msg, ctx, 4593 bdev_enable_qos_done); 4594 } 4595 4596 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 4597 4598 pthread_mutex_unlock(&bdev->internal.mutex); 4599 4600 return 0; 4601 } 4602 4603 int 4604 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 4605 void *remove_ctx, struct spdk_bdev_desc **_desc) 4606 { 4607 struct spdk_bdev_desc *desc; 4608 int rc; 4609 4610 desc = calloc(1, sizeof(*desc)); 4611 if (desc == NULL) { 4612 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 4613 return -ENOMEM; 4614 } 4615 4616 if (remove_cb == NULL) { 4617 remove_cb = bdev_dummy_event_cb; 4618 } 4619 4620 TAILQ_INIT(&desc->pending_media_events); 4621 TAILQ_INIT(&desc->free_media_events); 4622 4623 desc->callback.open_with_ext = false; 4624 desc->callback.remove_fn = remove_cb; 4625 desc->callback.ctx = remove_ctx; 4626 pthread_mutex_init(&desc->mutex, NULL); 4627 4628 pthread_mutex_lock(&g_bdev_mgr.mutex); 4629 4630 rc = bdev_open(bdev, write, desc); 4631 if (rc != 0) { 4632 bdev_desc_free(desc); 4633 desc = NULL; 4634 } 4635 4636 *_desc = desc; 4637 4638 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4639 4640 return rc; 4641 } 4642 
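/*
 * Illustrative usage sketch (not part of the original source): one way a caller
 * might use spdk_bdev_open_ext() below together with spdk_bdev_get_io_channel(),
 * spdk_put_io_channel() and spdk_bdev_close(). The bdev name "Malloc0", the
 * callback my_event_cb and the context pointer my_ctx are hypothetical.
 *
 *	static void
 *	my_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
 *	{
 *		if (type == SPDK_BDEV_EVENT_REMOVE) {
 *			... close the descriptor stored in event_ctx once its outstanding I/O has drained ...
 *		}
 *	}
 *
 *	struct spdk_bdev_desc *desc = NULL;
 *	struct spdk_io_channel *io_ch;
 *
 *	if (spdk_bdev_open_ext("Malloc0", true, my_event_cb, my_ctx, &desc) == 0) {
 *		io_ch = spdk_bdev_get_io_channel(desc);
 *		... submit I/O on io_ch from this thread ...
 *		spdk_put_io_channel(io_ch);
 *		spdk_bdev_close(desc);
 *	}
 */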
4643 int 4644 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 4645 void *event_ctx, struct spdk_bdev_desc **_desc) 4646 { 4647 struct spdk_bdev_desc *desc; 4648 struct spdk_bdev *bdev; 4649 unsigned int event_id; 4650 int rc; 4651 4652 if (event_cb == NULL) { 4653 SPDK_ERRLOG("Missing event callback function\n"); 4654 return -EINVAL; 4655 } 4656 4657 pthread_mutex_lock(&g_bdev_mgr.mutex); 4658 4659 bdev = spdk_bdev_get_by_name(bdev_name); 4660 4661 if (bdev == NULL) { 4662 SPDK_ERRLOG("Failed to find bdev with name: %s\n", bdev_name); 4663 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4664 return -EINVAL; 4665 } 4666 4667 desc = calloc(1, sizeof(*desc)); 4668 if (desc == NULL) { 4669 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 4670 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4671 return -ENOMEM; 4672 } 4673 4674 TAILQ_INIT(&desc->pending_media_events); 4675 TAILQ_INIT(&desc->free_media_events); 4676 4677 desc->callback.open_with_ext = true; 4678 desc->callback.event_fn = event_cb; 4679 desc->callback.ctx = event_ctx; 4680 pthread_mutex_init(&desc->mutex, NULL); 4681 4682 if (bdev->media_events) { 4683 desc->media_events_buffer = calloc(MEDIA_EVENT_POOL_SIZE, 4684 sizeof(*desc->media_events_buffer)); 4685 if (desc->media_events_buffer == NULL) { 4686 SPDK_ERRLOG("Failed to initialize media event pool\n"); 4687 bdev_desc_free(desc); 4688 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4689 return -ENOMEM; 4690 } 4691 4692 for (event_id = 0; event_id < MEDIA_EVENT_POOL_SIZE; ++event_id) { 4693 TAILQ_INSERT_TAIL(&desc->free_media_events, 4694 &desc->media_events_buffer[event_id], tailq); 4695 } 4696 } 4697 4698 rc = bdev_open(bdev, write, desc); 4699 if (rc != 0) { 4700 bdev_desc_free(desc); 4701 desc = NULL; 4702 } 4703 4704 *_desc = desc; 4705 4706 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4707 4708 return rc; 4709 } 4710 4711 void 4712 spdk_bdev_close(struct spdk_bdev_desc *desc) 4713 { 4714 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4715 int rc; 4716 4717 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 4718 spdk_get_thread()); 4719 4720 assert(desc->thread == spdk_get_thread()); 4721 4722 spdk_poller_unregister(&desc->io_timeout_poller); 4723 4724 pthread_mutex_lock(&bdev->internal.mutex); 4725 pthread_mutex_lock(&desc->mutex); 4726 4727 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 4728 4729 desc->closed = true; 4730 4731 if (0 == desc->refs) { 4732 pthread_mutex_unlock(&desc->mutex); 4733 bdev_desc_free(desc); 4734 } else { 4735 pthread_mutex_unlock(&desc->mutex); 4736 } 4737 4738 /* If no more descriptors, kill QoS channel */ 4739 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 4740 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 4741 bdev->name, spdk_get_thread()); 4742 4743 if (bdev_qos_destroy(bdev)) { 4744 /* There isn't anything we can do to recover here. Just let the 4745 * old QoS poller keep running. The QoS handling won't change 4746 * cores when the user allocates a new channel, but it won't break. */ 4747 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 4748 } 4749 } 4750 4751 spdk_bdev_set_qd_sampling_period(bdev, 0); 4752 4753 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 4754 rc = bdev_unregister_unsafe(bdev); 4755 pthread_mutex_unlock(&bdev->internal.mutex); 4756 4757 if (rc == 0) { 4758 bdev_fini(bdev); 4759 } 4760 } else { 4761 pthread_mutex_unlock(&bdev->internal.mutex); 4762 } 4763 } 4764 4765 int 4766 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 4767 struct spdk_bdev_module *module) 4768 { 4769 if (bdev->internal.claim_module != NULL) { 4770 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 4771 bdev->internal.claim_module->name); 4772 return -EPERM; 4773 } 4774 4775 if (desc && !desc->write) { 4776 desc->write = true; 4777 } 4778 4779 bdev->internal.claim_module = module; 4780 return 0; 4781 } 4782 4783 void 4784 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 4785 { 4786 assert(bdev->internal.claim_module != NULL); 4787 bdev->internal.claim_module = NULL; 4788 } 4789 4790 struct spdk_bdev * 4791 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 4792 { 4793 assert(desc != NULL); 4794 return desc->bdev; 4795 } 4796 4797 void 4798 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 4799 { 4800 struct iovec *iovs; 4801 int iovcnt; 4802 4803 if (bdev_io == NULL) { 4804 return; 4805 } 4806 4807 switch (bdev_io->type) { 4808 case SPDK_BDEV_IO_TYPE_READ: 4809 case SPDK_BDEV_IO_TYPE_WRITE: 4810 case SPDK_BDEV_IO_TYPE_ZCOPY: 4811 iovs = bdev_io->u.bdev.iovs; 4812 iovcnt = bdev_io->u.bdev.iovcnt; 4813 break; 4814 default: 4815 iovs = NULL; 4816 iovcnt = 0; 4817 break; 4818 } 4819 4820 if (iovp) { 4821 *iovp = iovs; 4822 } 4823 if (iovcntp) { 4824 *iovcntp = iovcnt; 4825 } 4826 } 4827 4828 void * 4829 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 4830 { 4831 if (bdev_io == NULL) { 4832 return NULL; 4833 } 4834 4835 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 4836 return NULL; 4837 } 4838 4839 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 4840 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 4841 return bdev_io->u.bdev.md_buf; 4842 } 4843 4844 return NULL; 4845 } 4846 4847 void 4848 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 4849 { 4850 4851 if (spdk_bdev_module_list_find(bdev_module->name)) { 4852 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 4853 assert(false); 4854 } 4855 4856 /* 4857 * Modules with examine callbacks must be initialized first, so they are 4858 * ready to handle examine callbacks from later modules that will 4859 * register physical bdevs. 
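 * Modules without examine callbacks are simply appended to the tail of the list.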
4860 */ 4861 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 4862 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4863 } else { 4864 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4865 } 4866 } 4867 4868 struct spdk_bdev_module * 4869 spdk_bdev_module_list_find(const char *name) 4870 { 4871 struct spdk_bdev_module *bdev_module; 4872 4873 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4874 if (strcmp(name, bdev_module->name) == 0) { 4875 break; 4876 } 4877 } 4878 4879 return bdev_module; 4880 } 4881 4882 static void 4883 bdev_write_zero_buffer_next(void *_bdev_io) 4884 { 4885 struct spdk_bdev_io *bdev_io = _bdev_io; 4886 uint64_t num_bytes, num_blocks; 4887 void *md_buf = NULL; 4888 int rc; 4889 4890 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 4891 bdev_io->u.bdev.split_remaining_num_blocks, 4892 ZERO_BUFFER_SIZE); 4893 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 4894 4895 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 4896 md_buf = (char *)g_bdev_mgr.zero_buffer + 4897 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 4898 } 4899 4900 rc = bdev_write_blocks_with_md(bdev_io->internal.desc, 4901 spdk_io_channel_from_ctx(bdev_io->internal.ch), 4902 g_bdev_mgr.zero_buffer, md_buf, 4903 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 4904 bdev_write_zero_buffer_done, bdev_io); 4905 if (rc == 0) { 4906 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 4907 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 4908 } else if (rc == -ENOMEM) { 4909 bdev_queue_io_wait_with_cb(bdev_io, bdev_write_zero_buffer_next); 4910 } else { 4911 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4912 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 4913 } 4914 } 4915 4916 static void 4917 bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4918 { 4919 struct spdk_bdev_io *parent_io = cb_arg; 4920 4921 spdk_bdev_free_io(bdev_io); 4922 4923 if (!success) { 4924 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4925 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 4926 return; 4927 } 4928 4929 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 4930 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4931 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 4932 return; 4933 } 4934 4935 bdev_write_zero_buffer_next(parent_io); 4936 } 4937 4938 static void 4939 bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 4940 { 4941 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4942 ctx->bdev->internal.qos_mod_in_progress = false; 4943 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4944 4945 if (ctx->cb_fn) { 4946 ctx->cb_fn(ctx->cb_arg, status); 4947 } 4948 free(ctx); 4949 } 4950 4951 static void 4952 bdev_disable_qos_done(void *cb_arg) 4953 { 4954 struct set_qos_limit_ctx *ctx = cb_arg; 4955 struct spdk_bdev *bdev = ctx->bdev; 4956 struct spdk_bdev_io *bdev_io; 4957 struct spdk_bdev_qos *qos; 4958 4959 pthread_mutex_lock(&bdev->internal.mutex); 4960 qos = bdev->internal.qos; 4961 bdev->internal.qos = NULL; 4962 pthread_mutex_unlock(&bdev->internal.mutex); 4963 4964 while (!TAILQ_EMPTY(&qos->queued)) { 4965 /* Send queued I/O back to their original thread for resubmission. 
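 * Each queued I/O is handed back to the thread returned by spdk_bdev_io_get_thread()
 * and resubmitted there through _bdev_io_submit().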
*/ 4966 bdev_io = TAILQ_FIRST(&qos->queued); 4967 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 4968 4969 if (bdev_io->internal.io_submit_ch) { 4970 /* 4971 * Channel was changed when sending it to the QoS thread - change it back 4972 * before sending it back to the original thread. 4973 */ 4974 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4975 bdev_io->internal.io_submit_ch = NULL; 4976 } 4977 4978 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4979 _bdev_io_submit, bdev_io); 4980 } 4981 4982 if (qos->thread != NULL) { 4983 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 4984 spdk_poller_unregister(&qos->poller); 4985 } 4986 4987 free(qos); 4988 4989 bdev_set_qos_limit_done(ctx, 0); 4990 } 4991 4992 static void 4993 bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 4994 { 4995 void *io_device = spdk_io_channel_iter_get_io_device(i); 4996 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4997 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4998 struct spdk_thread *thread; 4999 5000 pthread_mutex_lock(&bdev->internal.mutex); 5001 thread = bdev->internal.qos->thread; 5002 pthread_mutex_unlock(&bdev->internal.mutex); 5003 5004 if (thread != NULL) { 5005 spdk_thread_send_msg(thread, bdev_disable_qos_done, ctx); 5006 } else { 5007 bdev_disable_qos_done(ctx); 5008 } 5009 } 5010 5011 static void 5012 bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 5013 { 5014 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5015 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5016 5017 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 5018 5019 spdk_for_each_channel_continue(i, 0); 5020 } 5021 5022 static void 5023 bdev_update_qos_rate_limit_msg(void *cb_arg) 5024 { 5025 struct set_qos_limit_ctx *ctx = cb_arg; 5026 struct spdk_bdev *bdev = ctx->bdev; 5027 5028 pthread_mutex_lock(&bdev->internal.mutex); 5029 bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 5030 pthread_mutex_unlock(&bdev->internal.mutex); 5031 5032 bdev_set_qos_limit_done(ctx, 0); 5033 } 5034 5035 static void 5036 bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 5037 { 5038 void *io_device = spdk_io_channel_iter_get_io_device(i); 5039 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 5040 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 5041 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 5042 5043 pthread_mutex_lock(&bdev->internal.mutex); 5044 bdev_enable_qos(bdev, bdev_ch); 5045 pthread_mutex_unlock(&bdev->internal.mutex); 5046 spdk_for_each_channel_continue(i, 0); 5047 } 5048 5049 static void 5050 bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 5051 { 5052 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5053 5054 bdev_set_qos_limit_done(ctx, status); 5055 } 5056 5057 static void 5058 bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 5059 { 5060 int i; 5061 5062 assert(bdev->internal.qos != NULL); 5063 5064 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5065 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5066 bdev->internal.qos->rate_limits[i].limit = limits[i]; 5067 5068 if (limits[i] == 0) { 5069 bdev->internal.qos->rate_limits[i].limit = 5070 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 5071 } 5072 } 5073 } 5074 } 5075 5076 void 5077 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 5078 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 5079 { 5080 struct set_qos_limit_ctx *ctx; 5081 uint32_t 
limit_set_complement; 5082 uint64_t min_limit_per_sec; 5083 int i; 5084 bool disable_rate_limit = true; 5085 5086 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5087 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 5088 continue; 5089 } 5090 5091 if (limits[i] > 0) { 5092 disable_rate_limit = false; 5093 } 5094 5095 if (bdev_qos_is_iops_rate_limit(i) == true) { 5096 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 5097 } else { 5098 /* Change from megabyte to byte rate limit */ 5099 limits[i] = limits[i] * 1024 * 1024; 5100 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 5101 } 5102 5103 limit_set_complement = limits[i] % min_limit_per_sec; 5104 if (limit_set_complement) { 5105 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 5106 limits[i], min_limit_per_sec); 5107 limits[i] += min_limit_per_sec - limit_set_complement; 5108 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 5109 } 5110 } 5111 5112 ctx = calloc(1, sizeof(*ctx)); 5113 if (ctx == NULL) { 5114 cb_fn(cb_arg, -ENOMEM); 5115 return; 5116 } 5117 5118 ctx->cb_fn = cb_fn; 5119 ctx->cb_arg = cb_arg; 5120 ctx->bdev = bdev; 5121 5122 pthread_mutex_lock(&bdev->internal.mutex); 5123 if (bdev->internal.qos_mod_in_progress) { 5124 pthread_mutex_unlock(&bdev->internal.mutex); 5125 free(ctx); 5126 cb_fn(cb_arg, -EAGAIN); 5127 return; 5128 } 5129 bdev->internal.qos_mod_in_progress = true; 5130 5131 if (disable_rate_limit == true && bdev->internal.qos) { 5132 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 5133 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 5134 (bdev->internal.qos->rate_limits[i].limit > 0 && 5135 bdev->internal.qos->rate_limits[i].limit != 5136 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 5137 disable_rate_limit = false; 5138 break; 5139 } 5140 } 5141 } 5142 5143 if (disable_rate_limit == false) { 5144 if (bdev->internal.qos == NULL) { 5145 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 5146 if (!bdev->internal.qos) { 5147 pthread_mutex_unlock(&bdev->internal.mutex); 5148 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 5149 bdev_set_qos_limit_done(ctx, -ENOMEM); 5150 return; 5151 } 5152 } 5153 5154 if (bdev->internal.qos->thread == NULL) { 5155 /* Enabling */ 5156 bdev_set_qos_rate_limits(bdev, limits); 5157 5158 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5159 bdev_enable_qos_msg, ctx, 5160 bdev_enable_qos_done); 5161 } else { 5162 /* Updating */ 5163 bdev_set_qos_rate_limits(bdev, limits); 5164 5165 spdk_thread_send_msg(bdev->internal.qos->thread, 5166 bdev_update_qos_rate_limit_msg, ctx); 5167 } 5168 } else { 5169 if (bdev->internal.qos != NULL) { 5170 bdev_set_qos_rate_limits(bdev, limits); 5171 5172 /* Disabling */ 5173 spdk_for_each_channel(__bdev_to_io_dev(bdev), 5174 bdev_disable_qos_msg, ctx, 5175 bdev_disable_qos_msg_done); 5176 } else { 5177 pthread_mutex_unlock(&bdev->internal.mutex); 5178 bdev_set_qos_limit_done(ctx, 0); 5179 return; 5180 } 5181 } 5182 5183 pthread_mutex_unlock(&bdev->internal.mutex); 5184 } 5185 5186 struct spdk_bdev_histogram_ctx { 5187 spdk_bdev_histogram_status_cb cb_fn; 5188 void *cb_arg; 5189 struct spdk_bdev *bdev; 5190 int status; 5191 }; 5192 5193 static void 5194 bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 5195 { 5196 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5197 5198 pthread_mutex_lock(&ctx->bdev->internal.mutex); 5199 ctx->bdev->internal.histogram_in_progress = false; 5200 
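/* Drop the bdev mutex before reporting the final status and freeing the context. */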
pthread_mutex_unlock(&ctx->bdev->internal.mutex); 5201 ctx->cb_fn(ctx->cb_arg, ctx->status); 5202 free(ctx); 5203 } 5204 5205 static void 5206 bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 5207 { 5208 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 5209 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 5210 5211 if (ch->histogram != NULL) { 5212 spdk_histogram_data_free(ch->histogram); 5213 ch->histogram = NULL; 5214 } 5215 spdk_for_each_channel_continue(i, 0); 5216 } 5217 5218 static void 5219 bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 5220 { 5221 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5222 5223 if (status != 0) { 5224 ctx->status = status; 5225 ctx->bdev->internal.histogram_enabled = false; 5226 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), bdev_histogram_disable_channel, ctx, 5227 bdev_histogram_disable_channel_cb); 5228 } else { 5229 pthread_mutex_lock(&ctx->bdev->internal.mutex); 5230 ctx->bdev->internal.histogram_in_progress = false; 5231 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 5232 ctx->cb_fn(ctx->cb_arg, ctx->status); 5233 free(ctx); 5234 } 5235 } 5236 5237 static void 5238 bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 5239 { 5240 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 5241 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 5242 int status = 0; 5243 5244 if (ch->histogram == NULL) { 5245 ch->histogram = spdk_histogram_data_alloc(); 5246 if (ch->histogram == NULL) { 5247 status = -ENOMEM; 5248 } 5249 } 5250 5251 spdk_for_each_channel_continue(i, status); 5252 } 5253 5254 void 5255 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 5256 void *cb_arg, bool enable) 5257 { 5258 struct spdk_bdev_histogram_ctx *ctx; 5259 5260 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 5261 if (ctx == NULL) { 5262 cb_fn(cb_arg, -ENOMEM); 5263 return; 5264 } 5265 5266 ctx->bdev = bdev; 5267 ctx->status = 0; 5268 ctx->cb_fn = cb_fn; 5269 ctx->cb_arg = cb_arg; 5270 5271 pthread_mutex_lock(&bdev->internal.mutex); 5272 if (bdev->internal.histogram_in_progress) { 5273 pthread_mutex_unlock(&bdev->internal.mutex); 5274 free(ctx); 5275 cb_fn(cb_arg, -EAGAIN); 5276 return; 5277 } 5278 5279 bdev->internal.histogram_in_progress = true; 5280 pthread_mutex_unlock(&bdev->internal.mutex); 5281 5282 bdev->internal.histogram_enabled = enable; 5283 5284 if (enable) { 5285 /* Allocate histogram for each channel */ 5286 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_enable_channel, ctx, 5287 bdev_histogram_enable_channel_cb); 5288 } else { 5289 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_disable_channel, ctx, 5290 bdev_histogram_disable_channel_cb); 5291 } 5292 } 5293 5294 struct spdk_bdev_histogram_data_ctx { 5295 spdk_bdev_histogram_data_cb cb_fn; 5296 void *cb_arg; 5297 struct spdk_bdev *bdev; 5298 /** merged histogram data from all channels */ 5299 struct spdk_histogram_data *histogram; 5300 }; 5301 5302 static void 5303 bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 5304 { 5305 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5306 5307 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 5308 free(ctx); 5309 } 5310 5311 static void 5312 bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 5313 { 5314 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 5315 struct spdk_bdev_channel *ch = 
spdk_io_channel_get_ctx(_ch); 5316 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5317 int status = 0; 5318 5319 if (ch->histogram == NULL) { 5320 status = -EFAULT; 5321 } else { 5322 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 5323 } 5324 5325 spdk_for_each_channel_continue(i, status); 5326 } 5327 5328 void 5329 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 5330 spdk_bdev_histogram_data_cb cb_fn, 5331 void *cb_arg) 5332 { 5333 struct spdk_bdev_histogram_data_ctx *ctx; 5334 5335 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 5336 if (ctx == NULL) { 5337 cb_fn(cb_arg, -ENOMEM, NULL); 5338 return; 5339 } 5340 5341 ctx->bdev = bdev; 5342 ctx->cb_fn = cb_fn; 5343 ctx->cb_arg = cb_arg; 5344 5345 ctx->histogram = histogram; 5346 5347 spdk_for_each_channel(__bdev_to_io_dev(bdev), bdev_histogram_get_channel, ctx, 5348 bdev_histogram_get_channel_cb); 5349 } 5350 5351 size_t 5352 spdk_bdev_get_media_events(struct spdk_bdev_desc *desc, struct spdk_bdev_media_event *events, 5353 size_t max_events) 5354 { 5355 struct media_event_entry *entry; 5356 size_t num_events = 0; 5357 5358 for (; num_events < max_events; ++num_events) { 5359 entry = TAILQ_FIRST(&desc->pending_media_events); 5360 if (entry == NULL) { 5361 break; 5362 } 5363 5364 events[num_events] = entry->event; 5365 TAILQ_REMOVE(&desc->pending_media_events, entry, tailq); 5366 TAILQ_INSERT_TAIL(&desc->free_media_events, entry, tailq); 5367 } 5368 5369 return num_events; 5370 } 5371 5372 int 5373 spdk_bdev_push_media_events(struct spdk_bdev *bdev, const struct spdk_bdev_media_event *events, 5374 size_t num_events) 5375 { 5376 struct spdk_bdev_desc *desc; 5377 struct media_event_entry *entry; 5378 size_t event_id; 5379 int rc = 0; 5380 5381 assert(bdev->media_events); 5382 5383 pthread_mutex_lock(&bdev->internal.mutex); 5384 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 5385 if (desc->write) { 5386 break; 5387 } 5388 } 5389 5390 if (desc == NULL || desc->media_events_buffer == NULL) { 5391 rc = -ENODEV; 5392 goto out; 5393 } 5394 5395 for (event_id = 0; event_id < num_events; ++event_id) { 5396 entry = TAILQ_FIRST(&desc->free_media_events); 5397 if (entry == NULL) { 5398 break; 5399 } 5400 5401 TAILQ_REMOVE(&desc->free_media_events, entry, tailq); 5402 TAILQ_INSERT_TAIL(&desc->pending_media_events, entry, tailq); 5403 entry->event = events[event_id]; 5404 } 5405 5406 rc = event_id; 5407 out: 5408 pthread_mutex_unlock(&bdev->internal.mutex); 5409 return rc; 5410 } 5411 5412 void 5413 spdk_bdev_notify_media_management(struct spdk_bdev *bdev) 5414 { 5415 struct spdk_bdev_desc *desc; 5416 5417 pthread_mutex_lock(&bdev->internal.mutex); 5418 TAILQ_FOREACH(desc, &bdev->internal.open_descs, link) { 5419 if (!TAILQ_EMPTY(&desc->pending_media_events)) { 5420 desc->callback.event_fn(SPDK_BDEV_EVENT_MEDIA_MANAGEMENT, bdev, 5421 desc->callback.ctx); 5422 } 5423 } 5424 pthread_mutex_unlock(&bdev->internal.mutex); 5425 } 5426 5427 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 5428 5429 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 5430 { 5431 spdk_trace_register_owner(OWNER_BDEV, 'b'); 5432 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 5433 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 5434 OBJECT_BDEV_IO, 1, 0, "type: "); 5435 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 5436 OBJECT_BDEV_IO, 0, 0, ""); 5437 } 5438
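/*
 * Illustrative usage sketch (not part of the original source): enabling the
 * per-channel latency histogram and collecting the merged data with the APIs
 * above. The callbacks my_enable_done/my_data_done and the pointer my_ctx are
 * hypothetical; spdk_histogram_data_alloc()/spdk_histogram_data_free() come
 * from spdk/histogram_data.h.
 *
 *	static void
 *	my_enable_done(void *cb_arg, int status)
 *	{
 *		... histogram collection is active on every channel when status == 0 ...
 *	}
 *
 *	static void
 *	my_data_done(void *cb_arg, int status, struct spdk_histogram_data *histogram)
 *	{
 *		... inspect the merged data, then spdk_histogram_data_free(histogram) ...
 *	}
 *
 *	spdk_bdev_histogram_enable(bdev, my_enable_done, my_ctx, true);
 *	...
 *	struct spdk_histogram_data *histogram = spdk_histogram_data_alloc();
 *	if (histogram != NULL) {
 *		spdk_bdev_histogram_get(bdev, histogram, my_data_done, my_ctx);
 *	}
 */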