/*-
 * BSD LICENSE
 *
 * Copyright (c) Intel Corporation.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   * Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in
 *     the documentation and/or other materials provided with the
 *     distribution.
 *   * Neither the name of Intel Corporation nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/notify.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024 - 1)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define BUF_SMALL_POOL_SIZE			8191
#define BUF_LARGE_POOL_SIZE			1023
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000
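/*
 * Note: the pool sizes above are one less than a power of two (64K - 1, 8191,
 * 1023). This follows the DPDK mempool guidance that a size of 2^n - 1 gives
 * the best memory utilization for the ring backing the pool; treat the exact
 * reasoning as an upstream DPDK detail rather than something this file relies
 * on.
 */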
#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		1000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_conf_type[] = {"Limit_IOPS",
				      "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS"
				     };
static const char *qos_rpc_type[] = {"rw_ios_per_sec",
				     "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
				    };

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

	pthread_mutex_t mutex;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
	.mutex = PTHREAD_MUTEX_INITIALIZER,
};


static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;

	/** Function to check whether to queue the IO. */
	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);

	/** Function to update for the submitted IO. */
	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
};

struct spdk_bdev_qos {
	/** Rate limits, one entry per limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};
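/*
 * Illustrative example (values are hypothetical): with rw_ios_per_sec = 10000
 * and a 1000 usec timeslice (SPDK_BDEV_QOS_TIMESLICE_IN_USEC), the QoS poller
 * grants 10000 * 1000 / 1000000 = 10 I/O per timeslice. remaining_this_timeslice
 * is refilled by max_per_timeslice at the start of each timeslice and decremented
 * as I/O are submitted; byte limits may briefly go negative, and the overrun is
 * charged against the next timeslice.
 */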
struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data.  Multiple bdevs built on the same io_device
 * will queue here their IO that awaits retry.  This makes it possible to retry
 * sending IO to one bdev after IO from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t		nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel	*shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t		ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted through this channel and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	bdev_io_tailq_t		queued_resets;

	uint32_t		flags;

	struct spdk_histogram_data *histogram;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

};

struct spdk_bdev_desc {
	struct spdk_bdev		*bdev;
	struct spdk_thread		*thread;
	struct {
		bool open_with_ext;
		union {
			spdk_bdev_remove_cb_t remove_fn;
			spdk_bdev_event_cb_t event_fn;
		};
		void *ctx;
	}				callback;
	bool				remove_scheduled;
	bool				closed;
	bool				write;
	TAILQ_ENTRY(spdk_bdev_desc)	link;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
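/*
 * Note on the macros above: the address registered as the bdev layer's io_device
 * is deliberately offset by one byte from the spdk_bdev pointer itself. As far
 * as this file shows, the intent is simply to guarantee an io_device key that
 * cannot collide with any io_device a bdev module might register using the bare
 * bdev or its context pointer; __bdev_from_io_dev() undoes the offset before the
 * pointer is dereferenced.
 */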
static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
		void *cb_arg);
static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);

static void _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
static void _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);

static int
_spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
				struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
				uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg);
static int
_spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
				 struct iovec *iov, int iovcnt, void *md_buf,
				 uint64_t offset_blocks, uint64_t num_blocks,
				 spdk_bdev_io_completion_cb cb, void *cb_arg);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization.  A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}
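/*
 * Typical usage (illustrative; the pool size shown is arbitrary): an application
 * adjusts the options before the bdev subsystem is initialized, e.g.
 *
 *	struct spdk_bdev_opts opts;
 *
 *	spdk_bdev_get_opts(&opts);
 *	opts.bdev_io_pool_size = 32 * 1024 - 1;
 *	if (spdk_bdev_set_opts(&opts) != 0) {
 *		... handle the incompatible-size error ...
 *	}
 */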
struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return iovs[0].iov_base != NULL;
}

static bool
_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
{
	int i;
	uintptr_t iov_base;

	if (spdk_likely(alignment == 1)) {
		return true;
	}

	for (i = 0; i < iovcnt; i++) {
		iov_base = (uintptr_t)iovs[i].iov_base;
		if ((iov_base & (alignment - 1)) != 0) {
			return false;
		}
	}

	return true;
}
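/*
 * Note: the mask test above ((iov_base & (alignment - 1)) != 0) only works when
 * alignment is a power of two. That holds for the values produced by
 * spdk_bdev_get_buf_align(), which is how every caller in this file obtains the
 * alignment; use with arbitrary alignments should be treated as unsupported.
 */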
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is write path, copy data from original buffer to bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
_bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	/* save original md_buf */
	bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf;
	/* set bounce md_buf */
	bdev_io->u.bdev.md_buf = md_buf;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		memcpy(md_buf, bdev_io->internal.orig_md_buf, len);
	}
}

static void
_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	bool buf_allocated;
	uint64_t md_len, alignment;
	void *aligned_buf;

	alignment = spdk_bdev_get_buf_align(bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
	aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

	if (buf_allocated) {
		_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
	} else {
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
	}

	if (spdk_bdev_is_md_separate(bdev)) {
		aligned_buf = (char *)aligned_buf + len;
		md_len = bdev_io->u.bdev.num_blocks * bdev->md_len;

		assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);

		if (bdev_io->u.bdev.md_buf != NULL) {
			_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
		} else {
			spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
		}
	}

	bdev_io->internal.buf = buf;
	bdev_io->internal.get_buf_cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len, md_len, alignment;
	void *buf;

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
	alignment = spdk_bdev_get_buf_align(bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		_bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
	}
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
		assert(bdev_io->internal.orig_md_buf == NULL);
		return;
	}

	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs,
				  bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base,
				  bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bouncing buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;

	/* do the same for metadata buffer */
	if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
		assert(spdk_bdev_is_md_separate(bdev_io->bdev));

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
		    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
			       bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
		}

		bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
		bdev_io->internal.orig_md_buf = NULL;
	}

	spdk_bdev_io_put_buf(bdev_io);
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment, md_len;
	void *buf;

	assert(cb != NULL);

	alignment = spdk_bdev_get_buf_align(bdev);
	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;

	if (_is_buf_allocated(bdev_io->u.bdev.iovs) &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
		return;
	}

	if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
			    len + alignment);
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, false);
		return;
	}

	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;
	bdev_io->internal.get_buf_cb = cb;

	if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);
	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		_bdev_io_set_buf(bdev_io, buf, len);
	}
}
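/*
 * Typical usage of spdk_bdev_io_get_buf() (illustrative): a bdev module that
 * needs a data buffer for a read request defers its work to the callback, e.g.
 *
 *	static void
 *	my_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
 *			   bool success)
 *	{
 *		if (!success) {
 *			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
 *			return;
 *		}
 *		... issue the read using bdev_io->u.bdev.iovs ...
 *	}
 *
 *	spdk_bdev_io_get_buf(bdev_io, my_bdev_get_buf_cb,
 *			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *
 * my_bdev_get_buf_cb is a hypothetical name; the callback may run inline if a
 * buffer is immediately available, or later when a buffer is returned to the pool.
 */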
static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

static void
spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_qos_limit");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
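/*
 * For reference, the object emitted above looks roughly like the following
 * (the bdev name and limit values are illustrative only):
 *
 *	{
 *		"method": "set_bdev_qos_limit",
 *		"params": {
 *			"name": "Malloc0",
 *			"rw_ios_per_sec": 10000,
 *			"r_mbytes_per_sec": 100
 *		}
 *	}
 */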
void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_options");
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	pthread_mutex_lock(&g_bdev_mgr.mutex);

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		spdk_bdev_qos_config_json(bdev, w);
	}

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}
static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
spdk_bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	spdk_bdev_init_complete(-1);
}

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to prevent other modules from completing modules_init.
			 * Send a message to defer application shutdown until resources are cleaned up. */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}
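/*
 * spdk_bdev_initialize() below optionally reads a [Bdev] section from the
 * legacy INI-style configuration file. An illustrative section (the values are
 * examples only) would be:
 *
 *	[Bdev]
 *	  BdevIoPoolSize 65535
 *	  BdevIoCacheSize 256
 */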
void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = spdk_bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		return;
	}

	spdk_bdev_module_action_complete();
}

static void
spdk_bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    g_bdev_opts.bdev_io_pool_size);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	spdk_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
	pthread_mutex_destroy(&g_bdev_mgr.mutex);
}
static void
spdk_bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* FIXME: Handling initialization failures is broken now,
	 * so we won't even try cleaning up after successfully
	 * initialized modules. If module_init_complete is false,
	 * just call spdk_bdev_mgr_unregister_cb.
	 */
	if (!g_bdev_mgr.module_init_complete) {
		spdk_bdev_mgr_unregister_cb(NULL);
		return;
	}

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}
static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in the reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim underlying bdev properly, we may face the
	 * case of bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister last bdev on the list then.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}
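/*
 * Usage note (illustrative): applications release an spdk_bdev_io from their
 * completion callback once they are done inspecting it, e.g.
 *
 *	static void
 *	my_io_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		... consume the result ...
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 * my_io_complete is a hypothetical callback name; freeing the I/O is what lets
 * it be recycled through the per-thread cache above.
 */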
static bool
_spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
_spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return true;
	default:
		return false;
	}
}

static bool
_spdk_bdev_is_read_io(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* Bit 1 (0x2) set for read operation */
		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
			return true;
		} else {
			return false;
		}
	case SPDK_BDEV_IO_TYPE_READ:
		return true;
	default:
		return false;
	}
}

static uint64_t
_spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	default:
		return 0;
	}
}

static bool
_spdk_bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
		return true;
	} else {
		return false;
	}
}

static bool
_spdk_bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == false) {
		return false;
	}

	return _spdk_bdev_qos_rw_queue_io(limit, io);
}

static bool
_spdk_bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == true) {
		return false;
	}

	return _spdk_bdev_qos_rw_queue_io(limit, io);
}

static void
_spdk_bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice--;
}

static void
_spdk_bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(io);
}

static void
_spdk_bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == false) {
		return;
	}

	return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
}
static void
_spdk_bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == true) {
		return;
	}

	return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
}

static void
_spdk_bdev_qos_set_ops(struct spdk_bdev_qos *qos)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].queue_io = NULL;
			qos->rate_limits[i].update_quota = NULL;
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_iops_update_quota;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_r_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_r_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_w_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_w_bps_update_quota;
			break;
		default:
			break;
		}
	}
}

static inline void
_spdk_bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
		bdev_ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev_io->internal.in_submit_request = true;
		bdev->fn_table->submit_request(ch, bdev_io);
		bdev_io->internal.in_submit_request = false;
	} else {
		TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
	}
}

static int
_spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL;
	int i, submitted_ios = 0;

	TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
		if (_spdk_bdev_qos_io_to_limit(bdev_io) == true) {
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].queue_io) {
					continue;
				}

				if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
								 bdev_io) == true) {
					return submitted_ios;
				}
			}
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].update_quota) {
					continue;
				}

				qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
			}
		}

		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		_spdk_bdev_io_do_submit(ch, bdev_io);
		submitted_ios++;
	}

	return submitted_ios;
}

static void
_spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}
static bool
_spdk_bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O.  Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}

static bool
_spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}

static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}
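/*
 * Worked example (hypothetical numbers): with optimal_io_boundary = 8 blocks,
 * an I/O at offset_blocks = 6 with num_blocks = 4 spans blocks 6-9, i.e.
 * stripes 0 and 1, so _spdk_bdev_io_should_split() returns true. The split loop
 * below then issues one child for blocks 6-7 (_to_next_boundary(6, 8) == 2) and
 * a second child for blocks 8-9.
 */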
static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_spdk_bdev_io_split(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes;
	struct iovec *parent_iov, *iov;
	uint64_t parent_iov_offset, iov_len;
	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
	void *md_buf = NULL;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
	parent_iovcnt = bdev_io->u.bdev.iovcnt;

	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
		if (parent_iov_offset < parent_iov->iov_len) {
			break;
		}
		parent_iov_offset -= parent_iov->iov_len;
	}

	child_iovcnt = 0;
	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
		to_next_boundary = spdk_min(remaining, to_next_boundary);
		to_next_boundary_bytes = to_next_boundary * blocklen;
		iov = &bdev_io->child_iov[child_iovcnt];
		iovcnt = 0;

		if (bdev_io->u.bdev.md_buf) {
			assert((parent_iov_offset % blocklen) > 0);
			md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) *
				 spdk_bdev_get_md_size(bdev_io->bdev);
		}

		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
			to_next_boundary_bytes -= iov_len;

			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;

			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
				parent_iov_offset += iov_len;
			} else {
				parent_iovpos++;
				parent_iov_offset = 0;
			}
			child_iovcnt++;
			iovcnt++;
		}

		if (to_next_boundary_bytes > 0) {
			/* We had to stop this child I/O early because we ran out of
			 * child_iov space.  Make sure the iovs collected so far end on a
			 * block boundary and then adjust to_next_boundary before starting
			 * the child I/O.
			 */
			assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV);
			to_last_block_bytes = to_next_boundary_bytes % blocklen;
			if (to_last_block_bytes != 0) {
				uint32_t child_iovpos = child_iovcnt - 1;
				/* don't decrease child_iovcnt so the loop will naturally end */

				to_next_boundary_bytes += _to_next_boundary(to_next_boundary_bytes, blocklen);
				while (to_last_block_bytes > 0 && iovcnt > 0) {
					iov_len = spdk_min(to_last_block_bytes,
							   bdev_io->child_iov[child_iovpos].iov_len);
					bdev_io->child_iov[child_iovpos].iov_len -= iov_len;
					if (bdev_io->child_iov[child_iovpos].iov_len == 0) {
						child_iovpos--;
						iovcnt--;
					}
					to_last_block_bytes -= iov_len;
				}

				assert(to_last_block_bytes == 0);
			}
			to_next_boundary -= to_next_boundary_bytes / blocklen;
		}

		bdev_io->u.bdev.split_outstanding++;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			rc = _spdk_bdev_readv_blocks_with_md(bdev_io->internal.desc,
							     spdk_io_channel_from_ctx(bdev_io->internal.ch),
							     iov, iovcnt, md_buf, current_offset,
							     to_next_boundary,
							     _spdk_bdev_io_split_done, bdev_io);
		} else {
			rc = _spdk_bdev_writev_blocks_with_md(bdev_io->internal.desc,
							      spdk_io_channel_from_ctx(bdev_io->internal.ch),
							      iov, iovcnt, md_buf, current_offset,
							      to_next_boundary,
							      _spdk_bdev_io_split_done, bdev_io);
		}

		if (rc == 0) {
			current_offset += to_next_boundary;
			remaining -= to_next_boundary;
			bdev_io->u.bdev.split_current_offset_blocks = current_offset;
			bdev_io->u.bdev.split_remaining_num_blocks = remaining;
		} else {
			bdev_io->u.bdev.split_outstanding--;
			if (rc == -ENOMEM) {
				if (bdev_io->u.bdev.split_outstanding == 0) {
					/* No I/O is outstanding. Hence we should wait here. */
					_spdk_bdev_queue_io_wait_with_cb(bdev_io,
									 _spdk_bdev_io_split);
				}
			} else {
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
			}

			return;
		}
	}
}
static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
	}
	parent_io->u.bdev.split_outstanding--;
	if (parent_io->u.bdev.split_outstanding != 0) {
		return;
	}

	/*
	 * Parent I/O finishes when all blocks are consumed.
	 */
	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
				       parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process.  This function will complete the parent I/O if the
	 * splitting is done.
	 */
	_spdk_bdev_io_split(parent_io);
}

static void
_spdk_bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
			       bool success);

static void
spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	assert(_spdk_bdev_io_type_can_split(bdev_io->type));

	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
	bdev_io->u.bdev.split_outstanding = 0;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (_is_buf_allocated(bdev_io->u.bdev.iovs)) {
		_spdk_bdev_io_split(bdev_io);
	} else {
		assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ);
		spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
	}
}

static void
_spdk_bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
			       bool success)
{
	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	spdk_bdev_io_split(ch, bdev_io);
}

/* Explicitly mark this inline, since it's used as a function pointer and otherwise won't
 * be inlined, at least on some compilers.
 */
static inline void
_spdk_bdev_io_submit(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
	uint64_t tsc;

	tsc = spdk_get_ticks();
	bdev_io->internal.submit_tsc = tsc;
	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);

	if (spdk_likely(bdev_ch->flags == 0)) {
		_spdk_bdev_io_do_submit(bdev_ch, bdev_io);
		return;
	}

	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	bdev_io->internal.in_submit_request = true;
	if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
		bdev_ch->io_outstanding--;
		shared_resource->io_outstanding--;
		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
		_spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos);
	} else {
		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
	bdev_io->internal.in_submit_request = false;
}
(bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1873 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1874 _spdk_bdev_io_submit(bdev_io); 1875 } else { 1876 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1877 bdev_io->internal.ch = bdev->internal.qos->ch; 1878 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1879 } 1880 } else { 1881 _spdk_bdev_io_submit(bdev_io); 1882 } 1883 } 1884 1885 static void 1886 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1887 { 1888 struct spdk_bdev *bdev = bdev_io->bdev; 1889 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1890 struct spdk_io_channel *ch = bdev_ch->channel; 1891 1892 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1893 1894 bdev_io->internal.in_submit_request = true; 1895 bdev->fn_table->submit_request(ch, bdev_io); 1896 bdev_io->internal.in_submit_request = false; 1897 } 1898 1899 static void 1900 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1901 struct spdk_bdev *bdev, void *cb_arg, 1902 spdk_bdev_io_completion_cb cb) 1903 { 1904 bdev_io->bdev = bdev; 1905 bdev_io->internal.caller_ctx = cb_arg; 1906 bdev_io->internal.cb = cb; 1907 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1908 bdev_io->internal.in_submit_request = false; 1909 bdev_io->internal.buf = NULL; 1910 bdev_io->internal.io_submit_ch = NULL; 1911 bdev_io->internal.orig_iovs = NULL; 1912 bdev_io->internal.orig_iovcnt = 0; 1913 bdev_io->internal.orig_md_buf = NULL; 1914 } 1915 1916 static bool 1917 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1918 { 1919 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1920 } 1921 1922 bool 1923 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1924 { 1925 bool supported; 1926 1927 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1928 1929 if (!supported) { 1930 switch (io_type) { 1931 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1932 /* The bdev layer will emulate write zeroes as long as write is supported. 
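 * For example, this wrapper reports SPDK_BDEV_IO_TYPE_WRITE_ZEROES as
 * supported for a module that only implements WRITE, because
 * spdk_bdev_write_zeroes_blocks() falls back to writing from an internal
 * zero buffer in that case. Callers should generally query support through
 * this function rather than through the module's io_type_supported callback.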
*/ 1933 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1934 break; 1935 case SPDK_BDEV_IO_TYPE_ZCOPY: 1936 /* Zero copy can be emulated with regular read and write */ 1937 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 1938 _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1939 break; 1940 default: 1941 break; 1942 } 1943 } 1944 1945 return supported; 1946 } 1947 1948 int 1949 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1950 { 1951 if (bdev->fn_table->dump_info_json) { 1952 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1953 } 1954 1955 return 0; 1956 } 1957 1958 static void 1959 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1960 { 1961 uint32_t max_per_timeslice = 0; 1962 int i; 1963 1964 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1965 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1966 qos->rate_limits[i].max_per_timeslice = 0; 1967 continue; 1968 } 1969 1970 max_per_timeslice = qos->rate_limits[i].limit * 1971 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1972 1973 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1974 qos->rate_limits[i].min_per_timeslice); 1975 1976 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1977 } 1978 1979 _spdk_bdev_qos_set_ops(qos); 1980 } 1981 1982 static int 1983 spdk_bdev_channel_poll_qos(void *arg) 1984 { 1985 struct spdk_bdev_qos *qos = arg; 1986 uint64_t now = spdk_get_ticks(); 1987 int i; 1988 1989 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1990 /* We received our callback earlier than expected - return 1991 * immediately and wait to do accounting until at least one 1992 * timeslice has actually expired. This should never happen 1993 * with a well-behaved timer implementation. 1994 */ 1995 return 0; 1996 } 1997 1998 /* Reset for next round of rate limiting */ 1999 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2000 /* We may have allowed the IOs or bytes to slightly overrun in the last 2001 * timeslice. remaining_this_timeslice is signed, so if it's negative 2002 * here, we'll account for the overrun so that the next timeslice will 2003 * be appropriately reduced. 2004 */ 2005 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2006 qos->rate_limits[i].remaining_this_timeslice = 0; 2007 } 2008 } 2009 2010 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2011 qos->last_timeslice += qos->timeslice_size; 2012 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2013 qos->rate_limits[i].remaining_this_timeslice += 2014 qos->rate_limits[i].max_per_timeslice; 2015 } 2016 } 2017 2018 return _spdk_bdev_qos_io_submit(qos->ch, qos); 2019 } 2020 2021 static void 2022 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2023 { 2024 struct spdk_bdev_shared_resource *shared_resource; 2025 2026 spdk_put_io_channel(ch->channel); 2027 2028 shared_resource = ch->shared_resource; 2029 2030 assert(ch->io_outstanding == 0); 2031 assert(shared_resource->ref > 0); 2032 shared_resource->ref--; 2033 if (shared_resource->ref == 0) { 2034 assert(shared_resource->io_outstanding == 0); 2035 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2036 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2037 free(shared_resource); 2038 } 2039 } 2040 2041 /* Caller must hold bdev->internal.mutex. 
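 *
 * A minimal sketch of the expected call pattern (mirroring the channel
 * create path later in this file):
 *
 *   pthread_mutex_lock(&bdev->internal.mutex);
 *   _spdk_bdev_enable_qos(bdev, ch);
 *   pthread_mutex_unlock(&bdev->internal.mutex);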
*/ 2042 static void 2043 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2044 { 2045 struct spdk_bdev_qos *qos = bdev->internal.qos; 2046 int i; 2047 2048 /* Rate limiting on this bdev enabled */ 2049 if (qos) { 2050 if (qos->ch == NULL) { 2051 struct spdk_io_channel *io_ch; 2052 2053 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2054 bdev->name, spdk_get_thread()); 2055 2056 /* No qos channel has been selected, so set one up */ 2057 2058 /* Take another reference to ch */ 2059 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2060 assert(io_ch != NULL); 2061 qos->ch = ch; 2062 2063 qos->thread = spdk_io_channel_get_thread(io_ch); 2064 2065 TAILQ_INIT(&qos->queued); 2066 2067 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2068 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 2069 qos->rate_limits[i].min_per_timeslice = 2070 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2071 } else { 2072 qos->rate_limits[i].min_per_timeslice = 2073 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2074 } 2075 2076 if (qos->rate_limits[i].limit == 0) { 2077 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2078 } 2079 } 2080 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 2081 qos->timeslice_size = 2082 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2083 qos->last_timeslice = spdk_get_ticks(); 2084 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 2085 qos, 2086 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2087 } 2088 2089 ch->flags |= BDEV_CH_QOS_ENABLED; 2090 } 2091 } 2092 2093 static int 2094 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 2095 { 2096 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2097 struct spdk_bdev_channel *ch = ctx_buf; 2098 struct spdk_io_channel *mgmt_io_ch; 2099 struct spdk_bdev_mgmt_channel *mgmt_ch; 2100 struct spdk_bdev_shared_resource *shared_resource; 2101 2102 ch->bdev = bdev; 2103 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2104 if (!ch->channel) { 2105 return -1; 2106 } 2107 2108 assert(ch->histogram == NULL); 2109 if (bdev->internal.histogram_enabled) { 2110 ch->histogram = spdk_histogram_data_alloc(); 2111 if (ch->histogram == NULL) { 2112 SPDK_ERRLOG("Could not allocate histogram\n"); 2113 } 2114 } 2115 2116 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2117 if (!mgmt_io_ch) { 2118 spdk_put_io_channel(ch->channel); 2119 return -1; 2120 } 2121 2122 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2123 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2124 if (shared_resource->shared_ch == ch->channel) { 2125 spdk_put_io_channel(mgmt_io_ch); 2126 shared_resource->ref++; 2127 break; 2128 } 2129 } 2130 2131 if (shared_resource == NULL) { 2132 shared_resource = calloc(1, sizeof(*shared_resource)); 2133 if (shared_resource == NULL) { 2134 spdk_put_io_channel(ch->channel); 2135 spdk_put_io_channel(mgmt_io_ch); 2136 return -1; 2137 } 2138 2139 shared_resource->mgmt_ch = mgmt_ch; 2140 shared_resource->io_outstanding = 0; 2141 TAILQ_INIT(&shared_resource->nomem_io); 2142 shared_resource->nomem_threshold = 0; 2143 shared_resource->shared_ch = ch->channel; 2144 shared_resource->ref = 1; 2145 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2146 } 2147 2148 memset(&ch->stat, 0, sizeof(ch->stat)); 2149 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2150 ch->io_outstanding = 0; 2151 TAILQ_INIT(&ch->queued_resets); 2152 ch->flags = 0; 2153 ch->shared_resource = shared_resource; 2154 2155 #ifdef 
SPDK_CONFIG_VTUNE 2156 { 2157 char *name; 2158 __itt_init_ittlib(NULL, 0); 2159 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2160 if (!name) { 2161 _spdk_bdev_channel_destroy_resource(ch); 2162 return -1; 2163 } 2164 ch->handle = __itt_string_handle_create(name); 2165 free(name); 2166 ch->start_tsc = spdk_get_ticks(); 2167 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2168 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 2169 } 2170 #endif 2171 2172 pthread_mutex_lock(&bdev->internal.mutex); 2173 _spdk_bdev_enable_qos(bdev, ch); 2174 pthread_mutex_unlock(&bdev->internal.mutex); 2175 2176 return 0; 2177 } 2178 2179 /* 2180 * Abort I/O that are waiting on a data buffer. These types of I/O are 2181 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2182 */ 2183 static void 2184 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2185 { 2186 bdev_io_stailq_t tmp; 2187 struct spdk_bdev_io *bdev_io; 2188 2189 STAILQ_INIT(&tmp); 2190 2191 while (!STAILQ_EMPTY(queue)) { 2192 bdev_io = STAILQ_FIRST(queue); 2193 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2194 if (bdev_io->internal.ch == ch) { 2195 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2196 } else { 2197 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2198 } 2199 } 2200 2201 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2202 } 2203 2204 /* 2205 * Abort I/O that are queued waiting for submission. These types of I/O are 2206 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2207 */ 2208 static void 2209 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2210 { 2211 struct spdk_bdev_io *bdev_io, *tmp; 2212 2213 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2214 if (bdev_io->internal.ch == ch) { 2215 TAILQ_REMOVE(queue, bdev_io, internal.link); 2216 /* 2217 * spdk_bdev_io_complete() assumes that the completed I/O had 2218 * been submitted to the bdev module. Since in this case it 2219 * hadn't, bump io_outstanding to account for the decrement 2220 * that spdk_bdev_io_complete() will do. 2221 */ 2222 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2223 ch->io_outstanding++; 2224 ch->shared_resource->io_outstanding++; 2225 } 2226 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2227 } 2228 } 2229 } 2230 2231 static void 2232 spdk_bdev_qos_channel_destroy(void *cb_arg) 2233 { 2234 struct spdk_bdev_qos *qos = cb_arg; 2235 2236 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2237 spdk_poller_unregister(&qos->poller); 2238 2239 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2240 2241 free(qos); 2242 } 2243 2244 static int 2245 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 2246 { 2247 int i; 2248 2249 /* 2250 * Cleanly shutting down the QoS poller is tricky, because 2251 * during the asynchronous operation the user could open 2252 * a new descriptor and create a new channel, spawning 2253 * a new QoS poller. 2254 * 2255 * The strategy is to create a new QoS structure here and swap it 2256 * in. The shutdown path then continues to refer to the old one 2257 * until it completes and then releases it. 
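 *
 * Concretely, after the swap below bdev->internal.qos points at new_qos,
 * which keeps the configured limits but has no channel, thread or poller,
 * while old_qos is handed to spdk_bdev_qos_channel_destroy() on the thread
 * that owns it.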
2258 */ 2259 struct spdk_bdev_qos *new_qos, *old_qos; 2260 2261 old_qos = bdev->internal.qos; 2262 2263 new_qos = calloc(1, sizeof(*new_qos)); 2264 if (!new_qos) { 2265 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2266 return -ENOMEM; 2267 } 2268 2269 /* Copy the old QoS data into the newly allocated structure */ 2270 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2271 2272 /* Zero out the key parts of the QoS structure */ 2273 new_qos->ch = NULL; 2274 new_qos->thread = NULL; 2275 new_qos->poller = NULL; 2276 TAILQ_INIT(&new_qos->queued); 2277 /* 2278 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2279 * It will be used later for the new QoS structure. 2280 */ 2281 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2282 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2283 new_qos->rate_limits[i].min_per_timeslice = 0; 2284 new_qos->rate_limits[i].max_per_timeslice = 0; 2285 } 2286 2287 bdev->internal.qos = new_qos; 2288 2289 if (old_qos->thread == NULL) { 2290 free(old_qos); 2291 } else { 2292 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 2293 old_qos); 2294 } 2295 2296 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2297 * been destroyed yet. The destruction path will end up waiting for the final 2298 * channel to be put before it releases resources. */ 2299 2300 return 0; 2301 } 2302 2303 static void 2304 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2305 { 2306 total->bytes_read += add->bytes_read; 2307 total->num_read_ops += add->num_read_ops; 2308 total->bytes_written += add->bytes_written; 2309 total->num_write_ops += add->num_write_ops; 2310 total->bytes_unmapped += add->bytes_unmapped; 2311 total->num_unmap_ops += add->num_unmap_ops; 2312 total->read_latency_ticks += add->read_latency_ticks; 2313 total->write_latency_ticks += add->write_latency_ticks; 2314 total->unmap_latency_ticks += add->unmap_latency_ticks; 2315 } 2316 2317 static void 2318 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 2319 { 2320 struct spdk_bdev_channel *ch = ctx_buf; 2321 struct spdk_bdev_mgmt_channel *mgmt_ch; 2322 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2323 2324 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2325 spdk_get_thread()); 2326 2327 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
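 * The saved counters are folded back in by spdk_bdev_get_device_stat(), which
 * starts from bdev->internal.stat before iterating the remaining channels.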
*/ 2328 pthread_mutex_lock(&ch->bdev->internal.mutex); 2329 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2330 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2331 2332 mgmt_ch = shared_resource->mgmt_ch; 2333 2334 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2335 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2336 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2337 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2338 2339 if (ch->histogram) { 2340 spdk_histogram_data_free(ch->histogram); 2341 } 2342 2343 _spdk_bdev_channel_destroy_resource(ch); 2344 } 2345 2346 int 2347 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2348 { 2349 struct spdk_bdev_alias *tmp; 2350 2351 if (alias == NULL) { 2352 SPDK_ERRLOG("Empty alias passed\n"); 2353 return -EINVAL; 2354 } 2355 2356 if (spdk_bdev_get_by_name(alias)) { 2357 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2358 return -EEXIST; 2359 } 2360 2361 tmp = calloc(1, sizeof(*tmp)); 2362 if (tmp == NULL) { 2363 SPDK_ERRLOG("Unable to allocate alias\n"); 2364 return -ENOMEM; 2365 } 2366 2367 tmp->alias = strdup(alias); 2368 if (tmp->alias == NULL) { 2369 free(tmp); 2370 SPDK_ERRLOG("Unable to allocate alias\n"); 2371 return -ENOMEM; 2372 } 2373 2374 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2375 2376 return 0; 2377 } 2378 2379 int 2380 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2381 { 2382 struct spdk_bdev_alias *tmp; 2383 2384 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2385 if (strcmp(alias, tmp->alias) == 0) { 2386 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2387 free(tmp->alias); 2388 free(tmp); 2389 return 0; 2390 } 2391 } 2392 2393 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2394 2395 return -ENOENT; 2396 } 2397 2398 void 2399 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2400 { 2401 struct spdk_bdev_alias *p, *tmp; 2402 2403 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2404 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2405 free(p->alias); 2406 free(p); 2407 } 2408 } 2409 2410 struct spdk_io_channel * 2411 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2412 { 2413 return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc))); 2414 } 2415 2416 const char * 2417 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2418 { 2419 return bdev->name; 2420 } 2421 2422 const char * 2423 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2424 { 2425 return bdev->product_name; 2426 } 2427 2428 const struct spdk_bdev_aliases_list * 2429 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2430 { 2431 return &bdev->aliases; 2432 } 2433 2434 uint32_t 2435 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2436 { 2437 return bdev->blocklen; 2438 } 2439 2440 uint64_t 2441 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2442 { 2443 return bdev->blockcnt; 2444 } 2445 2446 const char * 2447 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2448 { 2449 return qos_rpc_type[type]; 2450 } 2451 2452 void 2453 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2454 { 2455 int i; 2456 2457 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2458 2459 pthread_mutex_lock(&bdev->internal.mutex); 2460 if (bdev->internal.qos) { 2461 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2462 if (bdev->internal.qos->rate_limits[i].limit != 2463 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2464 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2465 if 
(_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2466 /* Change from Byte to Megabyte which is user visible. */ 2467 limits[i] = limits[i] / 1024 / 1024; 2468 } 2469 } 2470 } 2471 } 2472 pthread_mutex_unlock(&bdev->internal.mutex); 2473 } 2474 2475 size_t 2476 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2477 { 2478 return 1 << bdev->required_alignment; 2479 } 2480 2481 uint32_t 2482 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2483 { 2484 return bdev->optimal_io_boundary; 2485 } 2486 2487 bool 2488 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2489 { 2490 return bdev->write_cache; 2491 } 2492 2493 const struct spdk_uuid * 2494 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2495 { 2496 return &bdev->uuid; 2497 } 2498 2499 uint32_t 2500 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 2501 { 2502 return bdev->md_len; 2503 } 2504 2505 bool 2506 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 2507 { 2508 return (bdev->md_len != 0) && bdev->md_interleave; 2509 } 2510 2511 bool 2512 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 2513 { 2514 return (bdev->md_len != 0) && !bdev->md_interleave; 2515 } 2516 2517 uint32_t 2518 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 2519 { 2520 if (spdk_bdev_is_md_interleaved(bdev)) { 2521 return bdev->blocklen - bdev->md_len; 2522 } else { 2523 return bdev->blocklen; 2524 } 2525 } 2526 2527 static uint32_t 2528 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 2529 { 2530 if (!spdk_bdev_is_md_interleaved(bdev)) { 2531 return bdev->blocklen + bdev->md_len; 2532 } else { 2533 return bdev->blocklen; 2534 } 2535 } 2536 2537 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 2538 { 2539 if (bdev->md_len != 0) { 2540 return bdev->dif_type; 2541 } else { 2542 return SPDK_DIF_DISABLE; 2543 } 2544 } 2545 2546 bool 2547 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 2548 { 2549 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 2550 return bdev->dif_is_head_of_md; 2551 } else { 2552 return false; 2553 } 2554 } 2555 2556 bool 2557 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 2558 enum spdk_dif_check_type check_type) 2559 { 2560 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 2561 return false; 2562 } 2563 2564 switch (check_type) { 2565 case SPDK_DIF_CHECK_TYPE_REFTAG: 2566 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 2567 case SPDK_DIF_CHECK_TYPE_APPTAG: 2568 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 2569 case SPDK_DIF_CHECK_TYPE_GUARD: 2570 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 2571 default: 2572 return false; 2573 } 2574 } 2575 2576 uint64_t 2577 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2578 { 2579 return bdev->internal.measured_queue_depth; 2580 } 2581 2582 uint64_t 2583 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2584 { 2585 return bdev->internal.period; 2586 } 2587 2588 uint64_t 2589 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2590 { 2591 return bdev->internal.weighted_io_time; 2592 } 2593 2594 uint64_t 2595 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2596 { 2597 return bdev->internal.io_time; 2598 } 2599 2600 static void 2601 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2602 { 2603 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2604 2605 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2606 2607 if 
(bdev->internal.measured_queue_depth) { 2608 bdev->internal.io_time += bdev->internal.period; 2609 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2610 } 2611 } 2612 2613 static void 2614 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2615 { 2616 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2617 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2618 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2619 2620 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2621 spdk_for_each_channel_continue(i, 0); 2622 } 2623 2624 static int 2625 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2626 { 2627 struct spdk_bdev *bdev = ctx; 2628 bdev->internal.temporary_queue_depth = 0; 2629 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2630 _calculate_measured_qd_cpl); 2631 return 0; 2632 } 2633 2634 void 2635 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2636 { 2637 bdev->internal.period = period; 2638 2639 if (bdev->internal.qd_poller != NULL) { 2640 spdk_poller_unregister(&bdev->internal.qd_poller); 2641 bdev->internal.measured_queue_depth = UINT64_MAX; 2642 } 2643 2644 if (period != 0) { 2645 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2646 period); 2647 } 2648 } 2649 2650 int 2651 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2652 { 2653 int ret; 2654 2655 pthread_mutex_lock(&bdev->internal.mutex); 2656 2657 /* bdev has open descriptors */ 2658 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2659 bdev->blockcnt > size) { 2660 ret = -EBUSY; 2661 } else { 2662 bdev->blockcnt = size; 2663 ret = 0; 2664 } 2665 2666 pthread_mutex_unlock(&bdev->internal.mutex); 2667 2668 return ret; 2669 } 2670 2671 /* 2672 * Convert I/O offset and length from bytes to blocks. 2673 * 2674 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 2675 */ 2676 static uint64_t 2677 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2678 uint64_t num_bytes, uint64_t *num_blocks) 2679 { 2680 uint32_t block_size = bdev->blocklen; 2681 uint8_t shift_cnt; 2682 2683 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
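 *
 * Worked example (illustrative values): a 512-byte block size gives a shift
 * count of 9, so offset_bytes = 4096 becomes offset_blocks = 8 with no
 * remainder. The return value ORs the two remainders together, so it is 0
 * only when both offset_bytes and num_bytes are block-aligned.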
*/ 2684 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 2685 shift_cnt = spdk_u32log2(block_size); 2686 *offset_blocks = offset_bytes >> shift_cnt; 2687 *num_blocks = num_bytes >> shift_cnt; 2688 return (offset_bytes - (*offset_blocks << shift_cnt)) | 2689 (num_bytes - (*num_blocks << shift_cnt)); 2690 } else { 2691 *offset_blocks = offset_bytes / block_size; 2692 *num_blocks = num_bytes / block_size; 2693 return (offset_bytes % block_size) | (num_bytes % block_size); 2694 } 2695 } 2696 2697 static bool 2698 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2699 { 2700 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2701 * has been an overflow and hence the offset has been wrapped around */ 2702 if (offset_blocks + num_blocks < offset_blocks) { 2703 return false; 2704 } 2705 2706 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2707 if (offset_blocks + num_blocks > bdev->blockcnt) { 2708 return false; 2709 } 2710 2711 return true; 2712 } 2713 2714 static bool 2715 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 2716 { 2717 return _is_buf_allocated(iovs) == (md_buf != NULL); 2718 } 2719 2720 static int 2721 _spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 2722 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 2723 spdk_bdev_io_completion_cb cb, void *cb_arg) 2724 { 2725 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2726 struct spdk_bdev_io *bdev_io; 2727 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2728 2729 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2730 return -EINVAL; 2731 } 2732 2733 bdev_io = spdk_bdev_get_io(channel); 2734 if (!bdev_io) { 2735 return -ENOMEM; 2736 } 2737 2738 bdev_io->internal.ch = channel; 2739 bdev_io->internal.desc = desc; 2740 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2741 bdev_io->u.bdev.iovs = &bdev_io->iov; 2742 bdev_io->u.bdev.iovs[0].iov_base = buf; 2743 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2744 bdev_io->u.bdev.iovcnt = 1; 2745 bdev_io->u.bdev.md_buf = md_buf; 2746 bdev_io->u.bdev.num_blocks = num_blocks; 2747 bdev_io->u.bdev.offset_blocks = offset_blocks; 2748 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2749 2750 spdk_bdev_io_submit(bdev_io); 2751 return 0; 2752 } 2753 2754 int 2755 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2756 void *buf, uint64_t offset, uint64_t nbytes, 2757 spdk_bdev_io_completion_cb cb, void *cb_arg) 2758 { 2759 uint64_t offset_blocks, num_blocks; 2760 2761 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2762 nbytes, &num_blocks) != 0) { 2763 return -EINVAL; 2764 } 2765 2766 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2767 } 2768 2769 int 2770 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2771 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2772 spdk_bdev_io_completion_cb cb, void *cb_arg) 2773 { 2774 return _spdk_bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 2775 cb, cb_arg); 2776 } 2777 2778 int 2779 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2780 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 2781 spdk_bdev_io_completion_cb cb, void *cb_arg) 2782 { 2783 struct iovec iov = { 2784 .iov_base = buf, 2785 }; 2786 2787 if 
(!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 2788 return -EINVAL; 2789 } 2790 2791 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 2792 return -EINVAL; 2793 } 2794 2795 return _spdk_bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 2796 cb, cb_arg); 2797 } 2798 2799 int 2800 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2801 struct iovec *iov, int iovcnt, 2802 uint64_t offset, uint64_t nbytes, 2803 spdk_bdev_io_completion_cb cb, void *cb_arg) 2804 { 2805 uint64_t offset_blocks, num_blocks; 2806 2807 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2808 nbytes, &num_blocks) != 0) { 2809 return -EINVAL; 2810 } 2811 2812 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2813 } 2814 2815 static int 2816 _spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2817 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 2818 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 2819 { 2820 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2821 struct spdk_bdev_io *bdev_io; 2822 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2823 2824 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2825 return -EINVAL; 2826 } 2827 2828 bdev_io = spdk_bdev_get_io(channel); 2829 if (!bdev_io) { 2830 return -ENOMEM; 2831 } 2832 2833 bdev_io->internal.ch = channel; 2834 bdev_io->internal.desc = desc; 2835 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2836 bdev_io->u.bdev.iovs = iov; 2837 bdev_io->u.bdev.iovcnt = iovcnt; 2838 bdev_io->u.bdev.md_buf = md_buf; 2839 bdev_io->u.bdev.num_blocks = num_blocks; 2840 bdev_io->u.bdev.offset_blocks = offset_blocks; 2841 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2842 2843 spdk_bdev_io_submit(bdev_io); 2844 return 0; 2845 } 2846 2847 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2848 struct iovec *iov, int iovcnt, 2849 uint64_t offset_blocks, uint64_t num_blocks, 2850 spdk_bdev_io_completion_cb cb, void *cb_arg) 2851 { 2852 return _spdk_bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 2853 num_blocks, cb, cb_arg); 2854 } 2855 2856 int 2857 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2858 struct iovec *iov, int iovcnt, void *md_buf, 2859 uint64_t offset_blocks, uint64_t num_blocks, 2860 spdk_bdev_io_completion_cb cb, void *cb_arg) 2861 { 2862 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 2863 return -EINVAL; 2864 } 2865 2866 if (!_bdev_io_check_md_buf(iov, md_buf)) { 2867 return -EINVAL; 2868 } 2869 2870 return _spdk_bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 2871 num_blocks, cb, cb_arg); 2872 } 2873 2874 static int 2875 _spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2876 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 2877 spdk_bdev_io_completion_cb cb, void *cb_arg) 2878 { 2879 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2880 struct spdk_bdev_io *bdev_io; 2881 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2882 2883 if (!desc->write) { 2884 return -EBADF; 2885 } 2886 2887 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2888 return -EINVAL; 2889 } 2890 2891 bdev_io = spdk_bdev_get_io(channel); 2892 if (!bdev_io) { 2893 return -ENOMEM; 2894 } 2895 2896 bdev_io->internal.ch = channel; 
2897 bdev_io->internal.desc = desc; 2898 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2899 bdev_io->u.bdev.iovs = &bdev_io->iov; 2900 bdev_io->u.bdev.iovs[0].iov_base = buf; 2901 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2902 bdev_io->u.bdev.iovcnt = 1; 2903 bdev_io->u.bdev.md_buf = md_buf; 2904 bdev_io->u.bdev.num_blocks = num_blocks; 2905 bdev_io->u.bdev.offset_blocks = offset_blocks; 2906 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2907 2908 spdk_bdev_io_submit(bdev_io); 2909 return 0; 2910 } 2911 2912 int 2913 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2914 void *buf, uint64_t offset, uint64_t nbytes, 2915 spdk_bdev_io_completion_cb cb, void *cb_arg) 2916 { 2917 uint64_t offset_blocks, num_blocks; 2918 2919 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2920 nbytes, &num_blocks) != 0) { 2921 return -EINVAL; 2922 } 2923 2924 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2925 } 2926 2927 int 2928 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2929 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2930 spdk_bdev_io_completion_cb cb, void *cb_arg) 2931 { 2932 return _spdk_bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 2933 cb, cb_arg); 2934 } 2935 2936 int 2937 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2938 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 2939 spdk_bdev_io_completion_cb cb, void *cb_arg) 2940 { 2941 struct iovec iov = { 2942 .iov_base = buf, 2943 }; 2944 2945 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 2946 return -EINVAL; 2947 } 2948 2949 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 2950 return -EINVAL; 2951 } 2952 2953 return _spdk_bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 2954 cb, cb_arg); 2955 } 2956 2957 static int 2958 _spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2959 struct iovec *iov, int iovcnt, void *md_buf, 2960 uint64_t offset_blocks, uint64_t num_blocks, 2961 spdk_bdev_io_completion_cb cb, void *cb_arg) 2962 { 2963 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2964 struct spdk_bdev_io *bdev_io; 2965 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2966 2967 if (!desc->write) { 2968 return -EBADF; 2969 } 2970 2971 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2972 return -EINVAL; 2973 } 2974 2975 bdev_io = spdk_bdev_get_io(channel); 2976 if (!bdev_io) { 2977 return -ENOMEM; 2978 } 2979 2980 bdev_io->internal.ch = channel; 2981 bdev_io->internal.desc = desc; 2982 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2983 bdev_io->u.bdev.iovs = iov; 2984 bdev_io->u.bdev.iovcnt = iovcnt; 2985 bdev_io->u.bdev.md_buf = md_buf; 2986 bdev_io->u.bdev.num_blocks = num_blocks; 2987 bdev_io->u.bdev.offset_blocks = offset_blocks; 2988 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2989 2990 spdk_bdev_io_submit(bdev_io); 2991 return 0; 2992 } 2993 2994 int 2995 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2996 struct iovec *iov, int iovcnt, 2997 uint64_t offset, uint64_t len, 2998 spdk_bdev_io_completion_cb cb, void *cb_arg) 2999 { 3000 uint64_t offset_blocks, num_blocks; 3001 3002 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3003 len, &num_blocks) != 0) { 3004 return -EINVAL; 3005 } 3006 3007 return 
spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3008 } 3009 3010 int 3011 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3012 struct iovec *iov, int iovcnt, 3013 uint64_t offset_blocks, uint64_t num_blocks, 3014 spdk_bdev_io_completion_cb cb, void *cb_arg) 3015 { 3016 return _spdk_bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3017 num_blocks, cb, cb_arg); 3018 } 3019 3020 int 3021 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3022 struct iovec *iov, int iovcnt, void *md_buf, 3023 uint64_t offset_blocks, uint64_t num_blocks, 3024 spdk_bdev_io_completion_cb cb, void *cb_arg) 3025 { 3026 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3027 return -EINVAL; 3028 } 3029 3030 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3031 return -EINVAL; 3032 } 3033 3034 return _spdk_bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3035 num_blocks, cb, cb_arg); 3036 } 3037 3038 static void 3039 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 3040 { 3041 if (!success) { 3042 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3043 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 3044 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3045 return; 3046 } 3047 3048 if (bdev_io->u.bdev.zcopy.populate) { 3049 /* Read the real data into the buffer */ 3050 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3051 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3052 spdk_bdev_io_submit(bdev_io); 3053 return; 3054 } 3055 3056 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3057 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3058 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3059 } 3060 3061 int 3062 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3063 uint64_t offset_blocks, uint64_t num_blocks, 3064 bool populate, 3065 spdk_bdev_io_completion_cb cb, void *cb_arg) 3066 { 3067 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3068 struct spdk_bdev_io *bdev_io; 3069 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3070 3071 if (!desc->write) { 3072 return -EBADF; 3073 } 3074 3075 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3076 return -EINVAL; 3077 } 3078 3079 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3080 return -ENOTSUP; 3081 } 3082 3083 bdev_io = spdk_bdev_get_io(channel); 3084 if (!bdev_io) { 3085 return -ENOMEM; 3086 } 3087 3088 bdev_io->internal.ch = channel; 3089 bdev_io->internal.desc = desc; 3090 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3091 bdev_io->u.bdev.num_blocks = num_blocks; 3092 bdev_io->u.bdev.offset_blocks = offset_blocks; 3093 bdev_io->u.bdev.iovs = NULL; 3094 bdev_io->u.bdev.iovcnt = 0; 3095 bdev_io->u.bdev.zcopy.populate = populate ? 
1 : 0; 3096 bdev_io->u.bdev.zcopy.commit = 0; 3097 bdev_io->u.bdev.zcopy.start = 1; 3098 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3099 3100 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3101 spdk_bdev_io_submit(bdev_io); 3102 } else { 3103 /* Emulate zcopy by allocating a buffer */ 3104 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 3105 bdev_io->u.bdev.num_blocks * bdev->blocklen); 3106 } 3107 3108 return 0; 3109 } 3110 3111 int 3112 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 3113 spdk_bdev_io_completion_cb cb, void *cb_arg) 3114 { 3115 struct spdk_bdev *bdev = bdev_io->bdev; 3116 3117 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 3118 /* This can happen if the zcopy was emulated in start */ 3119 if (bdev_io->u.bdev.zcopy.start != 1) { 3120 return -EINVAL; 3121 } 3122 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3123 } 3124 3125 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 3126 return -EINVAL; 3127 } 3128 3129 bdev_io->u.bdev.zcopy.commit = commit ? 1 : 0; 3130 bdev_io->u.bdev.zcopy.start = 0; 3131 bdev_io->internal.caller_ctx = cb_arg; 3132 bdev_io->internal.cb = cb; 3133 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3134 3135 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3136 spdk_bdev_io_submit(bdev_io); 3137 return 0; 3138 } 3139 3140 if (!bdev_io->u.bdev.zcopy.commit) { 3141 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3142 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3143 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 3144 return 0; 3145 } 3146 3147 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3148 spdk_bdev_io_submit(bdev_io); 3149 3150 return 0; 3151 } 3152 3153 int 3154 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3155 uint64_t offset, uint64_t len, 3156 spdk_bdev_io_completion_cb cb, void *cb_arg) 3157 { 3158 uint64_t offset_blocks, num_blocks; 3159 3160 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3161 len, &num_blocks) != 0) { 3162 return -EINVAL; 3163 } 3164 3165 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3166 } 3167 3168 int 3169 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3170 uint64_t offset_blocks, uint64_t num_blocks, 3171 spdk_bdev_io_completion_cb cb, void *cb_arg) 3172 { 3173 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3174 struct spdk_bdev_io *bdev_io; 3175 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3176 3177 if (!desc->write) { 3178 return -EBADF; 3179 } 3180 3181 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3182 return -EINVAL; 3183 } 3184 3185 if (!_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 3186 !_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 3187 return -ENOTSUP; 3188 } 3189 3190 bdev_io = spdk_bdev_get_io(channel); 3191 3192 if (!bdev_io) { 3193 return -ENOMEM; 3194 } 3195 3196 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 3197 bdev_io->internal.ch = channel; 3198 bdev_io->internal.desc = desc; 3199 bdev_io->u.bdev.offset_blocks = offset_blocks; 3200 bdev_io->u.bdev.num_blocks = num_blocks; 3201 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3202 3203 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 3204 spdk_bdev_io_submit(bdev_io); 3205 return 0; 3206 } 3207 3208 assert(_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 
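	/* Emulation path: the request is reissued as ordinary writes sourced from
	 * the shared zero buffer (see _spdk_bdev_write_zero_buffer_next()), each
	 * bounded by ZERO_BUFFER_SIZE, until split_remaining_num_blocks reaches
	 * zero.
	 */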
3209 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 3210 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 3211 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 3212 _spdk_bdev_write_zero_buffer_next(bdev_io); 3213 3214 return 0; 3215 } 3216 3217 int 3218 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3219 uint64_t offset, uint64_t nbytes, 3220 spdk_bdev_io_completion_cb cb, void *cb_arg) 3221 { 3222 uint64_t offset_blocks, num_blocks; 3223 3224 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3225 nbytes, &num_blocks) != 0) { 3226 return -EINVAL; 3227 } 3228 3229 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3230 } 3231 3232 int 3233 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3234 uint64_t offset_blocks, uint64_t num_blocks, 3235 spdk_bdev_io_completion_cb cb, void *cb_arg) 3236 { 3237 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3238 struct spdk_bdev_io *bdev_io; 3239 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3240 3241 if (!desc->write) { 3242 return -EBADF; 3243 } 3244 3245 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3246 return -EINVAL; 3247 } 3248 3249 if (num_blocks == 0) { 3250 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 3251 return -EINVAL; 3252 } 3253 3254 bdev_io = spdk_bdev_get_io(channel); 3255 if (!bdev_io) { 3256 return -ENOMEM; 3257 } 3258 3259 bdev_io->internal.ch = channel; 3260 bdev_io->internal.desc = desc; 3261 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 3262 3263 bdev_io->u.bdev.iovs = &bdev_io->iov; 3264 bdev_io->u.bdev.iovs[0].iov_base = NULL; 3265 bdev_io->u.bdev.iovs[0].iov_len = 0; 3266 bdev_io->u.bdev.iovcnt = 1; 3267 3268 bdev_io->u.bdev.offset_blocks = offset_blocks; 3269 bdev_io->u.bdev.num_blocks = num_blocks; 3270 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3271 3272 spdk_bdev_io_submit(bdev_io); 3273 return 0; 3274 } 3275 3276 int 3277 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3278 uint64_t offset, uint64_t length, 3279 spdk_bdev_io_completion_cb cb, void *cb_arg) 3280 { 3281 uint64_t offset_blocks, num_blocks; 3282 3283 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3284 length, &num_blocks) != 0) { 3285 return -EINVAL; 3286 } 3287 3288 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3289 } 3290 3291 int 3292 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3293 uint64_t offset_blocks, uint64_t num_blocks, 3294 spdk_bdev_io_completion_cb cb, void *cb_arg) 3295 { 3296 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3297 struct spdk_bdev_io *bdev_io; 3298 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3299 3300 if (!desc->write) { 3301 return -EBADF; 3302 } 3303 3304 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3305 return -EINVAL; 3306 } 3307 3308 bdev_io = spdk_bdev_get_io(channel); 3309 if (!bdev_io) { 3310 return -ENOMEM; 3311 } 3312 3313 bdev_io->internal.ch = channel; 3314 bdev_io->internal.desc = desc; 3315 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 3316 bdev_io->u.bdev.iovs = NULL; 3317 bdev_io->u.bdev.iovcnt = 0; 3318 bdev_io->u.bdev.offset_blocks = offset_blocks; 3319 bdev_io->u.bdev.num_blocks = num_blocks; 3320 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3321 3322 spdk_bdev_io_submit(bdev_io); 3323 return 0; 3324 } 3325 3326 static void 3327 
_spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 3328 { 3329 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 3330 struct spdk_bdev_io *bdev_io; 3331 3332 bdev_io = TAILQ_FIRST(&ch->queued_resets); 3333 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 3334 spdk_bdev_io_submit_reset(bdev_io); 3335 } 3336 3337 static void 3338 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 3339 { 3340 struct spdk_io_channel *ch; 3341 struct spdk_bdev_channel *channel; 3342 struct spdk_bdev_mgmt_channel *mgmt_channel; 3343 struct spdk_bdev_shared_resource *shared_resource; 3344 bdev_io_tailq_t tmp_queued; 3345 3346 TAILQ_INIT(&tmp_queued); 3347 3348 ch = spdk_io_channel_iter_get_channel(i); 3349 channel = spdk_io_channel_get_ctx(ch); 3350 shared_resource = channel->shared_resource; 3351 mgmt_channel = shared_resource->mgmt_ch; 3352 3353 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 3354 3355 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 3356 /* The QoS object is always valid and readable while 3357 * the channel flag is set, so the lock here should not 3358 * be necessary. We're not in the fast path though, so 3359 * just take it anyway. */ 3360 pthread_mutex_lock(&channel->bdev->internal.mutex); 3361 if (channel->bdev->internal.qos->ch == channel) { 3362 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 3363 } 3364 pthread_mutex_unlock(&channel->bdev->internal.mutex); 3365 } 3366 3367 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 3368 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 3369 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 3370 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 3371 3372 spdk_for_each_channel_continue(i, 0); 3373 } 3374 3375 static void 3376 _spdk_bdev_start_reset(void *ctx) 3377 { 3378 struct spdk_bdev_channel *ch = ctx; 3379 3380 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 3381 ch, _spdk_bdev_reset_dev); 3382 } 3383 3384 static void 3385 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 3386 { 3387 struct spdk_bdev *bdev = ch->bdev; 3388 3389 assert(!TAILQ_EMPTY(&ch->queued_resets)); 3390 3391 pthread_mutex_lock(&bdev->internal.mutex); 3392 if (bdev->internal.reset_in_progress == NULL) { 3393 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 3394 /* 3395 * Take a channel reference for the target bdev for the life of this 3396 * reset. This guards against the channel getting destroyed while 3397 * spdk_for_each_channel() calls related to this reset IO are in 3398 * progress. We will release the reference when this reset is 3399 * completed. 
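 * The reference is dropped again in _spdk_bdev_reset_complete(), once every
 * channel has been unfrozen.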
3400 */ 3401 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 3402 _spdk_bdev_start_reset(ch); 3403 } 3404 pthread_mutex_unlock(&bdev->internal.mutex); 3405 } 3406 3407 int 3408 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3409 spdk_bdev_io_completion_cb cb, void *cb_arg) 3410 { 3411 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3412 struct spdk_bdev_io *bdev_io; 3413 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3414 3415 bdev_io = spdk_bdev_get_io(channel); 3416 if (!bdev_io) { 3417 return -ENOMEM; 3418 } 3419 3420 bdev_io->internal.ch = channel; 3421 bdev_io->internal.desc = desc; 3422 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 3423 bdev_io->u.reset.ch_ref = NULL; 3424 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3425 3426 pthread_mutex_lock(&bdev->internal.mutex); 3427 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 3428 pthread_mutex_unlock(&bdev->internal.mutex); 3429 3430 _spdk_bdev_channel_start_reset(channel); 3431 3432 return 0; 3433 } 3434 3435 void 3436 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3437 struct spdk_bdev_io_stat *stat) 3438 { 3439 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3440 3441 *stat = channel->stat; 3442 } 3443 3444 static void 3445 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 3446 { 3447 void *io_device = spdk_io_channel_iter_get_io_device(i); 3448 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3449 3450 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 3451 bdev_iostat_ctx->cb_arg, 0); 3452 free(bdev_iostat_ctx); 3453 } 3454 3455 static void 3456 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 3457 { 3458 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3459 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3460 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3461 3462 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 3463 spdk_for_each_channel_continue(i, 0); 3464 } 3465 3466 void 3467 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 3468 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 3469 { 3470 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 3471 3472 assert(bdev != NULL); 3473 assert(stat != NULL); 3474 assert(cb != NULL); 3475 3476 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 3477 if (bdev_iostat_ctx == NULL) { 3478 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 3479 cb(bdev, stat, cb_arg, -ENOMEM); 3480 return; 3481 } 3482 3483 bdev_iostat_ctx->stat = stat; 3484 bdev_iostat_ctx->cb = cb; 3485 bdev_iostat_ctx->cb_arg = cb_arg; 3486 3487 /* Start with the statistics from previously deleted channels. */ 3488 pthread_mutex_lock(&bdev->internal.mutex); 3489 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 3490 pthread_mutex_unlock(&bdev->internal.mutex); 3491 3492 /* Then iterate and add the statistics from each existing channel. 
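 * When the iteration finishes, _spdk_bdev_get_device_stat_done() invokes the
 * user callback with status 0 and frees the iteration context. A caller
 * therefore only has to keep the stat structure alive until its callback
 * runs, e.g. (sketch with a hypothetical callback name):
 *
 *   struct spdk_bdev_io_stat *stat = calloc(1, sizeof(*stat));
 *   if (stat != NULL) {
 *           spdk_bdev_get_device_stat(bdev, stat, my_stat_done_cb, NULL);
 *   }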
*/ 3493 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3494 _spdk_bdev_get_each_channel_stat, 3495 bdev_iostat_ctx, 3496 _spdk_bdev_get_device_stat_done); 3497 } 3498 3499 int 3500 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3501 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3502 spdk_bdev_io_completion_cb cb, void *cb_arg) 3503 { 3504 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3505 struct spdk_bdev_io *bdev_io; 3506 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3507 3508 if (!desc->write) { 3509 return -EBADF; 3510 } 3511 3512 bdev_io = spdk_bdev_get_io(channel); 3513 if (!bdev_io) { 3514 return -ENOMEM; 3515 } 3516 3517 bdev_io->internal.ch = channel; 3518 bdev_io->internal.desc = desc; 3519 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 3520 bdev_io->u.nvme_passthru.cmd = *cmd; 3521 bdev_io->u.nvme_passthru.buf = buf; 3522 bdev_io->u.nvme_passthru.nbytes = nbytes; 3523 bdev_io->u.nvme_passthru.md_buf = NULL; 3524 bdev_io->u.nvme_passthru.md_len = 0; 3525 3526 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3527 3528 spdk_bdev_io_submit(bdev_io); 3529 return 0; 3530 } 3531 3532 int 3533 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3534 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3535 spdk_bdev_io_completion_cb cb, void *cb_arg) 3536 { 3537 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3538 struct spdk_bdev_io *bdev_io; 3539 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3540 3541 if (!desc->write) { 3542 /* 3543 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3544 * to easily determine if the command is a read or write, but for now just 3545 * do not allow io_passthru with a read-only descriptor. 3546 */ 3547 return -EBADF; 3548 } 3549 3550 bdev_io = spdk_bdev_get_io(channel); 3551 if (!bdev_io) { 3552 return -ENOMEM; 3553 } 3554 3555 bdev_io->internal.ch = channel; 3556 bdev_io->internal.desc = desc; 3557 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 3558 bdev_io->u.nvme_passthru.cmd = *cmd; 3559 bdev_io->u.nvme_passthru.buf = buf; 3560 bdev_io->u.nvme_passthru.nbytes = nbytes; 3561 bdev_io->u.nvme_passthru.md_buf = NULL; 3562 bdev_io->u.nvme_passthru.md_len = 0; 3563 3564 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3565 3566 spdk_bdev_io_submit(bdev_io); 3567 return 0; 3568 } 3569 3570 int 3571 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3572 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 3573 spdk_bdev_io_completion_cb cb, void *cb_arg) 3574 { 3575 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3576 struct spdk_bdev_io *bdev_io; 3577 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3578 3579 if (!desc->write) { 3580 /* 3581 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3582 * to easily determine if the command is a read or write, but for now just 3583 * do not allow io_passthru with a read-only descriptor. 
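 * The descriptor must therefore have been opened with write access (the
 * desc->write flag), e.g. by passing true for the write parameter when the
 * descriptor was opened.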
3584 */ 3585 return -EBADF; 3586 } 3587 3588 bdev_io = spdk_bdev_get_io(channel); 3589 if (!bdev_io) { 3590 return -ENOMEM; 3591 } 3592 3593 bdev_io->internal.ch = channel; 3594 bdev_io->internal.desc = desc; 3595 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 3596 bdev_io->u.nvme_passthru.cmd = *cmd; 3597 bdev_io->u.nvme_passthru.buf = buf; 3598 bdev_io->u.nvme_passthru.nbytes = nbytes; 3599 bdev_io->u.nvme_passthru.md_buf = md_buf; 3600 bdev_io->u.nvme_passthru.md_len = md_len; 3601 3602 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3603 3604 spdk_bdev_io_submit(bdev_io); 3605 return 0; 3606 } 3607 3608 int 3609 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3610 struct spdk_bdev_io_wait_entry *entry) 3611 { 3612 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3613 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 3614 3615 if (bdev != entry->bdev) { 3616 SPDK_ERRLOG("bdevs do not match\n"); 3617 return -EINVAL; 3618 } 3619 3620 if (mgmt_ch->per_thread_cache_count > 0) { 3621 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 3622 return -EINVAL; 3623 } 3624 3625 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 3626 return 0; 3627 } 3628 3629 static void 3630 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 3631 { 3632 struct spdk_bdev *bdev = bdev_ch->bdev; 3633 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3634 struct spdk_bdev_io *bdev_io; 3635 3636 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3637 /* 3638 * Allow some more I/O to complete before retrying the nomem_io queue. 3639 * Some drivers (such as nvme) cannot immediately take a new I/O in 3640 * the context of a completion, because the resources for the I/O are 3641 * not released until control returns to the bdev poller. Also, we 3642 * may require several small I/O to complete before a larger I/O 3643 * (that requires splitting) can be submitted. 3644 */ 3645 return; 3646 } 3647 3648 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3649 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3650 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3651 bdev_io->internal.ch->io_outstanding++; 3652 shared_resource->io_outstanding++; 3653 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3654 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 3655 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3656 break; 3657 } 3658 } 3659 } 3660 3661 static inline void 3662 _spdk_bdev_io_complete(void *ctx) 3663 { 3664 struct spdk_bdev_io *bdev_io = ctx; 3665 uint64_t tsc, tsc_diff; 3666 3667 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3668 /* 3669 * Send the completion to the thread that originally submitted the I/O, 3670 * which may not be the current thread in the case of QoS. 3671 */ 3672 if (bdev_io->internal.io_submit_ch) { 3673 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3674 bdev_io->internal.io_submit_ch = NULL; 3675 } 3676 3677 /* 3678 * Defer completion to avoid potential infinite recursion if the 3679 * user's completion callback issues a new I/O. 
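 * For example, a completion callback that immediately resubmits a failed I/O
 * on the same channel would otherwise re-enter this function on the same
 * stack; bouncing through spdk_thread_send_msg() keeps the stack depth
 * bounded.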
3680 */ 3681 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 3682 _spdk_bdev_io_complete, bdev_io); 3683 return; 3684 } 3685 3686 tsc = spdk_get_ticks(); 3687 tsc_diff = tsc - bdev_io->internal.submit_tsc; 3688 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 3689 3690 if (bdev_io->internal.ch->histogram) { 3691 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 3692 } 3693 3694 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3695 switch (bdev_io->type) { 3696 case SPDK_BDEV_IO_TYPE_READ: 3697 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3698 bdev_io->internal.ch->stat.num_read_ops++; 3699 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 3700 break; 3701 case SPDK_BDEV_IO_TYPE_WRITE: 3702 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3703 bdev_io->internal.ch->stat.num_write_ops++; 3704 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 3705 break; 3706 case SPDK_BDEV_IO_TYPE_UNMAP: 3707 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3708 bdev_io->internal.ch->stat.num_unmap_ops++; 3709 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; break; 3710 default: 3711 break; 3712 } 3713 } 3714 3715 #ifdef SPDK_CONFIG_VTUNE 3716 uint64_t now_tsc = spdk_get_ticks(); 3717 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 3718 uint64_t data[5]; 3719 3720 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 3721 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 3722 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 3723 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 3724 data[4] = bdev_io->bdev->fn_table->get_spin_time ?
3725 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 3726 3727 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3728 __itt_metadata_u64, 5, data); 3729 3730 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3731 bdev_io->internal.ch->start_tsc = now_tsc; 3732 } 3733 #endif 3734 3735 assert(bdev_io->internal.cb != NULL); 3736 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 3737 3738 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3739 bdev_io->internal.caller_ctx); 3740 } 3741 3742 static void 3743 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3744 { 3745 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3746 3747 if (bdev_io->u.reset.ch_ref != NULL) { 3748 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3749 bdev_io->u.reset.ch_ref = NULL; 3750 } 3751 3752 _spdk_bdev_io_complete(bdev_io); 3753 } 3754 3755 static void 3756 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3757 { 3758 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3759 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3760 3761 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3762 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3763 _spdk_bdev_channel_start_reset(ch); 3764 } 3765 3766 spdk_for_each_channel_continue(i, 0); 3767 } 3768 3769 void 3770 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3771 { 3772 struct spdk_bdev *bdev = bdev_io->bdev; 3773 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3774 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3775 3776 bdev_io->internal.status = status; 3777 3778 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3779 bool unlock_channels = false; 3780 3781 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3782 SPDK_ERRLOG("NOMEM returned for reset\n"); 3783 } 3784 pthread_mutex_lock(&bdev->internal.mutex); 3785 if (bdev_io == bdev->internal.reset_in_progress) { 3786 bdev->internal.reset_in_progress = NULL; 3787 unlock_channels = true; 3788 } 3789 pthread_mutex_unlock(&bdev->internal.mutex); 3790 3791 if (unlock_channels) { 3792 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3793 bdev_io, _spdk_bdev_reset_complete); 3794 return; 3795 } 3796 } else { 3797 _bdev_io_unset_bounce_buf(bdev_io); 3798 3799 assert(bdev_ch->io_outstanding > 0); 3800 assert(shared_resource->io_outstanding > 0); 3801 bdev_ch->io_outstanding--; 3802 shared_resource->io_outstanding--; 3803 3804 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3805 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3806 /* 3807 * Wait for some of the outstanding I/O to complete before we 3808 * retry any of the nomem_io. Normally we will wait for 3809 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3810 * depth channels we will instead wait for half to complete. 
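 * For example, with NOMEM_THRESHOLD_COUNT of 8: at 100 outstanding I/O the
 * threshold is max(50, 92) = 92, while at 10 outstanding I/O it is max(5, 2) = 5.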
3811 */ 3812 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3813 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3814 return; 3815 } 3816 3817 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3818 _spdk_bdev_ch_retry_io(bdev_ch); 3819 } 3820 } 3821 3822 _spdk_bdev_io_complete(bdev_io); 3823 } 3824 3825 void 3826 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3827 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3828 { 3829 if (sc == SPDK_SCSI_STATUS_GOOD) { 3830 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3831 } else { 3832 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3833 bdev_io->internal.error.scsi.sc = sc; 3834 bdev_io->internal.error.scsi.sk = sk; 3835 bdev_io->internal.error.scsi.asc = asc; 3836 bdev_io->internal.error.scsi.ascq = ascq; 3837 } 3838 3839 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3840 } 3841 3842 void 3843 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3844 int *sc, int *sk, int *asc, int *ascq) 3845 { 3846 assert(sc != NULL); 3847 assert(sk != NULL); 3848 assert(asc != NULL); 3849 assert(ascq != NULL); 3850 3851 switch (bdev_io->internal.status) { 3852 case SPDK_BDEV_IO_STATUS_SUCCESS: 3853 *sc = SPDK_SCSI_STATUS_GOOD; 3854 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3855 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3856 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3857 break; 3858 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3859 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3860 break; 3861 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3862 *sc = bdev_io->internal.error.scsi.sc; 3863 *sk = bdev_io->internal.error.scsi.sk; 3864 *asc = bdev_io->internal.error.scsi.asc; 3865 *ascq = bdev_io->internal.error.scsi.ascq; 3866 break; 3867 default: 3868 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3869 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3870 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3871 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3872 break; 3873 } 3874 } 3875 3876 void 3877 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3878 { 3879 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3880 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3881 } else { 3882 bdev_io->internal.error.nvme.sct = sct; 3883 bdev_io->internal.error.nvme.sc = sc; 3884 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3885 } 3886 3887 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3888 } 3889 3890 void 3891 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3892 { 3893 assert(sct != NULL); 3894 assert(sc != NULL); 3895 3896 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3897 *sct = bdev_io->internal.error.nvme.sct; 3898 *sc = bdev_io->internal.error.nvme.sc; 3899 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3900 *sct = SPDK_NVME_SCT_GENERIC; 3901 *sc = SPDK_NVME_SC_SUCCESS; 3902 } else { 3903 *sct = SPDK_NVME_SCT_GENERIC; 3904 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3905 } 3906 } 3907 3908 struct spdk_thread * 3909 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3910 { 3911 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3912 } 3913 3914 struct spdk_io_channel * 3915 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 3916 { 3917 return bdev_io->internal.ch->channel; 3918 } 3919 3920 static void 3921 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t 
*limits) 3922 { 3923 uint64_t min_qos_set; 3924 int i; 3925 3926 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3927 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3928 break; 3929 } 3930 } 3931 3932 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3933 SPDK_ERRLOG("Invalid rate limits set.\n"); 3934 return; 3935 } 3936 3937 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3938 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3939 continue; 3940 } 3941 3942 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3943 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3944 } else { 3945 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3946 } 3947 3948 if (limits[i] == 0 || limits[i] % min_qos_set) { 3949 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3950 limits[i], bdev->name, min_qos_set); 3951 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3952 return; 3953 } 3954 } 3955 3956 if (!bdev->internal.qos) { 3957 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3958 if (!bdev->internal.qos) { 3959 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3960 return; 3961 } 3962 } 3963 3964 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3965 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3966 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3967 bdev->name, i, limits[i]); 3968 } 3969 3970 return; 3971 } 3972 3973 static void 3974 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3975 { 3976 struct spdk_conf_section *sp = NULL; 3977 const char *val = NULL; 3978 int i = 0, j = 0; 3979 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3980 bool config_qos = false; 3981 3982 sp = spdk_conf_find_section(NULL, "QoS"); 3983 if (!sp) { 3984 return; 3985 } 3986 3987 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3988 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3989 3990 i = 0; 3991 while (true) { 3992 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3993 if (!val) { 3994 break; 3995 } 3996 3997 if (strcmp(bdev->name, val) != 0) { 3998 i++; 3999 continue; 4000 } 4001 4002 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 4003 if (val) { 4004 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 4005 limits[j] = strtoull(val, NULL, 10); 4006 } else { 4007 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 4008 } 4009 config_qos = true; 4010 } 4011 4012 break; 4013 } 4014 4015 j++; 4016 } 4017 4018 if (config_qos == true) { 4019 _spdk_bdev_qos_config_limit(bdev, limits); 4020 } 4021 4022 return; 4023 } 4024 4025 static int 4026 spdk_bdev_init(struct spdk_bdev *bdev) 4027 { 4028 char *bdev_name; 4029 4030 assert(bdev->module != NULL); 4031 4032 if (!bdev->name) { 4033 SPDK_ERRLOG("Bdev name is NULL\n"); 4034 return -EINVAL; 4035 } 4036 4037 if (!strlen(bdev->name)) { 4038 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 4039 return -EINVAL; 4040 } 4041 4042 if (spdk_bdev_get_by_name(bdev->name)) { 4043 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 4044 return -EEXIST; 4045 } 4046 4047 /* Users often register their own I/O devices using the bdev name. In 4048 * order to avoid conflicts, prepend bdev_. 
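 * For example, a bdev named "Foo" is registered as the I/O device "bdev_Foo".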
*/ 4049 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 4050 if (!bdev_name) { 4051 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 4052 return -ENOMEM; 4053 } 4054 4055 bdev->internal.status = SPDK_BDEV_STATUS_READY; 4056 bdev->internal.measured_queue_depth = UINT64_MAX; 4057 bdev->internal.claim_module = NULL; 4058 bdev->internal.qd_poller = NULL; 4059 bdev->internal.qos = NULL; 4060 4061 /* If the user didn't specify a uuid, generate one. */ 4062 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 4063 spdk_uuid_generate(&bdev->uuid); 4064 } 4065 4066 if (spdk_bdev_get_buf_align(bdev) > 1) { 4067 if (bdev->split_on_optimal_io_boundary) { 4068 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 4069 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 4070 } else { 4071 bdev->split_on_optimal_io_boundary = true; 4072 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 4073 } 4074 } 4075 4076 TAILQ_INIT(&bdev->internal.open_descs); 4077 4078 TAILQ_INIT(&bdev->aliases); 4079 4080 bdev->internal.reset_in_progress = NULL; 4081 4082 _spdk_bdev_qos_config(bdev); 4083 4084 spdk_io_device_register(__bdev_to_io_dev(bdev), 4085 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 4086 sizeof(struct spdk_bdev_channel), 4087 bdev_name); 4088 4089 free(bdev_name); 4090 4091 pthread_mutex_init(&bdev->internal.mutex, NULL); 4092 return 0; 4093 } 4094 4095 static void 4096 spdk_bdev_destroy_cb(void *io_device) 4097 { 4098 int rc; 4099 struct spdk_bdev *bdev; 4100 spdk_bdev_unregister_cb cb_fn; 4101 void *cb_arg; 4102 4103 bdev = __bdev_from_io_dev(io_device); 4104 cb_fn = bdev->internal.unregister_cb; 4105 cb_arg = bdev->internal.unregister_ctx; 4106 4107 rc = bdev->fn_table->destruct(bdev->ctxt); 4108 if (rc < 0) { 4109 SPDK_ERRLOG("destruct failed\n"); 4110 } 4111 if (rc <= 0 && cb_fn != NULL) { 4112 cb_fn(cb_arg, rc); 4113 } 4114 } 4115 4116 4117 static void 4118 spdk_bdev_fini(struct spdk_bdev *bdev) 4119 { 4120 pthread_mutex_destroy(&bdev->internal.mutex); 4121 4122 free(bdev->internal.qos); 4123 4124 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 4125 } 4126 4127 static void 4128 spdk_bdev_start(struct spdk_bdev *bdev) 4129 { 4130 struct spdk_bdev_module *module; 4131 uint32_t action; 4132 4133 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 4134 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 4135 4136 /* Examine configuration before initializing I/O */ 4137 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4138 if (module->examine_config) { 4139 action = module->internal.action_in_progress; 4140 module->internal.action_in_progress++; 4141 module->examine_config(bdev); 4142 if (action != module->internal.action_in_progress) { 4143 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 4144 module->name); 4145 } 4146 } 4147 } 4148 4149 if (bdev->internal.claim_module) { 4150 if (bdev->internal.claim_module->examine_disk) { 4151 bdev->internal.claim_module->internal.action_in_progress++; 4152 bdev->internal.claim_module->examine_disk(bdev); 4153 } 4154 return; 4155 } 4156 4157 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4158 if (module->examine_disk) { 4159 module->internal.action_in_progress++; 4160 module->examine_disk(bdev); 4161 } 4162 } 4163 } 4164 4165 int 4166 spdk_bdev_register(struct spdk_bdev *bdev) 4167 { 4168 int rc = spdk_bdev_init(bdev); 4169 4170 if (rc == 0) { 4171 
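/* Initialization succeeded - add the bdev to the global list and give the
		 * registered modules a chance to examine it. */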
spdk_bdev_start(bdev); 4172 } 4173 4174 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 4175 return rc; 4176 } 4177 4178 int 4179 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 4180 { 4181 SPDK_ERRLOG("This function is deprecated. Use spdk_bdev_register() instead.\n"); 4182 return spdk_bdev_register(vbdev); 4183 } 4184 4185 void 4186 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 4187 { 4188 if (bdev->internal.unregister_cb != NULL) { 4189 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 4190 } 4191 } 4192 4193 static void 4194 _remove_notify(void *arg) 4195 { 4196 struct spdk_bdev_desc *desc = arg; 4197 4198 desc->remove_scheduled = false; 4199 4200 if (desc->closed) { 4201 free(desc); 4202 } else { 4203 if (desc->callback.open_with_ext) { 4204 desc->callback.event_fn(SPDK_BDEV_EVENT_REMOVE, desc->bdev, desc->callback.ctx); 4205 } else { 4206 desc->callback.remove_fn(desc->callback.ctx); 4207 } 4208 } 4209 } 4210 4211 /* Must be called while holding bdev->internal.mutex. 4212 * returns: 0 - bdev removed and ready to be destructed. 4213 * -EBUSY - bdev can't be destructed yet. */ 4214 static int 4215 spdk_bdev_unregister_unsafe(struct spdk_bdev *bdev) 4216 { 4217 struct spdk_bdev_desc *desc, *tmp; 4218 int rc = 0; 4219 4220 /* Notify each descriptor about hotremoval */ 4221 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 4222 rc = -EBUSY; 4223 /* 4224 * Defer invocation of the event_cb to a separate message that will 4225 * run later on its thread. This ensures this context unwinds and 4226 * we don't recursively unregister this bdev again if the event_cb 4227 * immediately closes its descriptor. 4228 */ 4229 if (!desc->remove_scheduled) { 4230 /* Avoid scheduling removal of the same descriptor multiple times. */ 4231 desc->remove_scheduled = true; 4232 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 4233 } 4234 } 4235 4236 /* If there are no descriptors, proceed removing the bdev */ 4237 if (rc == 0) { 4238 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 4239 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 4240 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 4241 } 4242 4243 return rc; 4244 } 4245 4246 void 4247 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 4248 { 4249 struct spdk_thread *thread; 4250 int rc; 4251 4252 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 4253 4254 thread = spdk_get_thread(); 4255 if (!thread) { 4256 /* The user called this from a non-SPDK thread. */ 4257 if (cb_fn != NULL) { 4258 cb_fn(cb_arg, -ENOTSUP); 4259 } 4260 return; 4261 } 4262 4263 pthread_mutex_lock(&g_bdev_mgr.mutex); 4264 pthread_mutex_lock(&bdev->internal.mutex); 4265 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 4266 pthread_mutex_unlock(&bdev->internal.mutex); 4267 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4268 if (cb_fn) { 4269 cb_fn(cb_arg, -EBUSY); 4270 } 4271 return; 4272 } 4273 4274 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 4275 bdev->internal.unregister_cb = cb_fn; 4276 bdev->internal.unregister_ctx = cb_arg; 4277 4278 /* Call under lock. 
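 * Both g_bdev_mgr.mutex and bdev->internal.mutex are held here;
 * spdk_bdev_unregister_unsafe() walks the open descriptor list and may remove
 * the bdev from the global bdev list.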
*/ 4279 rc = spdk_bdev_unregister_unsafe(bdev); 4280 pthread_mutex_unlock(&bdev->internal.mutex); 4281 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4282 4283 if (rc == 0) { 4284 spdk_bdev_fini(bdev); 4285 } 4286 } 4287 4288 static void 4289 _spdk_bdev_dummy_event_cb(void *remove_ctx) 4290 { 4291 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev remove event received with no remove callback specified"); 4292 } 4293 4294 static int 4295 _spdk_bdev_open(struct spdk_bdev *bdev, bool write, struct spdk_bdev_desc *desc) 4296 { 4297 struct spdk_thread *thread; 4298 struct set_qos_limit_ctx *ctx; 4299 4300 thread = spdk_get_thread(); 4301 if (!thread) { 4302 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 4303 return -ENOTSUP; 4304 } 4305 4306 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 4307 spdk_get_thread()); 4308 4309 desc->bdev = bdev; 4310 desc->thread = thread; 4311 desc->write = write; 4312 4313 pthread_mutex_lock(&bdev->internal.mutex); 4314 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 4315 pthread_mutex_unlock(&bdev->internal.mutex); 4316 return -ENODEV; 4317 } 4318 4319 if (write && bdev->internal.claim_module) { 4320 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 4321 bdev->name, bdev->internal.claim_module->name); 4322 pthread_mutex_unlock(&bdev->internal.mutex); 4323 return -EPERM; 4324 } 4325 4326 /* Enable QoS */ 4327 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 4328 ctx = calloc(1, sizeof(*ctx)); 4329 if (ctx == NULL) { 4330 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 4331 pthread_mutex_unlock(&bdev->internal.mutex); 4332 return -ENOMEM; 4333 } 4334 ctx->bdev = bdev; 4335 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4336 _spdk_bdev_enable_qos_msg, ctx, 4337 _spdk_bdev_enable_qos_done); 4338 } 4339 4340 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 4341 4342 pthread_mutex_unlock(&bdev->internal.mutex); 4343 4344 return 0; 4345 } 4346 4347 int 4348 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 4349 void *remove_ctx, struct spdk_bdev_desc **_desc) 4350 { 4351 struct spdk_bdev_desc *desc; 4352 int rc; 4353 4354 desc = calloc(1, sizeof(*desc)); 4355 if (desc == NULL) { 4356 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 4357 return -ENOMEM; 4358 } 4359 4360 if (remove_cb == NULL) { 4361 remove_cb = _spdk_bdev_dummy_event_cb; 4362 } 4363 4364 desc->callback.open_with_ext = false; 4365 desc->callback.remove_fn = remove_cb; 4366 desc->callback.ctx = remove_ctx; 4367 4368 pthread_mutex_lock(&g_bdev_mgr.mutex); 4369 4370 rc = _spdk_bdev_open(bdev, write, desc); 4371 if (rc != 0) { 4372 free(desc); 4373 desc = NULL; 4374 } 4375 4376 *_desc = desc; 4377 4378 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4379 4380 return rc; 4381 } 4382 4383 int 4384 spdk_bdev_open_ext(const char *bdev_name, bool write, spdk_bdev_event_cb_t event_cb, 4385 void *event_ctx, struct spdk_bdev_desc **_desc) 4386 { 4387 struct spdk_bdev_desc *desc; 4388 struct spdk_bdev *bdev; 4389 int rc; 4390 4391 if (event_cb == NULL) { 4392 SPDK_ERRLOG("Missing event callback function\n"); 4393 return -EINVAL; 4394 } 4395 4396 pthread_mutex_lock(&g_bdev_mgr.mutex); 4397 4398 bdev = spdk_bdev_get_by_name(bdev_name); 4399 4400 if (bdev == NULL) { 4401 SPDK_ERRLOG("Failed to find bdev with name: %s\n", bdev_name); 4402 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4403 return -EINVAL; 4404 } 4405 4406 desc = calloc(1, sizeof(*desc)); 4407 if (desc == NULL) { 4408 
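/* Allocation failed - release the bdev manager lock before returning. */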
SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 4409 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4410 return -ENOMEM; 4411 } 4412 4413 desc->callback.open_with_ext = true; 4414 desc->callback.event_fn = event_cb; 4415 desc->callback.ctx = event_ctx; 4416 4417 rc = _spdk_bdev_open(bdev, write, desc); 4418 if (rc != 0) { 4419 free(desc); 4420 desc = NULL; 4421 } 4422 4423 *_desc = desc; 4424 4425 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4426 4427 return rc; 4428 } 4429 4430 void 4431 spdk_bdev_close(struct spdk_bdev_desc *desc) 4432 { 4433 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4434 int rc; 4435 4436 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 4437 spdk_get_thread()); 4438 4439 assert(desc->thread == spdk_get_thread()); 4440 4441 pthread_mutex_lock(&bdev->internal.mutex); 4442 4443 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 4444 4445 desc->closed = true; 4446 4447 if (!desc->remove_scheduled) { 4448 free(desc); 4449 } 4450 4451 /* If no more descriptors, kill QoS channel */ 4452 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 4453 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 4454 bdev->name, spdk_get_thread()); 4455 4456 if (spdk_bdev_qos_destroy(bdev)) { 4457 /* There isn't anything we can do to recover here. Just let the 4458 * old QoS poller keep running. The QoS handling won't change 4459 * cores when the user allocates a new channel, but it won't break. */ 4460 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 4461 } 4462 } 4463 4464 spdk_bdev_set_qd_sampling_period(bdev, 0); 4465 4466 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 4467 rc = spdk_bdev_unregister_unsafe(bdev); 4468 pthread_mutex_unlock(&bdev->internal.mutex); 4469 4470 if (rc == 0) { 4471 spdk_bdev_fini(bdev); 4472 } 4473 } else { 4474 pthread_mutex_unlock(&bdev->internal.mutex); 4475 } 4476 } 4477 4478 int 4479 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 4480 struct spdk_bdev_module *module) 4481 { 4482 if (bdev->internal.claim_module != NULL) { 4483 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 4484 bdev->internal.claim_module->name); 4485 return -EPERM; 4486 } 4487 4488 if (desc && !desc->write) { 4489 desc->write = true; 4490 } 4491 4492 bdev->internal.claim_module = module; 4493 return 0; 4494 } 4495 4496 void 4497 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 4498 { 4499 assert(bdev->internal.claim_module != NULL); 4500 bdev->internal.claim_module = NULL; 4501 } 4502 4503 struct spdk_bdev * 4504 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 4505 { 4506 assert(desc != NULL); 4507 return desc->bdev; 4508 } 4509 4510 void 4511 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 4512 { 4513 struct iovec *iovs; 4514 int iovcnt; 4515 4516 if (bdev_io == NULL) { 4517 return; 4518 } 4519 4520 switch (bdev_io->type) { 4521 case SPDK_BDEV_IO_TYPE_READ: 4522 case SPDK_BDEV_IO_TYPE_WRITE: 4523 case SPDK_BDEV_IO_TYPE_ZCOPY: 4524 iovs = bdev_io->u.bdev.iovs; 4525 iovcnt = bdev_io->u.bdev.iovcnt; 4526 break; 4527 default: 4528 iovs = NULL; 4529 iovcnt = 0; 4530 break; 4531 } 4532 4533 if (iovp) { 4534 *iovp = iovs; 4535 } 4536 if (iovcntp) { 4537 *iovcntp = iovcnt; 4538 } 4539 } 4540 4541 void * 4542 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 4543 
{ 4544 if (bdev_io == NULL) { 4545 return NULL; 4546 } 4547 4548 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 4549 return NULL; 4550 } 4551 4552 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 4553 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 4554 return bdev_io->u.bdev.md_buf; 4555 } 4556 4557 return NULL; 4558 } 4559 4560 void 4561 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 4562 { 4563 4564 if (spdk_bdev_module_list_find(bdev_module->name)) { 4565 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 4566 assert(false); 4567 } 4568 4569 /* 4570 * Modules with examine callbacks must be initialized first, so they are 4571 * ready to handle examine callbacks from later modules that will 4572 * register physical bdevs. 4573 */ 4574 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 4575 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4576 } else { 4577 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4578 } 4579 } 4580 4581 struct spdk_bdev_module * 4582 spdk_bdev_module_list_find(const char *name) 4583 { 4584 struct spdk_bdev_module *bdev_module; 4585 4586 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4587 if (strcmp(name, bdev_module->name) == 0) { 4588 break; 4589 } 4590 } 4591 4592 return bdev_module; 4593 } 4594 4595 static void 4596 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 4597 { 4598 struct spdk_bdev_io *bdev_io = _bdev_io; 4599 uint64_t num_bytes, num_blocks; 4600 void *md_buf = NULL; 4601 int rc; 4602 4603 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 4604 bdev_io->u.bdev.split_remaining_num_blocks, 4605 ZERO_BUFFER_SIZE); 4606 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 4607 4608 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 4609 md_buf = (char *)g_bdev_mgr.zero_buffer + 4610 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 4611 } 4612 4613 rc = _spdk_bdev_write_blocks_with_md(bdev_io->internal.desc, 4614 spdk_io_channel_from_ctx(bdev_io->internal.ch), 4615 g_bdev_mgr.zero_buffer, md_buf, 4616 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 4617 _spdk_bdev_write_zero_buffer_done, bdev_io); 4618 if (rc == 0) { 4619 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 4620 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 4621 } else if (rc == -ENOMEM) { 4622 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 4623 } else { 4624 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4625 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 4626 } 4627 } 4628 4629 static void 4630 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4631 { 4632 struct spdk_bdev_io *parent_io = cb_arg; 4633 4634 spdk_bdev_free_io(bdev_io); 4635 4636 if (!success) { 4637 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4638 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 4639 return; 4640 } 4641 4642 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 4643 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4644 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 4645 return; 4646 } 4647 4648 _spdk_bdev_write_zero_buffer_next(parent_io); 4649 } 4650 4651 static void 4652 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 4653 { 4654 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4655 
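/* The QoS modification has finished; clear the in-progress flag under the
	 * bdev mutex before invoking the caller's completion callback. */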
ctx->bdev->internal.qos_mod_in_progress = false; 4656 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4657 4658 if (ctx->cb_fn) { 4659 ctx->cb_fn(ctx->cb_arg, status); 4660 } 4661 free(ctx); 4662 } 4663 4664 static void 4665 _spdk_bdev_disable_qos_done(void *cb_arg) 4666 { 4667 struct set_qos_limit_ctx *ctx = cb_arg; 4668 struct spdk_bdev *bdev = ctx->bdev; 4669 struct spdk_bdev_io *bdev_io; 4670 struct spdk_bdev_qos *qos; 4671 4672 pthread_mutex_lock(&bdev->internal.mutex); 4673 qos = bdev->internal.qos; 4674 bdev->internal.qos = NULL; 4675 pthread_mutex_unlock(&bdev->internal.mutex); 4676 4677 while (!TAILQ_EMPTY(&qos->queued)) { 4678 /* Send queued I/O back to their original thread for resubmission. */ 4679 bdev_io = TAILQ_FIRST(&qos->queued); 4680 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 4681 4682 if (bdev_io->internal.io_submit_ch) { 4683 /* 4684 * Channel was changed when sending it to the QoS thread - change it back 4685 * before sending it back to the original thread. 4686 */ 4687 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4688 bdev_io->internal.io_submit_ch = NULL; 4689 } 4690 4691 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4692 _spdk_bdev_io_submit, bdev_io); 4693 } 4694 4695 if (qos->thread != NULL) { 4696 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 4697 spdk_poller_unregister(&qos->poller); 4698 } 4699 4700 free(qos); 4701 4702 _spdk_bdev_set_qos_limit_done(ctx, 0); 4703 } 4704 4705 static void 4706 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 4707 { 4708 void *io_device = spdk_io_channel_iter_get_io_device(i); 4709 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4710 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4711 struct spdk_thread *thread; 4712 4713 pthread_mutex_lock(&bdev->internal.mutex); 4714 thread = bdev->internal.qos->thread; 4715 pthread_mutex_unlock(&bdev->internal.mutex); 4716 4717 if (thread != NULL) { 4718 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 4719 } else { 4720 _spdk_bdev_disable_qos_done(ctx); 4721 } 4722 } 4723 4724 static void 4725 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 4726 { 4727 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4728 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4729 4730 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 4731 4732 spdk_for_each_channel_continue(i, 0); 4733 } 4734 4735 static void 4736 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg) 4737 { 4738 struct set_qos_limit_ctx *ctx = cb_arg; 4739 struct spdk_bdev *bdev = ctx->bdev; 4740 4741 pthread_mutex_lock(&bdev->internal.mutex); 4742 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 4743 pthread_mutex_unlock(&bdev->internal.mutex); 4744 4745 _spdk_bdev_set_qos_limit_done(ctx, 0); 4746 } 4747 4748 static void 4749 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 4750 { 4751 void *io_device = spdk_io_channel_iter_get_io_device(i); 4752 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4753 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4754 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4755 4756 pthread_mutex_lock(&bdev->internal.mutex); 4757 _spdk_bdev_enable_qos(bdev, bdev_ch); 4758 pthread_mutex_unlock(&bdev->internal.mutex); 4759 spdk_for_each_channel_continue(i, 0); 4760 } 4761 4762 static void 4763 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 4764 { 4765 struct set_qos_limit_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 4766 4767 _spdk_bdev_set_qos_limit_done(ctx, status); 4768 } 4769 4770 static void 4771 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 4772 { 4773 int i; 4774 4775 assert(bdev->internal.qos != NULL); 4776 4777 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4778 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4779 bdev->internal.qos->rate_limits[i].limit = limits[i]; 4780 4781 if (limits[i] == 0) { 4782 bdev->internal.qos->rate_limits[i].limit = 4783 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 4784 } 4785 } 4786 } 4787 } 4788 4789 void 4790 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 4791 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 4792 { 4793 struct set_qos_limit_ctx *ctx; 4794 uint32_t limit_set_complement; 4795 uint64_t min_limit_per_sec; 4796 int i; 4797 bool disable_rate_limit = true; 4798 4799 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4800 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4801 continue; 4802 } 4803 4804 if (limits[i] > 0) { 4805 disable_rate_limit = false; 4806 } 4807 4808 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 4809 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 4810 } else { 4811 /* Change from megabyte to byte rate limit */ 4812 limits[i] = limits[i] * 1024 * 1024; 4813 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 4814 } 4815 4816 limit_set_complement = limits[i] % min_limit_per_sec; 4817 if (limit_set_complement) { 4818 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 4819 limits[i], min_limit_per_sec); 4820 limits[i] += min_limit_per_sec - limit_set_complement; 4821 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 4822 } 4823 } 4824 4825 ctx = calloc(1, sizeof(*ctx)); 4826 if (ctx == NULL) { 4827 cb_fn(cb_arg, -ENOMEM); 4828 return; 4829 } 4830 4831 ctx->cb_fn = cb_fn; 4832 ctx->cb_arg = cb_arg; 4833 ctx->bdev = bdev; 4834 4835 pthread_mutex_lock(&bdev->internal.mutex); 4836 if (bdev->internal.qos_mod_in_progress) { 4837 pthread_mutex_unlock(&bdev->internal.mutex); 4838 free(ctx); 4839 cb_fn(cb_arg, -EAGAIN); 4840 return; 4841 } 4842 bdev->internal.qos_mod_in_progress = true; 4843 4844 if (disable_rate_limit == true && bdev->internal.qos) { 4845 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4846 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 4847 (bdev->internal.qos->rate_limits[i].limit > 0 && 4848 bdev->internal.qos->rate_limits[i].limit != 4849 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 4850 disable_rate_limit = false; 4851 break; 4852 } 4853 } 4854 } 4855 4856 if (disable_rate_limit == false) { 4857 if (bdev->internal.qos == NULL) { 4858 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 4859 if (!bdev->internal.qos) { 4860 pthread_mutex_unlock(&bdev->internal.mutex); 4861 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 4862 _spdk_bdev_set_qos_limit_done(ctx, -ENOMEM); 4863 return; 4864 } 4865 } 4866 4867 if (bdev->internal.qos->thread == NULL) { 4868 /* Enabling */ 4869 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4870 4871 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4872 _spdk_bdev_enable_qos_msg, ctx, 4873 _spdk_bdev_enable_qos_done); 4874 } else { 4875 /* Updating */ 4876 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4877 4878 spdk_thread_send_msg(bdev->internal.qos->thread, 4879 _spdk_bdev_update_qos_rate_limit_msg, ctx); 4880 } 4881 } else { 4882 if (bdev->internal.qos != NULL) { 4883 _spdk_bdev_set_qos_rate_limits(bdev, 
limits); 4884 4885 /* Disabling */ 4886 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4887 _spdk_bdev_disable_qos_msg, ctx, 4888 _spdk_bdev_disable_qos_msg_done); 4889 } else { 4890 pthread_mutex_unlock(&bdev->internal.mutex); 4891 _spdk_bdev_set_qos_limit_done(ctx, 0); 4892 return; 4893 } 4894 } 4895 4896 pthread_mutex_unlock(&bdev->internal.mutex); 4897 } 4898 4899 struct spdk_bdev_histogram_ctx { 4900 spdk_bdev_histogram_status_cb cb_fn; 4901 void *cb_arg; 4902 struct spdk_bdev *bdev; 4903 int status; 4904 }; 4905 4906 static void 4907 _spdk_bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 4908 { 4909 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4910 4911 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4912 ctx->bdev->internal.histogram_in_progress = false; 4913 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4914 ctx->cb_fn(ctx->cb_arg, ctx->status); 4915 free(ctx); 4916 } 4917 4918 static void 4919 _spdk_bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 4920 { 4921 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4922 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4923 4924 if (ch->histogram != NULL) { 4925 spdk_histogram_data_free(ch->histogram); 4926 ch->histogram = NULL; 4927 } 4928 spdk_for_each_channel_continue(i, 0); 4929 } 4930 4931 static void 4932 _spdk_bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 4933 { 4934 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4935 4936 if (status != 0) { 4937 ctx->status = status; 4938 ctx->bdev->internal.histogram_enabled = false; 4939 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), _spdk_bdev_histogram_disable_channel, ctx, 4940 _spdk_bdev_histogram_disable_channel_cb); 4941 } else { 4942 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4943 ctx->bdev->internal.histogram_in_progress = false; 4944 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4945 ctx->cb_fn(ctx->cb_arg, ctx->status); 4946 free(ctx); 4947 } 4948 } 4949 4950 static void 4951 _spdk_bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 4952 { 4953 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4954 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4955 int status = 0; 4956 4957 if (ch->histogram == NULL) { 4958 ch->histogram = spdk_histogram_data_alloc(); 4959 if (ch->histogram == NULL) { 4960 status = -ENOMEM; 4961 } 4962 } 4963 4964 spdk_for_each_channel_continue(i, status); 4965 } 4966 4967 void 4968 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 4969 void *cb_arg, bool enable) 4970 { 4971 struct spdk_bdev_histogram_ctx *ctx; 4972 4973 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 4974 if (ctx == NULL) { 4975 cb_fn(cb_arg, -ENOMEM); 4976 return; 4977 } 4978 4979 ctx->bdev = bdev; 4980 ctx->status = 0; 4981 ctx->cb_fn = cb_fn; 4982 ctx->cb_arg = cb_arg; 4983 4984 pthread_mutex_lock(&bdev->internal.mutex); 4985 if (bdev->internal.histogram_in_progress) { 4986 pthread_mutex_unlock(&bdev->internal.mutex); 4987 free(ctx); 4988 cb_fn(cb_arg, -EAGAIN); 4989 return; 4990 } 4991 4992 bdev->internal.histogram_in_progress = true; 4993 pthread_mutex_unlock(&bdev->internal.mutex); 4994 4995 bdev->internal.histogram_enabled = enable; 4996 4997 if (enable) { 4998 /* Allocate histogram for each channel */ 4999 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_enable_channel, ctx, 5000 _spdk_bdev_histogram_enable_channel_cb); 
5001 } else { 5002 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_disable_channel, ctx, 5003 _spdk_bdev_histogram_disable_channel_cb); 5004 } 5005 } 5006 5007 struct spdk_bdev_histogram_data_ctx { 5008 spdk_bdev_histogram_data_cb cb_fn; 5009 void *cb_arg; 5010 struct spdk_bdev *bdev; 5011 /** merged histogram data from all channels */ 5012 struct spdk_histogram_data *histogram; 5013 }; 5014 5015 static void 5016 _spdk_bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 5017 { 5018 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5019 5020 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 5021 free(ctx); 5022 } 5023 5024 static void 5025 _spdk_bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 5026 { 5027 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 5028 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 5029 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 5030 int status = 0; 5031 5032 if (ch->histogram == NULL) { 5033 status = -EFAULT; 5034 } else { 5035 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 5036 } 5037 5038 spdk_for_each_channel_continue(i, status); 5039 } 5040 5041 void 5042 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 5043 spdk_bdev_histogram_data_cb cb_fn, 5044 void *cb_arg) 5045 { 5046 struct spdk_bdev_histogram_data_ctx *ctx; 5047 5048 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 5049 if (ctx == NULL) { 5050 cb_fn(cb_arg, -ENOMEM, NULL); 5051 return; 5052 } 5053 5054 ctx->bdev = bdev; 5055 ctx->cb_fn = cb_fn; 5056 ctx->cb_arg = cb_arg; 5057 5058 ctx->histogram = histogram; 5059 5060 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_get_channel, ctx, 5061 _spdk_bdev_histogram_get_channel_cb); 5062 } 5063 5064 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 5065 5066 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 5067 { 5068 spdk_trace_register_owner(OWNER_BDEV, 'b'); 5069 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 5070 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 5071 OBJECT_BDEV_IO, 1, 0, "type: "); 5072 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 5073 OBJECT_BDEV_IO, 0, 0, ""); 5074 } 5075
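/*
 * Illustrative sketch (not part of this file's API surface): how a caller might
 * combine spdk_bdev_nvme_io_passthru() with spdk_bdev_queue_io_wait() to retry
 * a submission that failed with -ENOMEM. The names my_bdev_ctx, passthru_done,
 * and resubmit_passthru are hypothetical; the wait entry must stay valid until
 * its callback fires, so it is embedded in the caller's context rather than
 * placed on the stack.
 *
 *	struct my_bdev_ctx {
 *		struct spdk_bdev_desc		*desc;
 *		struct spdk_io_channel		*ch;
 *		struct spdk_nvme_cmd		cmd;
 *		void				*buf;
 *		size_t				nbytes;
 *		struct spdk_bdev_io_wait_entry	wait_entry;
 *	};
 *
 *	static void
 *	resubmit_passthru(void *arg)
 *	{
 *		struct my_bdev_ctx *ctx = arg;
 *		int rc;
 *
 *		// passthru_done is the caller's spdk_bdev_io_completion_cb.
 *		rc = spdk_bdev_nvme_io_passthru(ctx->desc, ctx->ch, &ctx->cmd,
 *						ctx->buf, ctx->nbytes,
 *						passthru_done, ctx);
 *		if (rc == -ENOMEM) {
 *			// No bdev_io available right now - ask to be called back
 *			// once one is freed on this channel, then try again.
 *			ctx->wait_entry.bdev = spdk_bdev_desc_get_bdev(ctx->desc);
 *			ctx->wait_entry.cb_fn = resubmit_passthru;
 *			ctx->wait_entry.cb_arg = ctx;
 *			spdk_bdev_queue_io_wait(ctx->wait_entry.bdev, ctx->ch,
 *						&ctx->wait_entry);
 *		}
 *	}
 */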