1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/util.h" 48 #include "spdk/trace.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 #include "spdk/string.h" 53 54 #ifdef SPDK_CONFIG_VTUNE 55 #include "ittnotify.h" 56 #include "ittnotify_types.h" 57 int __itt_init_ittlib(const char *, __itt_group_id); 58 #endif 59 60 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 61 #define SPDK_BDEV_IO_CACHE_SIZE 256 62 #define BUF_SMALL_POOL_SIZE 8192 63 #define BUF_LARGE_POOL_SIZE 1024 64 #define NOMEM_THRESHOLD_COUNT 8 65 #define ZERO_BUFFER_SIZE 0x100000 66 67 #define OWNER_BDEV 0x2 68 69 #define OBJECT_BDEV_IO 0x2 70 71 #define TRACE_GROUP_BDEV 0x3 72 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 73 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 74 75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 * 1024 * 1024) 80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 81 82 #define SPDK_BDEV_POOL_ALIGNMENT 512 83 84 static const char *qos_conf_type[] = {"Limit_IOPS", 85 "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS" 86 }; 87 static const char *qos_rpc_type[] = {"rw_ios_per_sec", 88 "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec" 89 }; 90 91 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 92 93 struct spdk_bdev_mgr { 94 struct spdk_mempool *bdev_io_pool; 95 96 struct spdk_mempool *buf_small_pool; 97 struct spdk_mempool *buf_large_pool; 98 99 void *zero_buffer; 100 101 
TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 102 103 struct spdk_bdev_list bdevs; 104 105 bool init_complete; 106 bool module_init_complete; 107 108 #ifdef SPDK_CONFIG_VTUNE 109 __itt_domain *domain; 110 #endif 111 }; 112 113 static struct spdk_bdev_mgr g_bdev_mgr = { 114 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 115 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 116 .init_complete = false, 117 .module_init_complete = false, 118 }; 119 120 static struct spdk_bdev_opts g_bdev_opts = { 121 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 122 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 123 }; 124 125 static spdk_bdev_init_cb g_init_cb_fn = NULL; 126 static void *g_init_cb_arg = NULL; 127 128 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 129 static void *g_fini_cb_arg = NULL; 130 static struct spdk_thread *g_fini_thread = NULL; 131 132 struct spdk_bdev_qos_limit { 133 /** IOs or bytes allowed per second (i.e., 1s). */ 134 uint64_t limit; 135 136 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 137 * For remaining bytes, allowed to run negative if an I/O is submitted when 138 * some bytes are remaining, but the I/O is bigger than that amount. The 139 * excess will be deducted from the next timeslice. 140 */ 141 int64_t remaining_this_timeslice; 142 143 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 144 uint32_t min_per_timeslice; 145 146 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 147 uint32_t max_per_timeslice; 148 149 /** Function to check whether to queue the IO. */ 150 bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 151 152 /** Function to update for the submitted IO. */ 153 void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 154 }; 155 156 struct spdk_bdev_qos { 157 /** Types of structure of rate limits. */ 158 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 159 160 /** The channel that all I/O are funneled through. */ 161 struct spdk_bdev_channel *ch; 162 163 /** The thread on which the poller is running. */ 164 struct spdk_thread *thread; 165 166 /** Queue of I/O waiting to be issued. */ 167 bdev_io_tailq_t queued; 168 169 /** Size of a timeslice in tsc ticks. */ 170 uint64_t timeslice_size; 171 172 /** Timestamp of start of last timeslice. */ 173 uint64_t last_timeslice; 174 175 /** Poller that processes queued I/O commands each time slice. */ 176 struct spdk_poller *poller; 177 }; 178 179 struct spdk_bdev_mgmt_channel { 180 bdev_io_stailq_t need_buf_small; 181 bdev_io_stailq_t need_buf_large; 182 183 /* 184 * Each thread keeps a cache of bdev_io - this allows 185 * bdev threads which are *not* DPDK threads to still 186 * benefit from a per-thread bdev_io cache. Without 187 * this, non-DPDK threads fetching from the mempool 188 * incur a cmpxchg on get and put. 189 */ 190 bdev_io_stailq_t per_thread_cache; 191 uint32_t per_thread_cache_count; 192 uint32_t bdev_io_cache_size; 193 194 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 195 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 196 }; 197 198 /* 199 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 200 * will queue here their IO that awaits retry. It makes it possible to retry sending 201 * IO to one bdev after IO from other bdev completes. 
202 */ 203 struct spdk_bdev_shared_resource { 204 /* The bdev management channel */ 205 struct spdk_bdev_mgmt_channel *mgmt_ch; 206 207 /* 208 * Count of I/O submitted to bdev module and waiting for completion. 209 * Incremented before submit_request() is called on an spdk_bdev_io. 210 */ 211 uint64_t io_outstanding; 212 213 /* 214 * Queue of IO awaiting retry because of a previous NOMEM status returned 215 * on this channel. 216 */ 217 bdev_io_tailq_t nomem_io; 218 219 /* 220 * Threshold which io_outstanding must drop to before retrying nomem_io. 221 */ 222 uint64_t nomem_threshold; 223 224 /* I/O channel allocated by a bdev module */ 225 struct spdk_io_channel *shared_ch; 226 227 /* Refcount of bdev channels using this resource */ 228 uint32_t ref; 229 230 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 231 }; 232 233 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 234 #define BDEV_CH_QOS_ENABLED (1 << 1) 235 236 struct spdk_bdev_channel { 237 struct spdk_bdev *bdev; 238 239 /* The channel for the underlying device */ 240 struct spdk_io_channel *channel; 241 242 /* Per io_device per thread data */ 243 struct spdk_bdev_shared_resource *shared_resource; 244 245 struct spdk_bdev_io_stat stat; 246 247 /* 248 * Count of I/O submitted through this channel and waiting for completion. 249 * Incremented before submit_request() is called on an spdk_bdev_io. 250 */ 251 uint64_t io_outstanding; 252 253 bdev_io_tailq_t queued_resets; 254 255 uint32_t flags; 256 257 struct spdk_histogram_data *histogram; 258 259 #ifdef SPDK_CONFIG_VTUNE 260 uint64_t start_tsc; 261 uint64_t interval_tsc; 262 __itt_string_handle *handle; 263 struct spdk_bdev_io_stat prev_stat; 264 #endif 265 266 }; 267 268 struct spdk_bdev_desc { 269 struct spdk_bdev *bdev; 270 struct spdk_thread *thread; 271 spdk_bdev_remove_cb_t remove_cb; 272 void *remove_ctx; 273 bool remove_scheduled; 274 bool closed; 275 bool write; 276 TAILQ_ENTRY(spdk_bdev_desc) link; 277 }; 278 279 struct spdk_bdev_iostat_ctx { 280 struct spdk_bdev_io_stat *stat; 281 spdk_bdev_get_device_stat_cb cb; 282 void *cb_arg; 283 }; 284 285 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 286 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 287 288 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 289 void *cb_arg); 290 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 291 292 void 293 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 294 { 295 *opts = g_bdev_opts; 296 } 297 298 int 299 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 300 { 301 uint32_t min_pool_size; 302 303 /* 304 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 305 * initialization. A second mgmt_ch will be created on the same thread when the application starts 306 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
307 */ 308 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 309 if (opts->bdev_io_pool_size < min_pool_size) { 310 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 311 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 312 spdk_thread_get_count()); 313 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 314 return -1; 315 } 316 317 g_bdev_opts = *opts; 318 return 0; 319 } 320 321 struct spdk_bdev * 322 spdk_bdev_first(void) 323 { 324 struct spdk_bdev *bdev; 325 326 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 327 if (bdev) { 328 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 329 } 330 331 return bdev; 332 } 333 334 struct spdk_bdev * 335 spdk_bdev_next(struct spdk_bdev *prev) 336 { 337 struct spdk_bdev *bdev; 338 339 bdev = TAILQ_NEXT(prev, internal.link); 340 if (bdev) { 341 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 342 } 343 344 return bdev; 345 } 346 347 static struct spdk_bdev * 348 _bdev_next_leaf(struct spdk_bdev *bdev) 349 { 350 while (bdev != NULL) { 351 if (bdev->internal.claim_module == NULL) { 352 return bdev; 353 } else { 354 bdev = TAILQ_NEXT(bdev, internal.link); 355 } 356 } 357 358 return bdev; 359 } 360 361 struct spdk_bdev * 362 spdk_bdev_first_leaf(void) 363 { 364 struct spdk_bdev *bdev; 365 366 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 367 368 if (bdev) { 369 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 370 } 371 372 return bdev; 373 } 374 375 struct spdk_bdev * 376 spdk_bdev_next_leaf(struct spdk_bdev *prev) 377 { 378 struct spdk_bdev *bdev; 379 380 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 381 382 if (bdev) { 383 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 384 } 385 386 return bdev; 387 } 388 389 struct spdk_bdev * 390 spdk_bdev_get_by_name(const char *bdev_name) 391 { 392 struct spdk_bdev_alias *tmp; 393 struct spdk_bdev *bdev = spdk_bdev_first(); 394 395 while (bdev != NULL) { 396 if (strcmp(bdev_name, bdev->name) == 0) { 397 return bdev; 398 } 399 400 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 401 if (strcmp(bdev_name, tmp->alias) == 0) { 402 return bdev; 403 } 404 } 405 406 bdev = spdk_bdev_next(bdev); 407 } 408 409 return NULL; 410 } 411 412 void 413 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 414 { 415 struct iovec *iovs; 416 417 iovs = bdev_io->u.bdev.iovs; 418 419 assert(iovs != NULL); 420 assert(bdev_io->u.bdev.iovcnt >= 1); 421 422 iovs[0].iov_base = buf; 423 iovs[0].iov_len = len; 424 } 425 426 static bool 427 _is_buf_allocated(struct iovec *iovs) 428 { 429 return iovs[0].iov_base != NULL; 430 } 431 432 static bool 433 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 434 { 435 int i; 436 uintptr_t iov_base; 437 438 if (spdk_likely(alignment == 1)) { 439 return true; 440 } 441 442 for (i = 0; i < iovcnt; i++) { 443 iov_base = (uintptr_t)iovs[i].iov_base; 444 if ((iov_base & (alignment - 1)) != 0) { 445 return false; 446 } 447 } 448 449 return true; 450 } 451 452 static void 453 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 454 { 455 int i; 456 size_t len; 457 458 for (i = 0; i < iovcnt; i++) { 459 len = spdk_min(iovs[i].iov_len, buf_len); 460 memcpy(buf, iovs[i].iov_base, len); 461 buf += len; 462 buf_len -= len; 463 } 464 } 465 466 static void 467 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, 
void *buf, size_t buf_len) 468 { 469 int i; 470 size_t len; 471 472 for (i = 0; i < iovcnt; i++) { 473 len = spdk_min(iovs[i].iov_len, buf_len); 474 memcpy(iovs[i].iov_base, buf, len); 475 buf += len; 476 buf_len -= len; 477 } 478 } 479 480 static void 481 _bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 482 { 483 /* save original iovec */ 484 bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs; 485 bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt; 486 /* set bounce iov */ 487 bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov; 488 bdev_io->u.bdev.iovcnt = 1; 489 /* set bounce buffer for this operation */ 490 bdev_io->u.bdev.iovs[0].iov_base = buf; 491 bdev_io->u.bdev.iovs[0].iov_len = len; 492 /* if this is write path, copy data from original buffer to bounce buffer */ 493 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 494 _copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt); 495 } 496 } 497 498 static void 499 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 500 { 501 struct spdk_mempool *pool; 502 struct spdk_bdev_io *tmp; 503 void *buf, *aligned_buf; 504 bdev_io_stailq_t *stailq; 505 struct spdk_bdev_mgmt_channel *ch; 506 uint64_t buf_len; 507 uint64_t alignment; 508 bool buf_allocated; 509 510 buf = bdev_io->internal.buf; 511 buf_len = bdev_io->internal.buf_len; 512 alignment = spdk_bdev_get_buf_align(bdev_io->bdev); 513 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 514 515 bdev_io->internal.buf = NULL; 516 517 if (buf_len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) { 518 pool = g_bdev_mgr.buf_small_pool; 519 stailq = &ch->need_buf_small; 520 } else { 521 pool = g_bdev_mgr.buf_large_pool; 522 stailq = &ch->need_buf_large; 523 } 524 525 if (STAILQ_EMPTY(stailq)) { 526 spdk_mempool_put(pool, buf); 527 } else { 528 tmp = STAILQ_FIRST(stailq); 529 530 alignment = spdk_bdev_get_buf_align(tmp->bdev); 531 buf_allocated = _is_buf_allocated(tmp->u.bdev.iovs); 532 533 aligned_buf = (void *)(((uintptr_t)buf + 534 (alignment - 1)) & ~(alignment - 1)); 535 if (buf_allocated) { 536 _bdev_io_set_bounce_buf(tmp, aligned_buf, tmp->internal.buf_len); 537 } else { 538 spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len); 539 } 540 541 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 542 tmp->internal.buf = buf; 543 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 544 } 545 } 546 547 static void 548 _bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io) 549 { 550 /* if this is read path, copy data from bounce buffer to original buffer */ 551 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 552 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 553 _copy_buf_to_iovs(bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt, 554 bdev_io->internal.bounce_iov.iov_base, bdev_io->internal.bounce_iov.iov_len); 555 } 556 /* set orignal buffer for this io */ 557 bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt; 558 bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs; 559 /* disable bouncing buffer for this io */ 560 bdev_io->internal.orig_iovcnt = 0; 561 bdev_io->internal.orig_iovs = NULL; 562 /* return bounce buffer to the pool */ 563 spdk_bdev_io_put_buf(bdev_io); 564 } 565 566 void 567 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 568 { 569 struct spdk_mempool *pool; 570 bdev_io_stailq_t *stailq; 571 void *buf, *aligned_buf; 572 struct spdk_bdev_mgmt_channel *mgmt_ch; 573 uint64_t alignment; 574 bool buf_allocated; 575 576 
assert(cb != NULL); 577 assert(bdev_io->u.bdev.iovs != NULL); 578 579 alignment = spdk_bdev_get_buf_align(bdev_io->bdev); 580 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 581 582 if (buf_allocated && 583 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 584 /* Buffer already present and aligned */ 585 cb(bdev_io->internal.ch->channel, bdev_io); 586 return; 587 } 588 589 assert(len + alignment <= SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT); 590 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 591 592 bdev_io->internal.buf_len = len; 593 bdev_io->internal.get_buf_cb = cb; 594 595 if (len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) { 596 pool = g_bdev_mgr.buf_small_pool; 597 stailq = &mgmt_ch->need_buf_small; 598 } else { 599 pool = g_bdev_mgr.buf_large_pool; 600 stailq = &mgmt_ch->need_buf_large; 601 } 602 603 buf = spdk_mempool_get(pool); 604 605 if (!buf) { 606 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 607 } else { 608 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 609 610 if (buf_allocated) { 611 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 612 } else { 613 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 614 } 615 bdev_io->internal.buf = buf; 616 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 617 } 618 } 619 620 static int 621 spdk_bdev_module_get_max_ctx_size(void) 622 { 623 struct spdk_bdev_module *bdev_module; 624 int max_bdev_module_size = 0; 625 626 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 627 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 628 max_bdev_module_size = bdev_module->get_ctx_size(); 629 } 630 } 631 632 return max_bdev_module_size; 633 } 634 635 void 636 spdk_bdev_config_text(FILE *fp) 637 { 638 struct spdk_bdev_module *bdev_module; 639 640 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 641 if (bdev_module->config_text) { 642 bdev_module->config_text(fp); 643 } 644 } 645 } 646 647 static void 648 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 649 { 650 int i; 651 struct spdk_bdev_qos *qos = bdev->internal.qos; 652 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 653 654 if (!qos) { 655 return; 656 } 657 658 spdk_bdev_get_qos_rate_limits(bdev, limits); 659 660 spdk_json_write_object_begin(w); 661 spdk_json_write_named_string(w, "method", "set_bdev_qos_limit"); 662 663 spdk_json_write_named_object_begin(w, "params"); 664 spdk_json_write_named_string(w, "name", bdev->name); 665 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 666 if (limits[i] > 0) { 667 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 668 } 669 } 670 spdk_json_write_object_end(w); 671 672 spdk_json_write_object_end(w); 673 } 674 675 void 676 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 677 { 678 struct spdk_bdev_module *bdev_module; 679 struct spdk_bdev *bdev; 680 681 assert(w != NULL); 682 683 spdk_json_write_array_begin(w); 684 685 spdk_json_write_object_begin(w); 686 spdk_json_write_named_string(w, "method", "set_bdev_options"); 687 spdk_json_write_named_object_begin(w, "params"); 688 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 689 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 690 spdk_json_write_object_end(w); 691 spdk_json_write_object_end(w); 692 693 TAILQ_FOREACH(bdev_module, 
&g_bdev_mgr.bdev_modules, internal.tailq) { 694 if (bdev_module->config_json) { 695 bdev_module->config_json(w); 696 } 697 } 698 699 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 700 spdk_bdev_qos_config_json(bdev, w); 701 702 if (bdev->fn_table->write_config_json) { 703 bdev->fn_table->write_config_json(bdev, w); 704 } 705 } 706 707 spdk_json_write_array_end(w); 708 } 709 710 static int 711 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 712 { 713 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 714 struct spdk_bdev_io *bdev_io; 715 uint32_t i; 716 717 STAILQ_INIT(&ch->need_buf_small); 718 STAILQ_INIT(&ch->need_buf_large); 719 720 STAILQ_INIT(&ch->per_thread_cache); 721 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 722 723 /* Pre-populate bdev_io cache to ensure this thread cannot be starved. */ 724 ch->per_thread_cache_count = 0; 725 for (i = 0; i < ch->bdev_io_cache_size; i++) { 726 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 727 assert(bdev_io != NULL); 728 ch->per_thread_cache_count++; 729 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 730 } 731 732 TAILQ_INIT(&ch->shared_resources); 733 TAILQ_INIT(&ch->io_wait_queue); 734 735 return 0; 736 } 737 738 static void 739 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf) 740 { 741 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 742 struct spdk_bdev_io *bdev_io; 743 744 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) { 745 SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n"); 746 } 747 748 if (!TAILQ_EMPTY(&ch->shared_resources)) { 749 SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n"); 750 } 751 752 while (!STAILQ_EMPTY(&ch->per_thread_cache)) { 753 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 754 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 755 ch->per_thread_cache_count--; 756 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 757 } 758 759 assert(ch->per_thread_cache_count == 0); 760 } 761 762 static void 763 spdk_bdev_init_complete(int rc) 764 { 765 spdk_bdev_init_cb cb_fn = g_init_cb_fn; 766 void *cb_arg = g_init_cb_arg; 767 struct spdk_bdev_module *m; 768 769 g_bdev_mgr.init_complete = true; 770 g_init_cb_fn = NULL; 771 g_init_cb_arg = NULL; 772 773 /* 774 * For modules that need to know when subsystem init is complete, 775 * inform them now. 776 */ 777 if (rc == 0) { 778 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 779 if (m->init_complete) { 780 m->init_complete(); 781 } 782 } 783 } 784 785 cb_fn(cb_arg, rc); 786 } 787 788 static void 789 spdk_bdev_module_action_complete(void) 790 { 791 struct spdk_bdev_module *m; 792 793 /* 794 * Don't finish bdev subsystem initialization if 795 * module pre-initialization is still in progress, or 796 * the subsystem been already initialized. 797 */ 798 if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) { 799 return; 800 } 801 802 /* 803 * Check all bdev modules for inits/examinations in progress. If any 804 * exist, return immediately since we cannot finish bdev subsystem 805 * initialization until all are completed. 806 */ 807 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 808 if (m->internal.action_in_progress > 0) { 809 return; 810 } 811 } 812 813 /* 814 * Modules already finished initialization - now that all 815 * the bdev modules have finished their asynchronous I/O 816 * processing, the entire bdev layer can be marked as complete. 
817 */ 818 spdk_bdev_init_complete(0); 819 } 820 821 static void 822 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 823 { 824 assert(module->internal.action_in_progress > 0); 825 module->internal.action_in_progress--; 826 spdk_bdev_module_action_complete(); 827 } 828 829 void 830 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 831 { 832 spdk_bdev_module_action_done(module); 833 } 834 835 void 836 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 837 { 838 spdk_bdev_module_action_done(module); 839 } 840 841 /** The last initialized bdev module */ 842 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 843 844 static int 845 spdk_bdev_modules_init(void) 846 { 847 struct spdk_bdev_module *module; 848 int rc = 0; 849 850 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 851 g_resume_bdev_module = module; 852 rc = module->module_init(); 853 if (rc != 0) { 854 return rc; 855 } 856 } 857 858 g_resume_bdev_module = NULL; 859 return 0; 860 } 861 862 863 static void 864 spdk_bdev_init_failed_complete(void *cb_arg) 865 { 866 spdk_bdev_init_complete(-1); 867 } 868 869 static void 870 spdk_bdev_init_failed(void *cb_arg) 871 { 872 spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL); 873 } 874 875 void 876 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 877 { 878 struct spdk_conf_section *sp; 879 struct spdk_bdev_opts bdev_opts; 880 int32_t bdev_io_pool_size, bdev_io_cache_size; 881 int cache_size; 882 int rc = 0; 883 char mempool_name[32]; 884 885 assert(cb_fn != NULL); 886 887 sp = spdk_conf_find_section(NULL, "Bdev"); 888 if (sp != NULL) { 889 spdk_bdev_get_opts(&bdev_opts); 890 891 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 892 if (bdev_io_pool_size >= 0) { 893 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 894 } 895 896 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 897 if (bdev_io_cache_size >= 0) { 898 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 899 } 900 901 if (spdk_bdev_set_opts(&bdev_opts)) { 902 spdk_bdev_init_complete(-1); 903 return; 904 } 905 906 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 907 } 908 909 g_init_cb_fn = cb_fn; 910 g_init_cb_arg = cb_arg; 911 912 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 913 914 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 915 g_bdev_opts.bdev_io_pool_size, 916 sizeof(struct spdk_bdev_io) + 917 spdk_bdev_module_get_max_ctx_size(), 918 0, 919 SPDK_ENV_SOCKET_ID_ANY); 920 921 if (g_bdev_mgr.bdev_io_pool == NULL) { 922 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 923 spdk_bdev_init_complete(-1); 924 return; 925 } 926 927 /** 928 * Ensure no more than half of the total buffers end up local caches, by 929 * using spdk_thread_get_count() to determine how many local caches we need 930 * to account for. 
931 */ 932 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 933 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 934 935 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 936 BUF_SMALL_POOL_SIZE, 937 SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 938 cache_size, 939 SPDK_ENV_SOCKET_ID_ANY); 940 if (!g_bdev_mgr.buf_small_pool) { 941 SPDK_ERRLOG("create rbuf small pool failed\n"); 942 spdk_bdev_init_complete(-1); 943 return; 944 } 945 946 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 947 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 948 949 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 950 BUF_LARGE_POOL_SIZE, 951 SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 952 cache_size, 953 SPDK_ENV_SOCKET_ID_ANY); 954 if (!g_bdev_mgr.buf_large_pool) { 955 SPDK_ERRLOG("create rbuf large pool failed\n"); 956 spdk_bdev_init_complete(-1); 957 return; 958 } 959 960 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 961 NULL); 962 if (!g_bdev_mgr.zero_buffer) { 963 SPDK_ERRLOG("create bdev zero buffer failed\n"); 964 spdk_bdev_init_complete(-1); 965 return; 966 } 967 968 #ifdef SPDK_CONFIG_VTUNE 969 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 970 #endif 971 972 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 973 spdk_bdev_mgmt_channel_destroy, 974 sizeof(struct spdk_bdev_mgmt_channel), 975 "bdev_mgr"); 976 977 rc = spdk_bdev_modules_init(); 978 g_bdev_mgr.module_init_complete = true; 979 if (rc != 0) { 980 SPDK_ERRLOG("bdev modules init failed\n"); 981 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 982 return; 983 } 984 985 spdk_bdev_module_action_complete(); 986 } 987 988 static void 989 spdk_bdev_mgr_unregister_cb(void *io_device) 990 { 991 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 992 993 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 994 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 995 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 996 g_bdev_opts.bdev_io_pool_size); 997 } 998 999 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 1000 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 1001 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 1002 BUF_SMALL_POOL_SIZE); 1003 assert(false); 1004 } 1005 1006 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 1007 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 1008 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 1009 BUF_LARGE_POOL_SIZE); 1010 assert(false); 1011 } 1012 1013 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1014 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1015 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1016 spdk_dma_free(g_bdev_mgr.zero_buffer); 1017 1018 cb_fn(g_fini_cb_arg); 1019 g_fini_cb_fn = NULL; 1020 g_fini_cb_arg = NULL; 1021 g_bdev_mgr.init_complete = false; 1022 g_bdev_mgr.module_init_complete = false; 1023 } 1024 1025 static void 1026 spdk_bdev_module_finish_iter(void *arg) 1027 { 1028 struct spdk_bdev_module *bdev_module; 1029 1030 /* Start iterating from the last touched module */ 1031 if (!g_resume_bdev_module) { 1032 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 1033 } else { 1034 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 1035 internal.tailq); 1036 } 1037 1038 while (bdev_module) { 1039 if (bdev_module->async_fini) { 1040 /* Save our place 
so we can resume later. We must 1041 * save the variable here, before calling module_fini() 1042 * below, because in some cases the module may immediately 1043 * call spdk_bdev_module_finish_done() and re-enter 1044 * this function to continue iterating. */ 1045 g_resume_bdev_module = bdev_module; 1046 } 1047 1048 if (bdev_module->module_fini) { 1049 bdev_module->module_fini(); 1050 } 1051 1052 if (bdev_module->async_fini) { 1053 return; 1054 } 1055 1056 bdev_module = TAILQ_PREV(bdev_module, bdev_module_list, 1057 internal.tailq); 1058 } 1059 1060 g_resume_bdev_module = NULL; 1061 spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb); 1062 } 1063 1064 void 1065 spdk_bdev_module_finish_done(void) 1066 { 1067 if (spdk_get_thread() != g_fini_thread) { 1068 spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL); 1069 } else { 1070 spdk_bdev_module_finish_iter(NULL); 1071 } 1072 } 1073 1074 static void 1075 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno) 1076 { 1077 struct spdk_bdev *bdev = cb_arg; 1078 1079 if (bdeverrno && bdev) { 1080 SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n", 1081 bdev->name); 1082 1083 /* 1084 * Since the call to spdk_bdev_unregister() failed, we have no way to free this 1085 * bdev; try to continue by manually removing this bdev from the list and continue 1086 * with the next bdev in the list. 1087 */ 1088 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 1089 } 1090 1091 if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) { 1092 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n"); 1093 /* 1094 * Bdev module finish need to be deferred as we might be in the middle of some context 1095 * (like bdev part free) that will use this bdev (or private bdev driver ctx data) 1096 * after returning. 1097 */ 1098 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL); 1099 return; 1100 } 1101 1102 /* 1103 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem 1104 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity 1105 * to detect clean shutdown as opposed to run-time hot removal of the underlying 1106 * base bdevs. 1107 * 1108 * Also, walk the list in the reverse order. 1109 */ 1110 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1111 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1112 if (bdev->internal.claim_module != NULL) { 1113 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n", 1114 bdev->name, bdev->internal.claim_module->name); 1115 continue; 1116 } 1117 1118 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name); 1119 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 1120 return; 1121 } 1122 1123 /* 1124 * If any bdev fails to unclaim underlying bdev properly, we may face the 1125 * case of bdev list consisting of claimed bdevs only (if claims are managed 1126 * correctly, this would mean there's a loop in the claims graph which is 1127 * clearly impossible). Warn and unregister last bdev on the list then. 
1128 */ 1129 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1130 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1131 SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1132 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 1133 return; 1134 } 1135 } 1136 1137 void 1138 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1139 { 1140 struct spdk_bdev_module *m; 1141 1142 assert(cb_fn != NULL); 1143 1144 g_fini_thread = spdk_get_thread(); 1145 1146 g_fini_cb_fn = cb_fn; 1147 g_fini_cb_arg = cb_arg; 1148 1149 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1150 if (m->fini_start) { 1151 m->fini_start(); 1152 } 1153 } 1154 1155 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0); 1156 } 1157 1158 static struct spdk_bdev_io * 1159 spdk_bdev_get_io(struct spdk_bdev_channel *channel) 1160 { 1161 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1162 struct spdk_bdev_io *bdev_io; 1163 1164 if (ch->per_thread_cache_count > 0) { 1165 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1166 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1167 ch->per_thread_cache_count--; 1168 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1169 /* 1170 * Don't try to look for bdev_ios in the global pool if there are 1171 * waiters on bdev_ios - we don't want this caller to jump the line. 1172 */ 1173 bdev_io = NULL; 1174 } else { 1175 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1176 } 1177 1178 return bdev_io; 1179 } 1180 1181 void 1182 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1183 { 1184 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1185 1186 assert(bdev_io != NULL); 1187 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1188 1189 if (bdev_io->internal.buf != NULL) { 1190 spdk_bdev_io_put_buf(bdev_io); 1191 } 1192 1193 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1194 ch->per_thread_cache_count++; 1195 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1196 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1197 struct spdk_bdev_io_wait_entry *entry; 1198 1199 entry = TAILQ_FIRST(&ch->io_wait_queue); 1200 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1201 entry->cb_fn(entry->cb_arg); 1202 } 1203 } else { 1204 /* We should never have a full cache with entries on the io wait queue. 
*/ 1205 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1206 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1207 } 1208 } 1209 1210 static bool 1211 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1212 { 1213 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1214 1215 switch (limit) { 1216 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1217 return true; 1218 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1219 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1220 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1221 return false; 1222 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1223 default: 1224 return false; 1225 } 1226 } 1227 1228 static bool 1229 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1230 { 1231 switch (bdev_io->type) { 1232 case SPDK_BDEV_IO_TYPE_NVME_IO: 1233 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1234 case SPDK_BDEV_IO_TYPE_READ: 1235 case SPDK_BDEV_IO_TYPE_WRITE: 1236 return true; 1237 default: 1238 return false; 1239 } 1240 } 1241 1242 static bool 1243 _spdk_bdev_is_read_io(struct spdk_bdev_io *bdev_io) 1244 { 1245 switch (bdev_io->type) { 1246 case SPDK_BDEV_IO_TYPE_NVME_IO: 1247 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1248 /* Bit 1 (0x2) set for read operation */ 1249 if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) { 1250 return true; 1251 } else { 1252 return false; 1253 } 1254 case SPDK_BDEV_IO_TYPE_READ: 1255 return true; 1256 default: 1257 return false; 1258 } 1259 } 1260 1261 static uint64_t 1262 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1263 { 1264 struct spdk_bdev *bdev = bdev_io->bdev; 1265 1266 switch (bdev_io->type) { 1267 case SPDK_BDEV_IO_TYPE_NVME_IO: 1268 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1269 return bdev_io->u.nvme_passthru.nbytes; 1270 case SPDK_BDEV_IO_TYPE_READ: 1271 case SPDK_BDEV_IO_TYPE_WRITE: 1272 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1273 default: 1274 return 0; 1275 } 1276 } 1277 1278 static bool 1279 _spdk_bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1280 { 1281 if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) { 1282 return true; 1283 } else { 1284 return false; 1285 } 1286 } 1287 1288 static bool 1289 _spdk_bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1290 { 1291 if (_spdk_bdev_is_read_io(io) == false) { 1292 return false; 1293 } 1294 1295 return _spdk_bdev_qos_rw_queue_io(limit, io); 1296 } 1297 1298 static bool 1299 _spdk_bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1300 { 1301 if (_spdk_bdev_is_read_io(io) == true) { 1302 return false; 1303 } 1304 1305 return _spdk_bdev_qos_rw_queue_io(limit, io); 1306 } 1307 1308 static void 1309 _spdk_bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1310 { 1311 limit->remaining_this_timeslice--; 1312 } 1313 1314 static void 1315 _spdk_bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1316 { 1317 limit->remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(io); 1318 } 1319 1320 static void 1321 _spdk_bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1322 { 1323 if (_spdk_bdev_is_read_io(io) == false) { 1324 return; 1325 } 1326 1327 return _spdk_bdev_qos_rw_bps_update_quota(limit, io); 1328 } 1329 1330 static void 1331 _spdk_bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1332 { 1333 if (_spdk_bdev_is_read_io(io) == true) { 1334 return; 1335 } 1336 1337 return 
_spdk_bdev_qos_rw_bps_update_quota(limit, io); 1338 } 1339 1340 static void 1341 _spdk_bdev_qos_set_ops(struct spdk_bdev_qos *qos) 1342 { 1343 int i; 1344 1345 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1346 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1347 qos->rate_limits[i].queue_io = NULL; 1348 qos->rate_limits[i].update_quota = NULL; 1349 continue; 1350 } 1351 1352 switch (i) { 1353 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1354 qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io; 1355 qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_iops_update_quota; 1356 break; 1357 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1358 qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io; 1359 qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_bps_update_quota; 1360 break; 1361 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1362 qos->rate_limits[i].queue_io = _spdk_bdev_qos_r_queue_io; 1363 qos->rate_limits[i].update_quota = _spdk_bdev_qos_r_bps_update_quota; 1364 break; 1365 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1366 qos->rate_limits[i].queue_io = _spdk_bdev_qos_w_queue_io; 1367 qos->rate_limits[i].update_quota = _spdk_bdev_qos_w_bps_update_quota; 1368 break; 1369 default: 1370 break; 1371 } 1372 } 1373 } 1374 1375 static int 1376 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1377 { 1378 struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL; 1379 struct spdk_bdev *bdev = ch->bdev; 1380 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1381 int i, submitted_ios = 0; 1382 1383 TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) { 1384 if (_spdk_bdev_qos_io_to_limit(bdev_io) == true) { 1385 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1386 if (!qos->rate_limits[i].queue_io) { 1387 continue; 1388 } 1389 1390 if (qos->rate_limits[i].queue_io(&qos->rate_limits[i], 1391 bdev_io) == true) { 1392 return submitted_ios; 1393 } 1394 } 1395 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1396 if (!qos->rate_limits[i].update_quota) { 1397 continue; 1398 } 1399 1400 qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io); 1401 } 1402 } 1403 1404 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1405 ch->io_outstanding++; 1406 shared_resource->io_outstanding++; 1407 bdev_io->internal.in_submit_request = true; 1408 bdev->fn_table->submit_request(ch->channel, bdev_io); 1409 bdev_io->internal.in_submit_request = false; 1410 submitted_ios++; 1411 } 1412 1413 return submitted_ios; 1414 } 1415 1416 static void 1417 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1418 { 1419 int rc; 1420 1421 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1422 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1423 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1424 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1425 &bdev_io->internal.waitq_entry); 1426 if (rc != 0) { 1427 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1428 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1429 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1430 } 1431 } 1432 1433 static bool 1434 _spdk_bdev_io_type_can_split(uint8_t type) 1435 { 1436 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1437 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1438 1439 /* Only split READ and WRITE I/O. 
Theoretically other types of I/O like 1440 * UNMAP could be split, but these types of I/O are typically much larger 1441 * in size (sometimes the size of the entire block device), and the bdev 1442 * module can more efficiently split these types of I/O. Plus those types 1443 * of I/O do not have a payload, which makes the splitting process simpler. 1444 */ 1445 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1446 return true; 1447 } else { 1448 return false; 1449 } 1450 } 1451 1452 static bool 1453 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1454 { 1455 uint64_t start_stripe, end_stripe; 1456 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1457 1458 if (io_boundary == 0) { 1459 return false; 1460 } 1461 1462 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1463 return false; 1464 } 1465 1466 start_stripe = bdev_io->u.bdev.offset_blocks; 1467 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1468 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1469 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1470 start_stripe >>= spdk_u32log2(io_boundary); 1471 end_stripe >>= spdk_u32log2(io_boundary); 1472 } else { 1473 start_stripe /= io_boundary; 1474 end_stripe /= io_boundary; 1475 } 1476 return (start_stripe != end_stripe); 1477 } 1478 1479 static uint32_t 1480 _to_next_boundary(uint64_t offset, uint32_t boundary) 1481 { 1482 return (boundary - (offset % boundary)); 1483 } 1484 1485 static void 1486 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1487 1488 static void 1489 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1490 { 1491 struct spdk_bdev_io *bdev_io = _bdev_io; 1492 uint64_t current_offset, remaining; 1493 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1494 struct iovec *parent_iov, *iov; 1495 uint64_t parent_iov_offset, iov_len; 1496 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1497 int rc; 1498 1499 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1500 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1501 blocklen = bdev_io->bdev->blocklen; 1502 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1503 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1504 1505 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1506 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1507 if (parent_iov_offset < parent_iov->iov_len) { 1508 break; 1509 } 1510 parent_iov_offset -= parent_iov->iov_len; 1511 } 1512 1513 child_iovcnt = 0; 1514 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1515 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1516 to_next_boundary = spdk_min(remaining, to_next_boundary); 1517 to_next_boundary_bytes = to_next_boundary * blocklen; 1518 iov = &bdev_io->child_iov[child_iovcnt]; 1519 iovcnt = 0; 1520 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1521 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1522 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1523 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1524 to_next_boundary_bytes -= iov_len; 1525 1526 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1527 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1528 1529 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1530 parent_iov_offset += iov_len; 1531 } else { 
1532 parent_iovpos++; 1533 parent_iov_offset = 0; 1534 } 1535 child_iovcnt++; 1536 iovcnt++; 1537 } 1538 1539 if (to_next_boundary_bytes > 0) { 1540 /* We had to stop this child I/O early because we ran out of 1541 * child_iov space. Make sure the iovs collected are valid and 1542 * then adjust to_next_boundary before starting the child I/O. 1543 */ 1544 if ((to_next_boundary_bytes % blocklen) != 0) { 1545 SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n", 1546 to_next_boundary_bytes, blocklen); 1547 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1548 if (bdev_io->u.bdev.split_outstanding == 0) { 1549 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1550 } 1551 return; 1552 } 1553 to_next_boundary -= to_next_boundary_bytes / blocklen; 1554 } 1555 1556 bdev_io->u.bdev.split_outstanding++; 1557 1558 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1559 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1560 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1561 iov, iovcnt, current_offset, to_next_boundary, 1562 _spdk_bdev_io_split_done, bdev_io); 1563 } else { 1564 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1565 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1566 iov, iovcnt, current_offset, to_next_boundary, 1567 _spdk_bdev_io_split_done, bdev_io); 1568 } 1569 1570 if (rc == 0) { 1571 current_offset += to_next_boundary; 1572 remaining -= to_next_boundary; 1573 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1574 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1575 } else { 1576 bdev_io->u.bdev.split_outstanding--; 1577 if (rc == -ENOMEM) { 1578 if (bdev_io->u.bdev.split_outstanding == 0) { 1579 /* No I/O is outstanding. Hence we should wait here. */ 1580 _spdk_bdev_queue_io_wait_with_cb(bdev_io, 1581 _spdk_bdev_io_split_with_payload); 1582 } 1583 } else { 1584 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1585 if (bdev_io->u.bdev.split_outstanding == 0) { 1586 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1587 } 1588 } 1589 1590 return; 1591 } 1592 } 1593 } 1594 1595 static void 1596 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1597 { 1598 struct spdk_bdev_io *parent_io = cb_arg; 1599 1600 spdk_bdev_free_io(bdev_io); 1601 1602 if (!success) { 1603 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1604 } 1605 parent_io->u.bdev.split_outstanding--; 1606 if (parent_io->u.bdev.split_outstanding != 0) { 1607 return; 1608 } 1609 1610 /* 1611 * Parent I/O finishes when all blocks are consumed or there is any failure of 1612 * child I/O and no outstanding child I/O. 1613 */ 1614 if (parent_io->u.bdev.split_remaining_num_blocks == 0 || 1615 parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) { 1616 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1617 parent_io->internal.caller_ctx); 1618 return; 1619 } 1620 1621 /* 1622 * Continue with the splitting process. This function will complete the parent I/O if the 1623 * splitting is done. 
1624 */ 1625 _spdk_bdev_io_split_with_payload(parent_io); 1626 } 1627 1628 static void 1629 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1630 { 1631 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1632 1633 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1634 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1635 bdev_io->u.bdev.split_outstanding = 0; 1636 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1637 1638 _spdk_bdev_io_split_with_payload(bdev_io); 1639 } 1640 1641 static void 1642 _spdk_bdev_io_submit(void *ctx) 1643 { 1644 struct spdk_bdev_io *bdev_io = ctx; 1645 struct spdk_bdev *bdev = bdev_io->bdev; 1646 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1647 struct spdk_io_channel *ch = bdev_ch->channel; 1648 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1649 uint64_t tsc; 1650 1651 tsc = spdk_get_ticks(); 1652 bdev_io->internal.submit_tsc = tsc; 1653 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1654 bdev_ch->io_outstanding++; 1655 shared_resource->io_outstanding++; 1656 bdev_io->internal.in_submit_request = true; 1657 if (spdk_likely(bdev_ch->flags == 0)) { 1658 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1659 bdev->fn_table->submit_request(ch, bdev_io); 1660 } else { 1661 bdev_ch->io_outstanding--; 1662 shared_resource->io_outstanding--; 1663 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1664 } 1665 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1666 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1667 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1668 bdev_ch->io_outstanding--; 1669 shared_resource->io_outstanding--; 1670 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1671 _spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 1672 } else { 1673 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1674 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1675 } 1676 bdev_io->internal.in_submit_request = false; 1677 } 1678 1679 static void 1680 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1681 { 1682 struct spdk_bdev *bdev = bdev_io->bdev; 1683 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1684 1685 assert(thread != NULL); 1686 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1687 1688 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1689 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1690 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split, 1691 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1692 } else { 1693 _spdk_bdev_io_split(NULL, bdev_io); 1694 } 1695 return; 1696 } 1697 1698 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1699 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1700 _spdk_bdev_io_submit(bdev_io); 1701 } else { 1702 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1703 bdev_io->internal.ch = bdev->internal.qos->ch; 1704 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1705 } 1706 } else { 1707 _spdk_bdev_io_submit(bdev_io); 1708 } 1709 } 1710 1711 static void 1712 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1713 { 1714 struct spdk_bdev *bdev = bdev_io->bdev; 1715 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1716 struct spdk_io_channel *ch = bdev_ch->channel; 1717 1718 
assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1719 1720 bdev_io->internal.in_submit_request = true; 1721 bdev->fn_table->submit_request(ch, bdev_io); 1722 bdev_io->internal.in_submit_request = false; 1723 } 1724 1725 static void 1726 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1727 struct spdk_bdev *bdev, void *cb_arg, 1728 spdk_bdev_io_completion_cb cb) 1729 { 1730 bdev_io->bdev = bdev; 1731 bdev_io->internal.caller_ctx = cb_arg; 1732 bdev_io->internal.cb = cb; 1733 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1734 bdev_io->internal.in_submit_request = false; 1735 bdev_io->internal.buf = NULL; 1736 bdev_io->internal.io_submit_ch = NULL; 1737 bdev_io->internal.orig_iovs = NULL; 1738 bdev_io->internal.orig_iovcnt = 0; 1739 } 1740 1741 static bool 1742 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1743 { 1744 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1745 } 1746 1747 bool 1748 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1749 { 1750 bool supported; 1751 1752 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1753 1754 if (!supported) { 1755 switch (io_type) { 1756 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1757 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1758 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1759 break; 1760 default: 1761 break; 1762 } 1763 } 1764 1765 return supported; 1766 } 1767 1768 int 1769 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1770 { 1771 if (bdev->fn_table->dump_info_json) { 1772 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1773 } 1774 1775 return 0; 1776 } 1777 1778 static void 1779 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1780 { 1781 uint32_t max_per_timeslice = 0; 1782 int i; 1783 1784 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1785 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1786 qos->rate_limits[i].max_per_timeslice = 0; 1787 continue; 1788 } 1789 1790 max_per_timeslice = qos->rate_limits[i].limit * 1791 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1792 1793 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1794 qos->rate_limits[i].min_per_timeslice); 1795 1796 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1797 } 1798 1799 _spdk_bdev_qos_set_ops(qos); 1800 } 1801 1802 static int 1803 spdk_bdev_channel_poll_qos(void *arg) 1804 { 1805 struct spdk_bdev_qos *qos = arg; 1806 uint64_t now = spdk_get_ticks(); 1807 int i; 1808 1809 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1810 /* We received our callback earlier than expected - return 1811 * immediately and wait to do accounting until at least one 1812 * timeslice has actually expired. This should never happen 1813 * with a well-behaved timer implementation. 1814 */ 1815 return 0; 1816 } 1817 1818 /* Reset for next round of rate limiting */ 1819 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1820 /* We may have allowed the IOs or bytes to slightly overrun in the last 1821 * timeslice. remaining_this_timeslice is signed, so if it's negative 1822 * here, we'll account for the overrun so that the next timeslice will 1823 * be appropriately reduced. 
1824 */ 1825 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1826 qos->rate_limits[i].remaining_this_timeslice = 0; 1827 } 1828 } 1829 1830 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1831 qos->last_timeslice += qos->timeslice_size; 1832 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1833 qos->rate_limits[i].remaining_this_timeslice += 1834 qos->rate_limits[i].max_per_timeslice; 1835 } 1836 } 1837 1838 return _spdk_bdev_qos_io_submit(qos->ch, qos); 1839 } 1840 1841 static void 1842 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1843 { 1844 struct spdk_bdev_shared_resource *shared_resource; 1845 1846 spdk_put_io_channel(ch->channel); 1847 1848 shared_resource = ch->shared_resource; 1849 1850 assert(ch->io_outstanding == 0); 1851 assert(shared_resource->ref > 0); 1852 shared_resource->ref--; 1853 if (shared_resource->ref == 0) { 1854 assert(shared_resource->io_outstanding == 0); 1855 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1856 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1857 free(shared_resource); 1858 } 1859 } 1860 1861 /* Caller must hold bdev->internal.mutex. */ 1862 static void 1863 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1864 { 1865 struct spdk_bdev_qos *qos = bdev->internal.qos; 1866 int i; 1867 1868 /* Rate limiting on this bdev enabled */ 1869 if (qos) { 1870 if (qos->ch == NULL) { 1871 struct spdk_io_channel *io_ch; 1872 1873 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1874 bdev->name, spdk_get_thread()); 1875 1876 /* No qos channel has been selected, so set one up */ 1877 1878 /* Take another reference to ch */ 1879 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1880 assert(io_ch != NULL); 1881 qos->ch = ch; 1882 1883 qos->thread = spdk_io_channel_get_thread(io_ch); 1884 1885 TAILQ_INIT(&qos->queued); 1886 1887 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1888 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1889 qos->rate_limits[i].min_per_timeslice = 1890 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1891 } else { 1892 qos->rate_limits[i].min_per_timeslice = 1893 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1894 } 1895 1896 if (qos->rate_limits[i].limit == 0) { 1897 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1898 } 1899 } 1900 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1901 qos->timeslice_size = 1902 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1903 qos->last_timeslice = spdk_get_ticks(); 1904 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1905 qos, 1906 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1907 } 1908 1909 ch->flags |= BDEV_CH_QOS_ENABLED; 1910 } 1911 } 1912 1913 static int 1914 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1915 { 1916 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1917 struct spdk_bdev_channel *ch = ctx_buf; 1918 struct spdk_io_channel *mgmt_io_ch; 1919 struct spdk_bdev_mgmt_channel *mgmt_ch; 1920 struct spdk_bdev_shared_resource *shared_resource; 1921 1922 ch->bdev = bdev; 1923 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1924 if (!ch->channel) { 1925 return -1; 1926 } 1927 1928 assert(ch->histogram == NULL); 1929 if (bdev->internal.histogram_enabled) { 1930 ch->histogram = spdk_histogram_data_alloc(); 1931 if (ch->histogram == NULL) { 1932 SPDK_ERRLOG("Could not allocate histogram\n"); 1933 } 1934 } 1935 1936 mgmt_io_ch = 
spdk_get_io_channel(&g_bdev_mgr); 1937 if (!mgmt_io_ch) { 1938 spdk_put_io_channel(ch->channel); 1939 return -1; 1940 } 1941 1942 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1943 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1944 if (shared_resource->shared_ch == ch->channel) { 1945 spdk_put_io_channel(mgmt_io_ch); 1946 shared_resource->ref++; 1947 break; 1948 } 1949 } 1950 1951 if (shared_resource == NULL) { 1952 shared_resource = calloc(1, sizeof(*shared_resource)); 1953 if (shared_resource == NULL) { 1954 spdk_put_io_channel(ch->channel); 1955 spdk_put_io_channel(mgmt_io_ch); 1956 return -1; 1957 } 1958 1959 shared_resource->mgmt_ch = mgmt_ch; 1960 shared_resource->io_outstanding = 0; 1961 TAILQ_INIT(&shared_resource->nomem_io); 1962 shared_resource->nomem_threshold = 0; 1963 shared_resource->shared_ch = ch->channel; 1964 shared_resource->ref = 1; 1965 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1966 } 1967 1968 memset(&ch->stat, 0, sizeof(ch->stat)); 1969 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1970 ch->io_outstanding = 0; 1971 TAILQ_INIT(&ch->queued_resets); 1972 ch->flags = 0; 1973 ch->shared_resource = shared_resource; 1974 1975 #ifdef SPDK_CONFIG_VTUNE 1976 { 1977 char *name; 1978 __itt_init_ittlib(NULL, 0); 1979 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1980 if (!name) { 1981 _spdk_bdev_channel_destroy_resource(ch); 1982 return -1; 1983 } 1984 ch->handle = __itt_string_handle_create(name); 1985 free(name); 1986 ch->start_tsc = spdk_get_ticks(); 1987 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1988 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1989 } 1990 #endif 1991 1992 pthread_mutex_lock(&bdev->internal.mutex); 1993 _spdk_bdev_enable_qos(bdev, ch); 1994 pthread_mutex_unlock(&bdev->internal.mutex); 1995 1996 return 0; 1997 } 1998 1999 /* 2000 * Abort I/O that are waiting on a data buffer. These types of I/O are 2001 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2002 */ 2003 static void 2004 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2005 { 2006 bdev_io_stailq_t tmp; 2007 struct spdk_bdev_io *bdev_io; 2008 2009 STAILQ_INIT(&tmp); 2010 2011 while (!STAILQ_EMPTY(queue)) { 2012 bdev_io = STAILQ_FIRST(queue); 2013 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2014 if (bdev_io->internal.ch == ch) { 2015 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2016 } else { 2017 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2018 } 2019 } 2020 2021 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2022 } 2023 2024 /* 2025 * Abort I/O that are queued waiting for submission. These types of I/O are 2026 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2027 */ 2028 static void 2029 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2030 { 2031 struct spdk_bdev_io *bdev_io, *tmp; 2032 2033 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2034 if (bdev_io->internal.ch == ch) { 2035 TAILQ_REMOVE(queue, bdev_io, internal.link); 2036 /* 2037 * spdk_bdev_io_complete() assumes that the completed I/O had 2038 * been submitted to the bdev module. Since in this case it 2039 * hadn't, bump io_outstanding to account for the decrement 2040 * that spdk_bdev_io_complete() will do. 
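			 *
			 * Worked instance: an I/O parked on shared_resource->nomem_io already had
			 * io_outstanding decremented when the module returned NOMEM (see
			 * spdk_bdev_io_complete() below), so the increment here simply restores the
			 * balance before the FAILED completion decrements it again.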
2041 */ 2042 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2043 ch->io_outstanding++; 2044 ch->shared_resource->io_outstanding++; 2045 } 2046 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2047 } 2048 } 2049 } 2050 2051 static void 2052 spdk_bdev_qos_channel_destroy(void *cb_arg) 2053 { 2054 struct spdk_bdev_qos *qos = cb_arg; 2055 2056 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2057 spdk_poller_unregister(&qos->poller); 2058 2059 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2060 2061 free(qos); 2062 } 2063 2064 static int 2065 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 2066 { 2067 int i; 2068 2069 /* 2070 * Cleanly shutting down the QoS poller is tricky, because 2071 * during the asynchronous operation the user could open 2072 * a new descriptor and create a new channel, spawning 2073 * a new QoS poller. 2074 * 2075 * The strategy is to create a new QoS structure here and swap it 2076 * in. The shutdown path then continues to refer to the old one 2077 * until it completes and then releases it. 2078 */ 2079 struct spdk_bdev_qos *new_qos, *old_qos; 2080 2081 old_qos = bdev->internal.qos; 2082 2083 new_qos = calloc(1, sizeof(*new_qos)); 2084 if (!new_qos) { 2085 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2086 return -ENOMEM; 2087 } 2088 2089 /* Copy the old QoS data into the newly allocated structure */ 2090 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2091 2092 /* Zero out the key parts of the QoS structure */ 2093 new_qos->ch = NULL; 2094 new_qos->thread = NULL; 2095 new_qos->poller = NULL; 2096 TAILQ_INIT(&new_qos->queued); 2097 /* 2098 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2099 * It will be used later for the new QoS structure. 2100 */ 2101 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2102 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2103 new_qos->rate_limits[i].min_per_timeslice = 0; 2104 new_qos->rate_limits[i].max_per_timeslice = 0; 2105 } 2106 2107 bdev->internal.qos = new_qos; 2108 2109 if (old_qos->thread == NULL) { 2110 free(old_qos); 2111 } else { 2112 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 2113 old_qos); 2114 } 2115 2116 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2117 * been destroyed yet. The destruction path will end up waiting for the final 2118 * channel to be put before it releases resources. 
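	 *
	 * Sequencing sketch: the spdk_thread_send_msg() above runs
	 * spdk_bdev_qos_channel_destroy() on the QoS thread, which drops the extra
	 * channel reference taken in _spdk_bdev_enable_qos(), unregisters the QoS
	 * poller and frees old_qos, while new_qos carries forward only the configured
	 * limits.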
*/ 2119 2120 return 0; 2121 } 2122 2123 static void 2124 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2125 { 2126 total->bytes_read += add->bytes_read; 2127 total->num_read_ops += add->num_read_ops; 2128 total->bytes_written += add->bytes_written; 2129 total->num_write_ops += add->num_write_ops; 2130 total->bytes_unmapped += add->bytes_unmapped; 2131 total->num_unmap_ops += add->num_unmap_ops; 2132 total->read_latency_ticks += add->read_latency_ticks; 2133 total->write_latency_ticks += add->write_latency_ticks; 2134 total->unmap_latency_ticks += add->unmap_latency_ticks; 2135 } 2136 2137 static void 2138 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 2139 { 2140 struct spdk_bdev_channel *ch = ctx_buf; 2141 struct spdk_bdev_mgmt_channel *mgmt_ch; 2142 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2143 2144 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2145 spdk_get_thread()); 2146 2147 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */ 2148 pthread_mutex_lock(&ch->bdev->internal.mutex); 2149 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2150 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2151 2152 mgmt_ch = shared_resource->mgmt_ch; 2153 2154 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2155 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2156 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2157 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2158 2159 if (ch->histogram) { 2160 spdk_histogram_data_free(ch->histogram); 2161 } 2162 2163 _spdk_bdev_channel_destroy_resource(ch); 2164 } 2165 2166 int 2167 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2168 { 2169 struct spdk_bdev_alias *tmp; 2170 2171 if (alias == NULL) { 2172 SPDK_ERRLOG("Empty alias passed\n"); 2173 return -EINVAL; 2174 } 2175 2176 if (spdk_bdev_get_by_name(alias)) { 2177 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2178 return -EEXIST; 2179 } 2180 2181 tmp = calloc(1, sizeof(*tmp)); 2182 if (tmp == NULL) { 2183 SPDK_ERRLOG("Unable to allocate alias\n"); 2184 return -ENOMEM; 2185 } 2186 2187 tmp->alias = strdup(alias); 2188 if (tmp->alias == NULL) { 2189 free(tmp); 2190 SPDK_ERRLOG("Unable to allocate alias\n"); 2191 return -ENOMEM; 2192 } 2193 2194 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2195 2196 return 0; 2197 } 2198 2199 int 2200 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2201 { 2202 struct spdk_bdev_alias *tmp; 2203 2204 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2205 if (strcmp(alias, tmp->alias) == 0) { 2206 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2207 free(tmp->alias); 2208 free(tmp); 2209 return 0; 2210 } 2211 } 2212 2213 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2214 2215 return -ENOENT; 2216 } 2217 2218 void 2219 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2220 { 2221 struct spdk_bdev_alias *p, *tmp; 2222 2223 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2224 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2225 free(p->alias); 2226 free(p); 2227 } 2228 } 2229 2230 struct spdk_io_channel * 2231 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2232 { 2233 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 2234 } 2235 2236 const char * 2237 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2238 { 2239 return bdev->name; 2240 } 2241 2242 const char * 2243 spdk_bdev_get_product_name(const struct 
spdk_bdev *bdev) 2244 { 2245 return bdev->product_name; 2246 } 2247 2248 const struct spdk_bdev_aliases_list * 2249 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2250 { 2251 return &bdev->aliases; 2252 } 2253 2254 uint32_t 2255 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2256 { 2257 return bdev->blocklen; 2258 } 2259 2260 uint64_t 2261 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2262 { 2263 return bdev->blockcnt; 2264 } 2265 2266 const char * 2267 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2268 { 2269 return qos_rpc_type[type]; 2270 } 2271 2272 void 2273 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2274 { 2275 int i; 2276 2277 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2278 2279 pthread_mutex_lock(&bdev->internal.mutex); 2280 if (bdev->internal.qos) { 2281 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2282 if (bdev->internal.qos->rate_limits[i].limit != 2283 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2284 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2285 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2286 /* Change from Byte to Megabyte which is user visible. */ 2287 limits[i] = limits[i] / 1024 / 1024; 2288 } 2289 } 2290 } 2291 } 2292 pthread_mutex_unlock(&bdev->internal.mutex); 2293 } 2294 2295 size_t 2296 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2297 { 2298 return 1 << bdev->required_alignment; 2299 } 2300 2301 uint32_t 2302 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2303 { 2304 return bdev->optimal_io_boundary; 2305 } 2306 2307 bool 2308 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2309 { 2310 return bdev->write_cache; 2311 } 2312 2313 const struct spdk_uuid * 2314 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2315 { 2316 return &bdev->uuid; 2317 } 2318 2319 uint64_t 2320 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2321 { 2322 return bdev->internal.measured_queue_depth; 2323 } 2324 2325 uint64_t 2326 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2327 { 2328 return bdev->internal.period; 2329 } 2330 2331 uint64_t 2332 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2333 { 2334 return bdev->internal.weighted_io_time; 2335 } 2336 2337 uint64_t 2338 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2339 { 2340 return bdev->internal.io_time; 2341 } 2342 2343 static void 2344 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2345 { 2346 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2347 2348 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2349 2350 if (bdev->internal.measured_queue_depth) { 2351 bdev->internal.io_time += bdev->internal.period; 2352 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2353 } 2354 } 2355 2356 static void 2357 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2358 { 2359 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2360 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2361 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2362 2363 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2364 spdk_for_each_channel_continue(i, 0); 2365 } 2366 2367 static int 2368 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2369 { 2370 struct spdk_bdev *bdev = ctx; 2371 bdev->internal.temporary_queue_depth = 0; 2372 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2373 
_calculate_measured_qd_cpl); 2374 return 0; 2375 } 2376 2377 void 2378 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2379 { 2380 bdev->internal.period = period; 2381 2382 if (bdev->internal.qd_poller != NULL) { 2383 spdk_poller_unregister(&bdev->internal.qd_poller); 2384 bdev->internal.measured_queue_depth = UINT64_MAX; 2385 } 2386 2387 if (period != 0) { 2388 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2389 period); 2390 } 2391 } 2392 2393 int 2394 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2395 { 2396 int ret; 2397 2398 pthread_mutex_lock(&bdev->internal.mutex); 2399 2400 /* bdev has open descriptors */ 2401 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2402 bdev->blockcnt > size) { 2403 ret = -EBUSY; 2404 } else { 2405 bdev->blockcnt = size; 2406 ret = 0; 2407 } 2408 2409 pthread_mutex_unlock(&bdev->internal.mutex); 2410 2411 return ret; 2412 } 2413 2414 /* 2415 * Convert I/O offset and length from bytes to blocks. 2416 * 2417 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 2418 */ 2419 static uint64_t 2420 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2421 uint64_t num_bytes, uint64_t *num_blocks) 2422 { 2423 uint32_t block_size = bdev->blocklen; 2424 uint8_t shift_cnt; 2425 2426 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 2427 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 2428 shift_cnt = spdk_u32log2(block_size); 2429 *offset_blocks = offset_bytes >> shift_cnt; 2430 *num_blocks = num_bytes >> shift_cnt; 2431 return (offset_bytes - (*offset_blocks << shift_cnt)) | 2432 (num_bytes - (*num_blocks << shift_cnt)); 2433 } else { 2434 *offset_blocks = offset_bytes / block_size; 2435 *num_blocks = num_bytes / block_size; 2436 return (offset_bytes % block_size) | (num_bytes % block_size); 2437 } 2438 } 2439 2440 static bool 2441 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2442 { 2443 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2444 * has been an overflow and hence the offset has been wrapped around */ 2445 if (offset_blocks + num_blocks < offset_blocks) { 2446 return false; 2447 } 2448 2449 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2450 if (offset_blocks + num_blocks > bdev->blockcnt) { 2451 return false; 2452 } 2453 2454 return true; 2455 } 2456 2457 int 2458 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2459 void *buf, uint64_t offset, uint64_t nbytes, 2460 spdk_bdev_io_completion_cb cb, void *cb_arg) 2461 { 2462 uint64_t offset_blocks, num_blocks; 2463 2464 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2465 return -EINVAL; 2466 } 2467 2468 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2469 } 2470 2471 int 2472 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2473 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2474 spdk_bdev_io_completion_cb cb, void *cb_arg) 2475 { 2476 struct spdk_bdev *bdev = desc->bdev; 2477 struct spdk_bdev_io *bdev_io; 2478 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2479 2480 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2481 return -EINVAL; 2482 } 2483 2484 bdev_io = 
spdk_bdev_get_io(channel); 2485 if (!bdev_io) { 2486 return -ENOMEM; 2487 } 2488 2489 bdev_io->internal.ch = channel; 2490 bdev_io->internal.desc = desc; 2491 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2492 bdev_io->u.bdev.iovs = &bdev_io->iov; 2493 bdev_io->u.bdev.iovs[0].iov_base = buf; 2494 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2495 bdev_io->u.bdev.iovcnt = 1; 2496 bdev_io->u.bdev.num_blocks = num_blocks; 2497 bdev_io->u.bdev.offset_blocks = offset_blocks; 2498 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2499 2500 spdk_bdev_io_submit(bdev_io); 2501 return 0; 2502 } 2503 2504 int 2505 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2506 struct iovec *iov, int iovcnt, 2507 uint64_t offset, uint64_t nbytes, 2508 spdk_bdev_io_completion_cb cb, void *cb_arg) 2509 { 2510 uint64_t offset_blocks, num_blocks; 2511 2512 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2513 return -EINVAL; 2514 } 2515 2516 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2517 } 2518 2519 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2520 struct iovec *iov, int iovcnt, 2521 uint64_t offset_blocks, uint64_t num_blocks, 2522 spdk_bdev_io_completion_cb cb, void *cb_arg) 2523 { 2524 struct spdk_bdev *bdev = desc->bdev; 2525 struct spdk_bdev_io *bdev_io; 2526 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2527 2528 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2529 return -EINVAL; 2530 } 2531 2532 bdev_io = spdk_bdev_get_io(channel); 2533 if (!bdev_io) { 2534 return -ENOMEM; 2535 } 2536 2537 bdev_io->internal.ch = channel; 2538 bdev_io->internal.desc = desc; 2539 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2540 bdev_io->u.bdev.iovs = iov; 2541 bdev_io->u.bdev.iovcnt = iovcnt; 2542 bdev_io->u.bdev.num_blocks = num_blocks; 2543 bdev_io->u.bdev.offset_blocks = offset_blocks; 2544 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2545 2546 spdk_bdev_io_submit(bdev_io); 2547 return 0; 2548 } 2549 2550 int 2551 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2552 void *buf, uint64_t offset, uint64_t nbytes, 2553 spdk_bdev_io_completion_cb cb, void *cb_arg) 2554 { 2555 uint64_t offset_blocks, num_blocks; 2556 2557 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2558 return -EINVAL; 2559 } 2560 2561 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2562 } 2563 2564 int 2565 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2566 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2567 spdk_bdev_io_completion_cb cb, void *cb_arg) 2568 { 2569 struct spdk_bdev *bdev = desc->bdev; 2570 struct spdk_bdev_io *bdev_io; 2571 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2572 2573 if (!desc->write) { 2574 return -EBADF; 2575 } 2576 2577 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2578 return -EINVAL; 2579 } 2580 2581 bdev_io = spdk_bdev_get_io(channel); 2582 if (!bdev_io) { 2583 return -ENOMEM; 2584 } 2585 2586 bdev_io->internal.ch = channel; 2587 bdev_io->internal.desc = desc; 2588 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2589 bdev_io->u.bdev.iovs = &bdev_io->iov; 2590 bdev_io->u.bdev.iovs[0].iov_base = buf; 2591 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2592 bdev_io->u.bdev.iovcnt = 1; 2593 bdev_io->u.bdev.num_blocks = num_blocks; 
2594 bdev_io->u.bdev.offset_blocks = offset_blocks; 2595 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2596 2597 spdk_bdev_io_submit(bdev_io); 2598 return 0; 2599 } 2600 2601 int 2602 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2603 struct iovec *iov, int iovcnt, 2604 uint64_t offset, uint64_t len, 2605 spdk_bdev_io_completion_cb cb, void *cb_arg) 2606 { 2607 uint64_t offset_blocks, num_blocks; 2608 2609 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2610 return -EINVAL; 2611 } 2612 2613 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2614 } 2615 2616 int 2617 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2618 struct iovec *iov, int iovcnt, 2619 uint64_t offset_blocks, uint64_t num_blocks, 2620 spdk_bdev_io_completion_cb cb, void *cb_arg) 2621 { 2622 struct spdk_bdev *bdev = desc->bdev; 2623 struct spdk_bdev_io *bdev_io; 2624 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2625 2626 if (!desc->write) { 2627 return -EBADF; 2628 } 2629 2630 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2631 return -EINVAL; 2632 } 2633 2634 bdev_io = spdk_bdev_get_io(channel); 2635 if (!bdev_io) { 2636 return -ENOMEM; 2637 } 2638 2639 bdev_io->internal.ch = channel; 2640 bdev_io->internal.desc = desc; 2641 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2642 bdev_io->u.bdev.iovs = iov; 2643 bdev_io->u.bdev.iovcnt = iovcnt; 2644 bdev_io->u.bdev.num_blocks = num_blocks; 2645 bdev_io->u.bdev.offset_blocks = offset_blocks; 2646 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2647 2648 spdk_bdev_io_submit(bdev_io); 2649 return 0; 2650 } 2651 2652 int 2653 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2654 uint64_t offset, uint64_t len, 2655 spdk_bdev_io_completion_cb cb, void *cb_arg) 2656 { 2657 uint64_t offset_blocks, num_blocks; 2658 2659 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2660 return -EINVAL; 2661 } 2662 2663 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2664 } 2665 2666 int 2667 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2668 uint64_t offset_blocks, uint64_t num_blocks, 2669 spdk_bdev_io_completion_cb cb, void *cb_arg) 2670 { 2671 struct spdk_bdev *bdev = desc->bdev; 2672 struct spdk_bdev_io *bdev_io; 2673 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2674 2675 if (!desc->write) { 2676 return -EBADF; 2677 } 2678 2679 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2680 return -EINVAL; 2681 } 2682 2683 bdev_io = spdk_bdev_get_io(channel); 2684 2685 if (!bdev_io) { 2686 return -ENOMEM; 2687 } 2688 2689 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2690 bdev_io->internal.ch = channel; 2691 bdev_io->internal.desc = desc; 2692 bdev_io->u.bdev.offset_blocks = offset_blocks; 2693 bdev_io->u.bdev.num_blocks = num_blocks; 2694 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2695 2696 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2697 spdk_bdev_io_submit(bdev_io); 2698 return 0; 2699 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2700 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2701 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2702 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2703 _spdk_bdev_write_zero_buffer_next(bdev_io); 2704 
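		/*
		 * Emulation sketch: each pass of _spdk_bdev_write_zero_buffer_next()
		 * writes the shared g_bdev_mgr.zero_buffer, covering at most
		 * ZERO_BUFFER_SIZE bytes per write - e.g. 2048 blocks per iteration on a
		 * 512-byte-block bdev with a 1 MiB zero buffer. The completion callback
		 * resubmits until split_remaining_num_blocks reaches zero, and a -ENOMEM
		 * from an intermediate write is retried via the io_wait queue.
		 */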
return 0; 2705 } else { 2706 spdk_bdev_free_io(bdev_io); 2707 return -ENOTSUP; 2708 } 2709 } 2710 2711 int 2712 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2713 uint64_t offset, uint64_t nbytes, 2714 spdk_bdev_io_completion_cb cb, void *cb_arg) 2715 { 2716 uint64_t offset_blocks, num_blocks; 2717 2718 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2719 return -EINVAL; 2720 } 2721 2722 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2723 } 2724 2725 int 2726 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2727 uint64_t offset_blocks, uint64_t num_blocks, 2728 spdk_bdev_io_completion_cb cb, void *cb_arg) 2729 { 2730 struct spdk_bdev *bdev = desc->bdev; 2731 struct spdk_bdev_io *bdev_io; 2732 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2733 2734 if (!desc->write) { 2735 return -EBADF; 2736 } 2737 2738 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2739 return -EINVAL; 2740 } 2741 2742 if (num_blocks == 0) { 2743 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2744 return -EINVAL; 2745 } 2746 2747 bdev_io = spdk_bdev_get_io(channel); 2748 if (!bdev_io) { 2749 return -ENOMEM; 2750 } 2751 2752 bdev_io->internal.ch = channel; 2753 bdev_io->internal.desc = desc; 2754 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2755 2756 bdev_io->u.bdev.iovs = &bdev_io->iov; 2757 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2758 bdev_io->u.bdev.iovs[0].iov_len = 0; 2759 bdev_io->u.bdev.iovcnt = 1; 2760 2761 bdev_io->u.bdev.offset_blocks = offset_blocks; 2762 bdev_io->u.bdev.num_blocks = num_blocks; 2763 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2764 2765 spdk_bdev_io_submit(bdev_io); 2766 return 0; 2767 } 2768 2769 int 2770 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2771 uint64_t offset, uint64_t length, 2772 spdk_bdev_io_completion_cb cb, void *cb_arg) 2773 { 2774 uint64_t offset_blocks, num_blocks; 2775 2776 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2777 return -EINVAL; 2778 } 2779 2780 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2781 } 2782 2783 int 2784 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2785 uint64_t offset_blocks, uint64_t num_blocks, 2786 spdk_bdev_io_completion_cb cb, void *cb_arg) 2787 { 2788 struct spdk_bdev *bdev = desc->bdev; 2789 struct spdk_bdev_io *bdev_io; 2790 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2791 2792 if (!desc->write) { 2793 return -EBADF; 2794 } 2795 2796 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2797 return -EINVAL; 2798 } 2799 2800 bdev_io = spdk_bdev_get_io(channel); 2801 if (!bdev_io) { 2802 return -ENOMEM; 2803 } 2804 2805 bdev_io->internal.ch = channel; 2806 bdev_io->internal.desc = desc; 2807 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2808 bdev_io->u.bdev.iovs = NULL; 2809 bdev_io->u.bdev.iovcnt = 0; 2810 bdev_io->u.bdev.offset_blocks = offset_blocks; 2811 bdev_io->u.bdev.num_blocks = num_blocks; 2812 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2813 2814 spdk_bdev_io_submit(bdev_io); 2815 return 0; 2816 } 2817 2818 static void 2819 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2820 { 2821 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2822 struct spdk_bdev_io *bdev_io; 2823 2824 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2825 TAILQ_REMOVE(&ch->queued_resets, bdev_io, 
internal.link); 2826 spdk_bdev_io_submit_reset(bdev_io); 2827 } 2828 2829 static void 2830 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2831 { 2832 struct spdk_io_channel *ch; 2833 struct spdk_bdev_channel *channel; 2834 struct spdk_bdev_mgmt_channel *mgmt_channel; 2835 struct spdk_bdev_shared_resource *shared_resource; 2836 bdev_io_tailq_t tmp_queued; 2837 2838 TAILQ_INIT(&tmp_queued); 2839 2840 ch = spdk_io_channel_iter_get_channel(i); 2841 channel = spdk_io_channel_get_ctx(ch); 2842 shared_resource = channel->shared_resource; 2843 mgmt_channel = shared_resource->mgmt_ch; 2844 2845 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2846 2847 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2848 /* The QoS object is always valid and readable while 2849 * the channel flag is set, so the lock here should not 2850 * be necessary. We're not in the fast path though, so 2851 * just take it anyway. */ 2852 pthread_mutex_lock(&channel->bdev->internal.mutex); 2853 if (channel->bdev->internal.qos->ch == channel) { 2854 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2855 } 2856 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2857 } 2858 2859 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2860 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2861 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2862 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2863 2864 spdk_for_each_channel_continue(i, 0); 2865 } 2866 2867 static void 2868 _spdk_bdev_start_reset(void *ctx) 2869 { 2870 struct spdk_bdev_channel *ch = ctx; 2871 2872 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2873 ch, _spdk_bdev_reset_dev); 2874 } 2875 2876 static void 2877 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2878 { 2879 struct spdk_bdev *bdev = ch->bdev; 2880 2881 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2882 2883 pthread_mutex_lock(&bdev->internal.mutex); 2884 if (bdev->internal.reset_in_progress == NULL) { 2885 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2886 /* 2887 * Take a channel reference for the target bdev for the life of this 2888 * reset. This guards against the channel getting destroyed while 2889 * spdk_for_each_channel() calls related to this reset IO are in 2890 * progress. We will release the reference when this reset is 2891 * completed. 
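		 *
		 * Illustrative caller sketch for the reset path (hypothetical names, see
		 * spdk_bdev_reset() below):
		 *
		 *	static void
		 *	reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
		 *	{
		 *		spdk_bdev_free_io(bdev_io);
		 *		SPDK_NOTICELOG("reset %s\n", success ? "succeeded" : "failed");
		 *	}
		 *
		 *	...
		 *	if (spdk_bdev_reset(desc, io_ch, reset_done, NULL) != 0) {
		 *		SPDK_ERRLOG("could not submit reset\n");
		 *	}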
2892 */ 2893 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2894 _spdk_bdev_start_reset(ch); 2895 } 2896 pthread_mutex_unlock(&bdev->internal.mutex); 2897 } 2898 2899 int 2900 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2901 spdk_bdev_io_completion_cb cb, void *cb_arg) 2902 { 2903 struct spdk_bdev *bdev = desc->bdev; 2904 struct spdk_bdev_io *bdev_io; 2905 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2906 2907 bdev_io = spdk_bdev_get_io(channel); 2908 if (!bdev_io) { 2909 return -ENOMEM; 2910 } 2911 2912 bdev_io->internal.ch = channel; 2913 bdev_io->internal.desc = desc; 2914 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2915 bdev_io->u.reset.ch_ref = NULL; 2916 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2917 2918 pthread_mutex_lock(&bdev->internal.mutex); 2919 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2920 pthread_mutex_unlock(&bdev->internal.mutex); 2921 2922 _spdk_bdev_channel_start_reset(channel); 2923 2924 return 0; 2925 } 2926 2927 void 2928 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2929 struct spdk_bdev_io_stat *stat) 2930 { 2931 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2932 2933 *stat = channel->stat; 2934 } 2935 2936 static void 2937 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2938 { 2939 void *io_device = spdk_io_channel_iter_get_io_device(i); 2940 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2941 2942 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2943 bdev_iostat_ctx->cb_arg, 0); 2944 free(bdev_iostat_ctx); 2945 } 2946 2947 static void 2948 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2949 { 2950 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2951 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2952 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2953 2954 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2955 spdk_for_each_channel_continue(i, 0); 2956 } 2957 2958 void 2959 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2960 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2961 { 2962 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2963 2964 assert(bdev != NULL); 2965 assert(stat != NULL); 2966 assert(cb != NULL); 2967 2968 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2969 if (bdev_iostat_ctx == NULL) { 2970 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2971 cb(bdev, stat, cb_arg, -ENOMEM); 2972 return; 2973 } 2974 2975 bdev_iostat_ctx->stat = stat; 2976 bdev_iostat_ctx->cb = cb; 2977 bdev_iostat_ctx->cb_arg = cb_arg; 2978 2979 /* Start with the statistics from previously deleted channels. */ 2980 pthread_mutex_lock(&bdev->internal.mutex); 2981 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2982 pthread_mutex_unlock(&bdev->internal.mutex); 2983 2984 /* Then iterate and add the statistics from each existing channel. 
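	 *
	 * Usage sketch (hypothetical callback): the caller-provided stat buffer must
	 * stay valid until the callback fires, because the per-channel aggregation
	 * below is asynchronous.
	 *
	 *	static void
	 *	stat_done(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
	 *		  void *cb_arg, int rc)
	 *	{
	 *		if (rc == 0) {
	 *			printf("%s: %" PRIu64 " read ops\n",
	 *			       spdk_bdev_get_name(bdev), stat->num_read_ops);
	 *		}
	 *		free(stat);
	 *	}
	 *
	 *	struct spdk_bdev_io_stat *stat = calloc(1, sizeof(*stat));
	 *
	 *	if (stat != NULL) {
	 *		spdk_bdev_get_device_stat(bdev, stat, stat_done, NULL);
	 *	}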
*/ 2985 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2986 _spdk_bdev_get_each_channel_stat, 2987 bdev_iostat_ctx, 2988 _spdk_bdev_get_device_stat_done); 2989 } 2990 2991 int 2992 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2993 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2994 spdk_bdev_io_completion_cb cb, void *cb_arg) 2995 { 2996 struct spdk_bdev *bdev = desc->bdev; 2997 struct spdk_bdev_io *bdev_io; 2998 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2999 3000 if (!desc->write) { 3001 return -EBADF; 3002 } 3003 3004 bdev_io = spdk_bdev_get_io(channel); 3005 if (!bdev_io) { 3006 return -ENOMEM; 3007 } 3008 3009 bdev_io->internal.ch = channel; 3010 bdev_io->internal.desc = desc; 3011 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 3012 bdev_io->u.nvme_passthru.cmd = *cmd; 3013 bdev_io->u.nvme_passthru.buf = buf; 3014 bdev_io->u.nvme_passthru.nbytes = nbytes; 3015 bdev_io->u.nvme_passthru.md_buf = NULL; 3016 bdev_io->u.nvme_passthru.md_len = 0; 3017 3018 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3019 3020 spdk_bdev_io_submit(bdev_io); 3021 return 0; 3022 } 3023 3024 int 3025 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3026 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3027 spdk_bdev_io_completion_cb cb, void *cb_arg) 3028 { 3029 struct spdk_bdev *bdev = desc->bdev; 3030 struct spdk_bdev_io *bdev_io; 3031 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3032 3033 if (!desc->write) { 3034 /* 3035 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3036 * to easily determine if the command is a read or write, but for now just 3037 * do not allow io_passthru with a read-only descriptor. 3038 */ 3039 return -EBADF; 3040 } 3041 3042 bdev_io = spdk_bdev_get_io(channel); 3043 if (!bdev_io) { 3044 return -ENOMEM; 3045 } 3046 3047 bdev_io->internal.ch = channel; 3048 bdev_io->internal.desc = desc; 3049 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 3050 bdev_io->u.nvme_passthru.cmd = *cmd; 3051 bdev_io->u.nvme_passthru.buf = buf; 3052 bdev_io->u.nvme_passthru.nbytes = nbytes; 3053 bdev_io->u.nvme_passthru.md_buf = NULL; 3054 bdev_io->u.nvme_passthru.md_len = 0; 3055 3056 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3057 3058 spdk_bdev_io_submit(bdev_io); 3059 return 0; 3060 } 3061 3062 int 3063 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3064 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 3065 spdk_bdev_io_completion_cb cb, void *cb_arg) 3066 { 3067 struct spdk_bdev *bdev = desc->bdev; 3068 struct spdk_bdev_io *bdev_io; 3069 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3070 3071 if (!desc->write) { 3072 /* 3073 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3074 * to easily determine if the command is a read or write, but for now just 3075 * do not allow io_passthru with a read-only descriptor. 
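		 *
		 * Illustrative consequence (hypothetical snippet): even a data-in command
		 * such as
		 *
		 *	struct spdk_nvme_cmd cmd = {};
		 *
		 *	cmd.opc = SPDK_NVME_OPC_READ;	/* remaining fields set by the caller */
		 *	rc = spdk_bdev_nvme_io_passthru_md(desc, io_ch, &cmd, buf, nbytes,
		 *					   md_buf, md_len, cb, cb_arg);
		 *
		 * returns -EBADF unless the descriptor was opened with write == true.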
3076 */ 3077 return -EBADF; 3078 } 3079 3080 bdev_io = spdk_bdev_get_io(channel); 3081 if (!bdev_io) { 3082 return -ENOMEM; 3083 } 3084 3085 bdev_io->internal.ch = channel; 3086 bdev_io->internal.desc = desc; 3087 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 3088 bdev_io->u.nvme_passthru.cmd = *cmd; 3089 bdev_io->u.nvme_passthru.buf = buf; 3090 bdev_io->u.nvme_passthru.nbytes = nbytes; 3091 bdev_io->u.nvme_passthru.md_buf = md_buf; 3092 bdev_io->u.nvme_passthru.md_len = md_len; 3093 3094 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3095 3096 spdk_bdev_io_submit(bdev_io); 3097 return 0; 3098 } 3099 3100 int 3101 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3102 struct spdk_bdev_io_wait_entry *entry) 3103 { 3104 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3105 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 3106 3107 if (bdev != entry->bdev) { 3108 SPDK_ERRLOG("bdevs do not match\n"); 3109 return -EINVAL; 3110 } 3111 3112 if (mgmt_ch->per_thread_cache_count > 0) { 3113 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 3114 return -EINVAL; 3115 } 3116 3117 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 3118 return 0; 3119 } 3120 3121 static void 3122 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 3123 { 3124 struct spdk_bdev *bdev = bdev_ch->bdev; 3125 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3126 struct spdk_bdev_io *bdev_io; 3127 3128 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3129 /* 3130 * Allow some more I/O to complete before retrying the nomem_io queue. 3131 * Some drivers (such as nvme) cannot immediately take a new I/O in 3132 * the context of a completion, because the resources for the I/O are 3133 * not released until control returns to the bdev poller. Also, we 3134 * may require several small I/O to complete before a larger I/O 3135 * (that requires splitting) can be submitted. 3136 */ 3137 return; 3138 } 3139 3140 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3141 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3142 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3143 bdev_io->internal.ch->io_outstanding++; 3144 shared_resource->io_outstanding++; 3145 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3146 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 3147 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3148 break; 3149 } 3150 } 3151 } 3152 3153 static inline void 3154 _spdk_bdev_io_complete(void *ctx) 3155 { 3156 struct spdk_bdev_io *bdev_io = ctx; 3157 uint64_t tsc, tsc_diff; 3158 3159 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3160 /* 3161 * Send the completion to the thread that originally submitted the I/O, 3162 * which may not be the current thread in the case of QoS. 3163 */ 3164 if (bdev_io->internal.io_submit_ch) { 3165 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3166 bdev_io->internal.io_submit_ch = NULL; 3167 } 3168 3169 /* 3170 * Defer completion to avoid potential infinite recursion if the 3171 * user's completion callback issues a new I/O. 
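		 *
		 * Without this deferral a chain like
		 *
		 *	user cb -> spdk_bdev_write_blocks() -> module completes inline ->
		 *	spdk_bdev_io_complete() -> user cb -> ...
		 *
		 * could grow the stack without bound; sending a message lets the current
		 * frame unwind before the callback runs.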
		 */
		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
				     _spdk_bdev_io_complete, bdev_io);
		return;
	}

	tsc = spdk_get_ticks();
	tsc_diff = tsc - bdev_io->internal.submit_tsc;
	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0);

	if (bdev_io->internal.ch->histogram) {
		spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff);
	}

	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		switch (bdev_io->type) {
		case SPDK_BDEV_IO_TYPE_READ:
			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
			bdev_io->internal.ch->stat.num_read_ops++;
			bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff;
			break;
		case SPDK_BDEV_IO_TYPE_WRITE:
			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
			bdev_io->internal.ch->stat.num_write_ops++;
			bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff;
			break;
		case SPDK_BDEV_IO_TYPE_UNMAP:
			bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
			bdev_io->internal.ch->stat.num_unmap_ops++;
			bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff;
			break;
		default:
			break;
		}
	}

#ifdef SPDK_CONFIG_VTUNE
	uint64_t now_tsc = spdk_get_ticks();
	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
		uint64_t data[5];

		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
3217 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 3218 3219 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3220 __itt_metadata_u64, 5, data); 3221 3222 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3223 bdev_io->internal.ch->start_tsc = now_tsc; 3224 } 3225 #endif 3226 3227 assert(bdev_io->internal.cb != NULL); 3228 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 3229 3230 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3231 bdev_io->internal.caller_ctx); 3232 } 3233 3234 static void 3235 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3236 { 3237 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3238 3239 if (bdev_io->u.reset.ch_ref != NULL) { 3240 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3241 bdev_io->u.reset.ch_ref = NULL; 3242 } 3243 3244 _spdk_bdev_io_complete(bdev_io); 3245 } 3246 3247 static void 3248 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3249 { 3250 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3251 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3252 3253 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3254 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3255 _spdk_bdev_channel_start_reset(ch); 3256 } 3257 3258 spdk_for_each_channel_continue(i, 0); 3259 } 3260 3261 void 3262 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3263 { 3264 struct spdk_bdev *bdev = bdev_io->bdev; 3265 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3266 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3267 3268 bdev_io->internal.status = status; 3269 3270 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3271 bool unlock_channels = false; 3272 3273 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3274 SPDK_ERRLOG("NOMEM returned for reset\n"); 3275 } 3276 pthread_mutex_lock(&bdev->internal.mutex); 3277 if (bdev_io == bdev->internal.reset_in_progress) { 3278 bdev->internal.reset_in_progress = NULL; 3279 unlock_channels = true; 3280 } 3281 pthread_mutex_unlock(&bdev->internal.mutex); 3282 3283 if (unlock_channels) { 3284 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3285 bdev_io, _spdk_bdev_reset_complete); 3286 return; 3287 } 3288 } else { 3289 if (spdk_unlikely(bdev_io->internal.orig_iovcnt > 0)) { 3290 _bdev_io_unset_bounce_buf(bdev_io); 3291 } 3292 3293 assert(bdev_ch->io_outstanding > 0); 3294 assert(shared_resource->io_outstanding > 0); 3295 bdev_ch->io_outstanding--; 3296 shared_resource->io_outstanding--; 3297 3298 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3299 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3300 /* 3301 * Wait for some of the outstanding I/O to complete before we 3302 * retry any of the nomem_io. Normally we will wait for 3303 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3304 * depth channels we will instead wait for half to complete. 
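			 *
			 * Worked example with NOMEM_THRESHOLD_COUNT == 8:
			 *
			 *	io_outstanding == 100 -> threshold = max(50, 92) = 92,
			 *	    i.e. wait for 8 completions before retrying;
			 *	io_outstanding == 10  -> threshold = max(5, 2) = 5,
			 *	    i.e. wait for half of the outstanding I/O.
			 *
			 * Note this queue only covers NOMEM reported by the module for an
			 * already allocated bdev_io. When spdk_bdev_get_io() itself fails and
			 * an API such as spdk_bdev_read_blocks() returns -ENOMEM, the caller
			 * can register a wait entry instead (hypothetical sketch; my_retry_fn
			 * and my_ctx are caller-defined):
			 *
			 *	entry->bdev = bdev;
			 *	entry->cb_fn = my_retry_fn;	/* re-issues the I/O later */
			 *	entry->cb_arg = my_ctx;
			 *	spdk_bdev_queue_io_wait(bdev, io_ch, entry);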
3305 */ 3306 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3307 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3308 return; 3309 } 3310 3311 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3312 _spdk_bdev_ch_retry_io(bdev_ch); 3313 } 3314 } 3315 3316 _spdk_bdev_io_complete(bdev_io); 3317 } 3318 3319 void 3320 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3321 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3322 { 3323 if (sc == SPDK_SCSI_STATUS_GOOD) { 3324 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3325 } else { 3326 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3327 bdev_io->internal.error.scsi.sc = sc; 3328 bdev_io->internal.error.scsi.sk = sk; 3329 bdev_io->internal.error.scsi.asc = asc; 3330 bdev_io->internal.error.scsi.ascq = ascq; 3331 } 3332 3333 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3334 } 3335 3336 void 3337 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3338 int *sc, int *sk, int *asc, int *ascq) 3339 { 3340 assert(sc != NULL); 3341 assert(sk != NULL); 3342 assert(asc != NULL); 3343 assert(ascq != NULL); 3344 3345 switch (bdev_io->internal.status) { 3346 case SPDK_BDEV_IO_STATUS_SUCCESS: 3347 *sc = SPDK_SCSI_STATUS_GOOD; 3348 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3349 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3350 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3351 break; 3352 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3353 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3354 break; 3355 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3356 *sc = bdev_io->internal.error.scsi.sc; 3357 *sk = bdev_io->internal.error.scsi.sk; 3358 *asc = bdev_io->internal.error.scsi.asc; 3359 *ascq = bdev_io->internal.error.scsi.ascq; 3360 break; 3361 default: 3362 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3363 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3364 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3365 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3366 break; 3367 } 3368 } 3369 3370 void 3371 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3372 { 3373 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3374 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3375 } else { 3376 bdev_io->internal.error.nvme.sct = sct; 3377 bdev_io->internal.error.nvme.sc = sc; 3378 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3379 } 3380 3381 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3382 } 3383 3384 void 3385 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3386 { 3387 assert(sct != NULL); 3388 assert(sc != NULL); 3389 3390 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3391 *sct = bdev_io->internal.error.nvme.sct; 3392 *sc = bdev_io->internal.error.nvme.sc; 3393 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3394 *sct = SPDK_NVME_SCT_GENERIC; 3395 *sc = SPDK_NVME_SC_SUCCESS; 3396 } else { 3397 *sct = SPDK_NVME_SCT_GENERIC; 3398 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3399 } 3400 } 3401 3402 struct spdk_thread * 3403 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3404 { 3405 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3406 } 3407 3408 static void 3409 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3410 { 3411 uint64_t min_qos_set; 3412 int i; 3413 3414 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3415 if (limits[i] != 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3416 break; 3417 } 3418 } 3419 3420 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3421 SPDK_ERRLOG("Invalid rate limits set.\n"); 3422 return; 3423 } 3424 3425 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3426 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3427 continue; 3428 } 3429 3430 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3431 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3432 } else { 3433 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3434 } 3435 3436 if (limits[i] == 0 || limits[i] % min_qos_set) { 3437 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3438 limits[i], bdev->name, min_qos_set); 3439 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3440 return; 3441 } 3442 } 3443 3444 if (!bdev->internal.qos) { 3445 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3446 if (!bdev->internal.qos) { 3447 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3448 return; 3449 } 3450 } 3451 3452 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3453 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3454 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3455 bdev->name, i, limits[i]); 3456 } 3457 3458 return; 3459 } 3460 3461 static void 3462 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3463 { 3464 struct spdk_conf_section *sp = NULL; 3465 const char *val = NULL; 3466 int i = 0, j = 0; 3467 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3468 bool config_qos = false; 3469 3470 sp = spdk_conf_find_section(NULL, "QoS"); 3471 if (!sp) { 3472 return; 3473 } 3474 3475 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3476 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3477 3478 i = 0; 3479 while (true) { 3480 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3481 if (!val) { 3482 break; 3483 } 3484 3485 if (strcmp(bdev->name, val) != 0) { 3486 i++; 3487 continue; 3488 } 3489 3490 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3491 if (val) { 3492 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3493 limits[j] = strtoull(val, NULL, 10); 3494 } else { 3495 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3496 } 3497 config_qos = true; 3498 } 3499 3500 break; 3501 } 3502 3503 j++; 3504 } 3505 3506 if (config_qos == true) { 3507 _spdk_bdev_qos_config_limit(bdev, limits); 3508 } 3509 3510 return; 3511 } 3512 3513 static int 3514 spdk_bdev_init(struct spdk_bdev *bdev) 3515 { 3516 char *bdev_name; 3517 3518 assert(bdev->module != NULL); 3519 3520 if (!bdev->name) { 3521 SPDK_ERRLOG("Bdev name is NULL\n"); 3522 return -EINVAL; 3523 } 3524 3525 if (spdk_bdev_get_by_name(bdev->name)) { 3526 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3527 return -EEXIST; 3528 } 3529 3530 /* Users often register their own I/O devices using the bdev name. In 3531 * order to avoid conflicts, prepend bdev_. 
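	 *
	 * E.g. a bdev registered as "Malloc0" (hypothetical name) uses "bdev_Malloc0"
	 * as the io_device name below, so it cannot clash with an io_device the
	 * backing module itself registered under "Malloc0".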
*/ 3532 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3533 if (!bdev_name) { 3534 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3535 return -ENOMEM; 3536 } 3537 3538 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3539 bdev->internal.measured_queue_depth = UINT64_MAX; 3540 bdev->internal.claim_module = NULL; 3541 bdev->internal.qd_poller = NULL; 3542 bdev->internal.qos = NULL; 3543 3544 if (spdk_bdev_get_buf_align(bdev) > 1) { 3545 if (bdev->split_on_optimal_io_boundary) { 3546 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 3547 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 3548 } else { 3549 bdev->split_on_optimal_io_boundary = true; 3550 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 3551 } 3552 } 3553 3554 TAILQ_INIT(&bdev->internal.open_descs); 3555 3556 TAILQ_INIT(&bdev->aliases); 3557 3558 bdev->internal.reset_in_progress = NULL; 3559 3560 _spdk_bdev_qos_config(bdev); 3561 3562 spdk_io_device_register(__bdev_to_io_dev(bdev), 3563 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3564 sizeof(struct spdk_bdev_channel), 3565 bdev_name); 3566 3567 free(bdev_name); 3568 3569 pthread_mutex_init(&bdev->internal.mutex, NULL); 3570 return 0; 3571 } 3572 3573 static void 3574 spdk_bdev_destroy_cb(void *io_device) 3575 { 3576 int rc; 3577 struct spdk_bdev *bdev; 3578 spdk_bdev_unregister_cb cb_fn; 3579 void *cb_arg; 3580 3581 bdev = __bdev_from_io_dev(io_device); 3582 cb_fn = bdev->internal.unregister_cb; 3583 cb_arg = bdev->internal.unregister_ctx; 3584 3585 rc = bdev->fn_table->destruct(bdev->ctxt); 3586 if (rc < 0) { 3587 SPDK_ERRLOG("destruct failed\n"); 3588 } 3589 if (rc <= 0 && cb_fn != NULL) { 3590 cb_fn(cb_arg, rc); 3591 } 3592 } 3593 3594 3595 static void 3596 spdk_bdev_fini(struct spdk_bdev *bdev) 3597 { 3598 pthread_mutex_destroy(&bdev->internal.mutex); 3599 3600 free(bdev->internal.qos); 3601 3602 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3603 } 3604 3605 static void 3606 spdk_bdev_start(struct spdk_bdev *bdev) 3607 { 3608 struct spdk_bdev_module *module; 3609 uint32_t action; 3610 3611 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3612 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3613 3614 /* Examine configuration before initializing I/O */ 3615 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3616 if (module->examine_config) { 3617 action = module->internal.action_in_progress; 3618 module->internal.action_in_progress++; 3619 module->examine_config(bdev); 3620 if (action != module->internal.action_in_progress) { 3621 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3622 module->name); 3623 } 3624 } 3625 } 3626 3627 if (bdev->internal.claim_module) { 3628 return; 3629 } 3630 3631 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3632 if (module->examine_disk) { 3633 module->internal.action_in_progress++; 3634 module->examine_disk(bdev); 3635 } 3636 } 3637 } 3638 3639 int 3640 spdk_bdev_register(struct spdk_bdev *bdev) 3641 { 3642 int rc = spdk_bdev_init(bdev); 3643 3644 if (rc == 0) { 3645 spdk_bdev_start(bdev); 3646 } 3647 3648 return rc; 3649 } 3650 3651 int 3652 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3653 { 3654 int rc; 3655 3656 rc = spdk_bdev_init(vbdev); 3657 if (rc) { 3658 return rc; 3659 } 3660 3661 spdk_bdev_start(vbdev); 3662 return 0; 3663 } 3664 3665 void 3666 
spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3667 { 3668 if (bdev->internal.unregister_cb != NULL) { 3669 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3670 } 3671 } 3672 3673 static void 3674 _remove_notify(void *arg) 3675 { 3676 struct spdk_bdev_desc *desc = arg; 3677 3678 desc->remove_scheduled = false; 3679 3680 if (desc->closed) { 3681 free(desc); 3682 } else { 3683 desc->remove_cb(desc->remove_ctx); 3684 } 3685 } 3686 3687 void 3688 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3689 { 3690 struct spdk_bdev_desc *desc, *tmp; 3691 bool do_destruct = true; 3692 struct spdk_thread *thread; 3693 3694 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3695 3696 thread = spdk_get_thread(); 3697 if (!thread) { 3698 /* The user called this from a non-SPDK thread. */ 3699 if (cb_fn != NULL) { 3700 cb_fn(cb_arg, -ENOTSUP); 3701 } 3702 return; 3703 } 3704 3705 pthread_mutex_lock(&bdev->internal.mutex); 3706 3707 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3708 bdev->internal.unregister_cb = cb_fn; 3709 bdev->internal.unregister_ctx = cb_arg; 3710 3711 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3712 if (desc->remove_cb) { 3713 do_destruct = false; 3714 /* 3715 * Defer invocation of the remove_cb to a separate message that will 3716 * run later on its thread. This ensures this context unwinds and 3717 * we don't recursively unregister this bdev again if the remove_cb 3718 * immediately closes its descriptor. 3719 */ 3720 if (!desc->remove_scheduled) { 3721 /* Avoid scheduling removal of the same descriptor multiple times. */ 3722 desc->remove_scheduled = true; 3723 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3724 } 3725 } 3726 } 3727 3728 if (!do_destruct) { 3729 pthread_mutex_unlock(&bdev->internal.mutex); 3730 return; 3731 } 3732 3733 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3734 pthread_mutex_unlock(&bdev->internal.mutex); 3735 3736 spdk_bdev_fini(bdev); 3737 } 3738 3739 int 3740 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3741 void *remove_ctx, struct spdk_bdev_desc **_desc) 3742 { 3743 struct spdk_bdev_desc *desc; 3744 struct spdk_thread *thread; 3745 3746 thread = spdk_get_thread(); 3747 if (!thread) { 3748 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3749 return -ENOTSUP; 3750 } 3751 3752 desc = calloc(1, sizeof(*desc)); 3753 if (desc == NULL) { 3754 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3755 return -ENOMEM; 3756 } 3757 3758 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3759 spdk_get_thread()); 3760 3761 desc->bdev = bdev; 3762 desc->thread = thread; 3763 desc->remove_cb = remove_cb; 3764 desc->remove_ctx = remove_ctx; 3765 desc->write = write; 3766 *_desc = desc; 3767 3768 pthread_mutex_lock(&bdev->internal.mutex); 3769 3770 if (write && bdev->internal.claim_module) { 3771 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3772 bdev->name, bdev->internal.claim_module->name); 3773 pthread_mutex_unlock(&bdev->internal.mutex); 3774 free(desc); 3775 *_desc = NULL; 3776 return -EPERM; 3777 } 3778 3779 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3780 3781 pthread_mutex_unlock(&bdev->internal.mutex); 3782 3783 return 0; 3784 } 3785 3786 void 3787 spdk_bdev_close(struct spdk_bdev_desc *desc) 3788 { 3789 struct spdk_bdev *bdev = desc->bdev; 3790 bool do_unregister = false; 
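	/*
	 * Hot-remove interaction (illustrative sketch, hypothetical names): a
	 * descriptor is commonly opened with a remove callback that closes it again,
	 * which is safe because _remove_notify() runs the callback on the
	 * descriptor's opening thread:
	 *
	 *	static void
	 *	hotremove_cb(void *ctx)
	 *	{
	 *		struct my_dev *dev = ctx;
	 *
	 *		spdk_bdev_close(dev->desc);
	 *	}
	 *
	 *	...
	 *	rc = spdk_bdev_open(bdev, true, hotremove_cb, dev, &dev->desc);
	 */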
void
spdk_bdev_close(struct spdk_bdev_desc *desc)
{
	struct spdk_bdev *bdev = desc->bdev;
	bool do_unregister = false;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
		      spdk_get_thread());

	assert(desc->thread == spdk_get_thread());

	pthread_mutex_lock(&bdev->internal.mutex);

	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);

	desc->closed = true;

	if (!desc->remove_scheduled) {
		free(desc);
	}

	/* If no more descriptors, kill QoS channel */
	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
			      bdev->name, spdk_get_thread());

		if (spdk_bdev_qos_destroy(bdev)) {
			/* There isn't anything we can do to recover here. Just let the
			 * old QoS poller keep running. The QoS handling won't change
			 * cores when the user allocates a new channel, but it won't break. */
			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
		}
	}

	spdk_bdev_set_qd_sampling_period(bdev, 0);

	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
		do_unregister = true;
	}
	pthread_mutex_unlock(&bdev->internal.mutex);

	if (do_unregister == true) {
		spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
	}
}

int
spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
			    struct spdk_bdev_module *module)
{
	if (bdev->internal.claim_module != NULL) {
		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
			    bdev->internal.claim_module->name);
		return -EPERM;
	}

	if (desc && !desc->write) {
		desc->write = true;
	}

	bdev->internal.claim_module = module;
	return 0;
}

void
spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
{
	assert(bdev->internal.claim_module != NULL);
	bdev->internal.claim_module = NULL;
}

struct spdk_bdev *
spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
{
	return desc->bdev;
}

void
spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
{
	struct iovec *iovs;
	int iovcnt;

	if (bdev_io == NULL) {
		return;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		iovs = bdev_io->u.bdev.iovs;
		iovcnt = bdev_io->u.bdev.iovcnt;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		iovs = bdev_io->u.bdev.iovs;
		iovcnt = bdev_io->u.bdev.iovcnt;
		break;
	default:
		iovs = NULL;
		iovcnt = 0;
		break;
	}

	if (iovp) {
		*iovp = iovs;
	}
	if (iovcntp) {
		*iovcntp = iovcnt;
	}
}
3913 */ 3914 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3915 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3916 } else { 3917 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3918 } 3919 } 3920 3921 struct spdk_bdev_module * 3922 spdk_bdev_module_list_find(const char *name) 3923 { 3924 struct spdk_bdev_module *bdev_module; 3925 3926 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3927 if (strcmp(name, bdev_module->name) == 0) { 3928 break; 3929 } 3930 } 3931 3932 return bdev_module; 3933 } 3934 3935 static void 3936 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3937 { 3938 struct spdk_bdev_io *bdev_io = _bdev_io; 3939 uint64_t num_bytes, num_blocks; 3940 int rc; 3941 3942 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3943 bdev_io->u.bdev.split_remaining_num_blocks, 3944 ZERO_BUFFER_SIZE); 3945 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3946 3947 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3948 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3949 g_bdev_mgr.zero_buffer, 3950 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3951 _spdk_bdev_write_zero_buffer_done, bdev_io); 3952 if (rc == 0) { 3953 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3954 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3955 } else if (rc == -ENOMEM) { 3956 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 3957 } else { 3958 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3959 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3960 } 3961 } 3962 3963 static void 3964 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3965 { 3966 struct spdk_bdev_io *parent_io = cb_arg; 3967 3968 spdk_bdev_free_io(bdev_io); 3969 3970 if (!success) { 3971 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3972 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3973 return; 3974 } 3975 3976 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3977 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3978 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3979 return; 3980 } 3981 3982 _spdk_bdev_write_zero_buffer_next(parent_io); 3983 } 3984 3985 struct set_qos_limit_ctx { 3986 void (*cb_fn)(void *cb_arg, int status); 3987 void *cb_arg; 3988 struct spdk_bdev *bdev; 3989 }; 3990 3991 static void 3992 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3993 { 3994 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3995 ctx->bdev->internal.qos_mod_in_progress = false; 3996 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3997 3998 ctx->cb_fn(ctx->cb_arg, status); 3999 free(ctx); 4000 } 4001 4002 static void 4003 _spdk_bdev_disable_qos_done(void *cb_arg) 4004 { 4005 struct set_qos_limit_ctx *ctx = cb_arg; 4006 struct spdk_bdev *bdev = ctx->bdev; 4007 struct spdk_bdev_io *bdev_io; 4008 struct spdk_bdev_qos *qos; 4009 4010 pthread_mutex_lock(&bdev->internal.mutex); 4011 qos = bdev->internal.qos; 4012 bdev->internal.qos = NULL; 4013 pthread_mutex_unlock(&bdev->internal.mutex); 4014 4015 while (!TAILQ_EMPTY(&qos->queued)) { 4016 /* Send queued I/O back to their original thread for resubmission. 
struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

static void
_spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
{
	pthread_mutex_lock(&ctx->bdev->internal.mutex);
	ctx->bdev->internal.qos_mod_in_progress = false;
	pthread_mutex_unlock(&ctx->bdev->internal.mutex);

	ctx->cb_fn(ctx->cb_arg, status);
	free(ctx);
}

static void
_spdk_bdev_disable_qos_done(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_qos *qos;

	pthread_mutex_lock(&bdev->internal.mutex);
	qos = bdev->internal.qos;
	bdev->internal.qos = NULL;
	pthread_mutex_unlock(&bdev->internal.mutex);

	while (!TAILQ_EMPTY(&qos->queued)) {
		/* Send queued I/O back to their original thread for resubmission. */
		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);

		if (bdev_io->internal.io_submit_ch) {
			/*
			 * Channel was changed when sending it to the QoS thread - change it back
			 * before sending it back to the original thread.
			 */
			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
			bdev_io->internal.io_submit_ch = NULL;
		}

		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
				     _spdk_bdev_io_submit, bdev_io);
	}

	if (qos->thread != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
		spdk_poller_unregister(&qos->poller);
	}

	free(qos);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_thread *thread;

	pthread_mutex_lock(&bdev->internal.mutex);
	thread = bdev->internal.qos->thread;
	pthread_mutex_unlock(&bdev->internal.mutex);

	if (thread != NULL) {
		spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
	} else {
		_spdk_bdev_disable_qos_done(ctx);
	}
}

static void
_spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;

	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
	pthread_mutex_unlock(&bdev->internal.mutex);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	pthread_mutex_lock(&bdev->internal.mutex);
	_spdk_bdev_enable_qos(bdev, bdev_ch);
	pthread_mutex_unlock(&bdev->internal.mutex);
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
{
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	_spdk_bdev_set_qos_limit_done(ctx, status);
}

static void
_spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
{
	int i;

	assert(bdev->internal.qos != NULL);

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			bdev->internal.qos->rate_limits[i].limit = limits[i];

			if (limits[i] == 0) {
				bdev->internal.qos->rate_limits[i].limit =
					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
			}
		}
	}
}
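/*
 * Illustrative sketch (not compiled into this file) of the
 * spdk_for_each_channel() pattern used by the QoS enable/disable paths
 * above. The function names here are hypothetical: a per-channel callback
 * runs once on each channel's thread and must call
 * spdk_for_each_channel_continue(); the completion callback then runs on
 * the initiating thread after every channel has been visited.
 *
 *	static void
 *	my_per_channel_msg(struct spdk_io_channel_iter *i)
 *	{
 *		struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
 *
 *		// ... touch per-channel state here ...
 *		spdk_for_each_channel_continue(i, 0);
 *	}
 *
 *	static void
 *	my_msg_done(struct spdk_io_channel_iter *i, int status)
 *	{
 *		void *ctx = spdk_io_channel_iter_get_ctx(i);
 *
 *		// ... finish up and report status to the caller ...
 *	}
 *
 *	spdk_for_each_channel(io_device, my_per_channel_msg, ctx, my_msg_done);
 */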
void
spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
{
	struct set_qos_limit_ctx *ctx;
	uint32_t limit_set_complement;
	uint64_t min_limit_per_sec;
	int i;
	bool disable_rate_limit = true;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			continue;
		}

		if (limits[i] > 0) {
			disable_rate_limit = false;
		}

		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
		} else {
			/* Change from megabyte to byte rate limit */
			limits[i] = limits[i] * 1024 * 1024;
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
		}

		limit_set_complement = limits[i] % min_limit_per_sec;
		if (limit_set_complement) {
			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
				    limits[i], min_limit_per_sec);
			limits[i] += min_limit_per_sec - limit_set_complement;
			SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]);
		}
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->bdev = bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos_mod_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}
	bdev->internal.qos_mod_in_progress = true;

	if (disable_rate_limit == true && bdev->internal.qos) {
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
			     bdev->internal.qos->rate_limits[i].limit !=
			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
				disable_rate_limit = false;
				break;
			}
		}
	}

	if (disable_rate_limit == false) {
		if (bdev->internal.qos == NULL) {
			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
			if (!bdev->internal.qos) {
				pthread_mutex_unlock(&bdev->internal.mutex);
				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
				free(ctx);
				cb_fn(cb_arg, -ENOMEM);
				return;
			}
		}

		if (bdev->internal.qos->thread == NULL) {
			/* Enabling */
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_enable_qos_msg, ctx,
					      _spdk_bdev_enable_qos_done);
		} else {
			/* Updating */
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_thread_send_msg(bdev->internal.qos->thread,
					     _spdk_bdev_update_qos_rate_limit_msg, ctx);
		}
	} else {
		if (bdev->internal.qos != NULL) {
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			/* Disabling */
			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_disable_qos_msg, ctx,
					      _spdk_bdev_disable_qos_msg_done);
		} else {
			pthread_mutex_unlock(&bdev->internal.mutex);
			_spdk_bdev_set_qos_limit_done(ctx, 0);
			return;
		}
	}

	pthread_mutex_unlock(&bdev->internal.mutex);
}
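/*
 * Illustrative sketch (not compiled into this file): capping a bdev at
 * 10000 read/write IO/s and 100 MB/s of aggregate bandwidth. Per the
 * conversion above, byte-per-second limits are passed in megabytes;
 * SPDK_BDEV_QOS_LIMIT_NOT_DEFINED leaves a limit untouched and 0 disables
 * it. The enum indices are assumed from include/spdk/bdev.h and the
 * completion callback is hypothetical.
 *
 *	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {
 *		[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 10000,
 *		[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT]  = 100,
 *		[SPDK_BDEV_QOS_R_BPS_RATE_LIMIT]   = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED,
 *		[SPDK_BDEV_QOS_W_BPS_RATE_LIMIT]   = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED,
 *	};
 *
 *	spdk_bdev_set_qos_rate_limits(bdev, limits, set_limits_done_cb, NULL);
 */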
struct spdk_bdev_histogram_ctx {
	spdk_bdev_histogram_status_cb cb_fn;
	void *cb_arg;
	struct spdk_bdev *bdev;
	int status;
};

static void
_spdk_bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	pthread_mutex_lock(&ctx->bdev->internal.mutex);
	ctx->bdev->internal.histogram_in_progress = false;
	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
	ctx->cb_fn(ctx->cb_arg, ctx->status);
	free(ctx);
}

static void
_spdk_bdev_histogram_disable_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);

	if (ch->histogram != NULL) {
		spdk_histogram_data_free(ch->histogram);
		ch->histogram = NULL;
	}
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	if (status != 0) {
		ctx->status = status;
		ctx->bdev->internal.histogram_enabled = false;
		spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), _spdk_bdev_histogram_disable_channel, ctx,
				      _spdk_bdev_histogram_disable_channel_cb);
	} else {
		pthread_mutex_lock(&ctx->bdev->internal.mutex);
		ctx->bdev->internal.histogram_in_progress = false;
		pthread_mutex_unlock(&ctx->bdev->internal.mutex);
		ctx->cb_fn(ctx->cb_arg, ctx->status);
		free(ctx);
	}
}

static void
_spdk_bdev_histogram_enable_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	int status = 0;

	if (ch->histogram == NULL) {
		ch->histogram = spdk_histogram_data_alloc();
		if (ch->histogram == NULL) {
			status = -ENOMEM;
		}
	}

	spdk_for_each_channel_continue(i, status);
}

void
spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn,
			   void *cb_arg, bool enable)
{
	struct spdk_bdev_histogram_ctx *ctx;

	ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bdev = bdev;
	ctx->status = 0;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.histogram_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}

	bdev->internal.histogram_in_progress = true;
	pthread_mutex_unlock(&bdev->internal.mutex);

	bdev->internal.histogram_enabled = enable;

	if (enable) {
		/* Allocate histogram for each channel */
		spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_enable_channel, ctx,
				      _spdk_bdev_histogram_enable_channel_cb);
	} else {
		spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_disable_channel, ctx,
				      _spdk_bdev_histogram_disable_channel_cb);
	}
}
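/*
 * Illustrative sketch (not compiled into this file): turning per-channel
 * latency histograms on for a bdev. The status callback name is
 * hypothetical; it receives 0 once every channel has allocated its
 * histogram, or a negative errno (e.g. -ENOMEM) if allocation failed and
 * the feature was rolled back.
 *
 *	static void
 *	histogram_status_cb(void *cb_arg, int status)
 *	{
 *		// status == 0 means histograms are now being collected
 *	}
 *
 *	spdk_bdev_histogram_enable(bdev, histogram_status_cb, NULL, true);
 */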
struct spdk_bdev_histogram_data_ctx {
	spdk_bdev_histogram_data_cb cb_fn;
	void *cb_arg;
	struct spdk_bdev *bdev;
	/** merged histogram data from all channels */
	struct spdk_histogram_data *histogram;
};

static void
_spdk_bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cb_fn(ctx->cb_arg, status, ctx->histogram);
	free(ctx);
}

static void
_spdk_bdev_histogram_get_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	int status = 0;

	if (ch->histogram == NULL) {
		status = -EFAULT;
	} else {
		spdk_histogram_data_merge(ctx->histogram, ch->histogram);
	}

	spdk_for_each_channel_continue(i, status);
}

void
spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram,
			spdk_bdev_histogram_data_cb cb_fn,
			void *cb_arg)
{
	struct spdk_bdev_histogram_data_ctx *ctx;

	ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM, NULL);
		return;
	}

	ctx->bdev = bdev;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	ctx->histogram = histogram;

	spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_get_channel, ctx,
			      _spdk_bdev_histogram_get_channel_cb);
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)

SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
{
	spdk_trace_register_owner(OWNER_BDEV, 'b');
	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
					OBJECT_BDEV_IO, 1, 0, "type: ");
	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
					OBJECT_BDEV_IO, 0, 0, "");
}
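/*
 * Illustrative sketch (not compiled into this file): collecting the merged
 * histogram for a bdev. The caller owns the spdk_histogram_data and is
 * responsible for freeing it; the data callback name is hypothetical. The
 * buckets can then be walked, typically with spdk_histogram_data_iterate()
 * from include/spdk/histogram_data.h (assumed here).
 *
 *	static void
 *	histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
 *	{
 *		if (status == 0) {
 *			// histogram now holds data merged from every channel
 *		}
 *	}
 *
 *	struct spdk_histogram_data *histogram = spdk_histogram_data_alloc();
 *
 *	spdk_bdev_histogram_get(bdev, histogram, histogram_data_cb, NULL);
 */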