1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/util.h" 48 #include "spdk/trace.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 #include "spdk/string.h" 53 54 #ifdef SPDK_CONFIG_VTUNE 55 #include "ittnotify.h" 56 #include "ittnotify_types.h" 57 int __itt_init_ittlib(const char *, __itt_group_id); 58 #endif 59 60 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 61 #define SPDK_BDEV_IO_CACHE_SIZE 256 62 #define BUF_SMALL_POOL_SIZE 8192 63 #define BUF_LARGE_POOL_SIZE 1024 64 #define NOMEM_THRESHOLD_COUNT 8 65 #define ZERO_BUFFER_SIZE 0x100000 66 67 #define OWNER_BDEV 0x2 68 69 #define OBJECT_BDEV_IO 0x2 70 71 #define TRACE_GROUP_BDEV 0x3 72 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 73 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 74 75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 * 1024 * 1024) 80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 81 82 #define SPDK_BDEV_POOL_ALIGNMENT 512 83 84 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"}; 85 static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"}; 86 87 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 88 89 struct spdk_bdev_mgr { 90 struct spdk_mempool *bdev_io_pool; 91 92 struct spdk_mempool *buf_small_pool; 93 struct spdk_mempool *buf_large_pool; 94 95 void *zero_buffer; 96 97 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 98 99 struct spdk_bdev_list bdevs; 100 101 bool init_complete; 102 bool module_init_complete; 103 104 #ifdef SPDK_CONFIG_VTUNE 105 __itt_domain *domain; 106 #endif 107 }; 108 109 static struct spdk_bdev_mgr g_bdev_mgr = { 110 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 111 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 112 .init_complete = false, 113 .module_init_complete = false, 114 }; 115 116 static struct spdk_bdev_opts g_bdev_opts = { 117 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 118 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 119 }; 120 121 static spdk_bdev_init_cb g_init_cb_fn = NULL; 122 static void *g_init_cb_arg = NULL; 123 124 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 125 static void *g_fini_cb_arg = NULL; 126 static struct spdk_thread *g_fini_thread = NULL; 127 128 struct spdk_bdev_qos_limit { 129 /** IOs or bytes allowed per second (i.e., 1s). */ 130 uint64_t limit; 131 132 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 133 * For remaining bytes, allowed to run negative if an I/O is submitted when 134 * some bytes are remaining, but the I/O is bigger than that amount. The 135 * excess will be deducted from the next timeslice. 136 */ 137 int64_t remaining_this_timeslice; 138 139 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 140 uint32_t min_per_timeslice; 141 142 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 143 uint32_t max_per_timeslice; 144 145 /** Function to check whether to queue the IO. */ 146 bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 147 148 /** Function to update for the submitted IO. */ 149 void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 150 }; 151 152 struct spdk_bdev_qos { 153 /** Types of structure of rate limits. */ 154 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 155 156 /** The channel that all I/O are funneled through. */ 157 struct spdk_bdev_channel *ch; 158 159 /** The thread on which the poller is running. */ 160 struct spdk_thread *thread; 161 162 /** Queue of I/O waiting to be issued. */ 163 bdev_io_tailq_t queued; 164 165 /** Size of a timeslice in tsc ticks. */ 166 uint64_t timeslice_size; 167 168 /** Timestamp of start of last timeslice. */ 169 uint64_t last_timeslice; 170 171 /** Poller that processes queued I/O commands each time slice. */ 172 struct spdk_poller *poller; 173 }; 174 175 struct spdk_bdev_mgmt_channel { 176 bdev_io_stailq_t need_buf_small; 177 bdev_io_stailq_t need_buf_large; 178 179 /* 180 * Each thread keeps a cache of bdev_io - this allows 181 * bdev threads which are *not* DPDK threads to still 182 * benefit from a per-thread bdev_io cache. Without 183 * this, non-DPDK threads fetching from the mempool 184 * incur a cmpxchg on get and put. 185 */ 186 bdev_io_stailq_t per_thread_cache; 187 uint32_t per_thread_cache_count; 188 uint32_t bdev_io_cache_size; 189 190 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 191 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 192 }; 193 194 /* 195 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 196 * will queue here their IO that awaits retry. It makes it possible to retry sending 197 * IO to one bdev after IO from other bdev completes. 198 */ 199 struct spdk_bdev_shared_resource { 200 /* The bdev management channel */ 201 struct spdk_bdev_mgmt_channel *mgmt_ch; 202 203 /* 204 * Count of I/O submitted to bdev module and waiting for completion. 205 * Incremented before submit_request() is called on an spdk_bdev_io. 206 */ 207 uint64_t io_outstanding; 208 209 /* 210 * Queue of IO awaiting retry because of a previous NOMEM status returned 211 * on this channel. 212 */ 213 bdev_io_tailq_t nomem_io; 214 215 /* 216 * Threshold which io_outstanding must drop to before retrying nomem_io. 217 */ 218 uint64_t nomem_threshold; 219 220 /* I/O channel allocated by a bdev module */ 221 struct spdk_io_channel *shared_ch; 222 223 /* Refcount of bdev channels using this resource */ 224 uint32_t ref; 225 226 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 227 }; 228 229 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 230 #define BDEV_CH_QOS_ENABLED (1 << 1) 231 232 struct spdk_bdev_channel { 233 struct spdk_bdev *bdev; 234 235 /* The channel for the underlying device */ 236 struct spdk_io_channel *channel; 237 238 /* Per io_device per thread data */ 239 struct spdk_bdev_shared_resource *shared_resource; 240 241 struct spdk_bdev_io_stat stat; 242 243 /* 244 * Count of I/O submitted through this channel and waiting for completion. 245 * Incremented before submit_request() is called on an spdk_bdev_io. 246 */ 247 uint64_t io_outstanding; 248 249 bdev_io_tailq_t queued_resets; 250 251 uint32_t flags; 252 253 struct spdk_histogram_data *histogram; 254 255 #ifdef SPDK_CONFIG_VTUNE 256 uint64_t start_tsc; 257 uint64_t interval_tsc; 258 __itt_string_handle *handle; 259 struct spdk_bdev_io_stat prev_stat; 260 #endif 261 262 }; 263 264 struct spdk_bdev_desc { 265 struct spdk_bdev *bdev; 266 struct spdk_thread *thread; 267 spdk_bdev_remove_cb_t remove_cb; 268 void *remove_ctx; 269 bool remove_scheduled; 270 bool closed; 271 bool write; 272 TAILQ_ENTRY(spdk_bdev_desc) link; 273 }; 274 275 struct spdk_bdev_iostat_ctx { 276 struct spdk_bdev_io_stat *stat; 277 spdk_bdev_get_device_stat_cb cb; 278 void *cb_arg; 279 }; 280 281 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 282 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 283 284 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 285 void *cb_arg); 286 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 287 288 void 289 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 290 { 291 *opts = g_bdev_opts; 292 } 293 294 int 295 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 296 { 297 uint32_t min_pool_size; 298 299 /* 300 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 301 * initialization. A second mgmt_ch will be created on the same thread when the application starts 302 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 303 */ 304 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 305 if (opts->bdev_io_pool_size < min_pool_size) { 306 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 307 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 308 spdk_thread_get_count()); 309 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 310 return -1; 311 } 312 313 g_bdev_opts = *opts; 314 return 0; 315 } 316 317 struct spdk_bdev * 318 spdk_bdev_first(void) 319 { 320 struct spdk_bdev *bdev; 321 322 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 323 if (bdev) { 324 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 325 } 326 327 return bdev; 328 } 329 330 struct spdk_bdev * 331 spdk_bdev_next(struct spdk_bdev *prev) 332 { 333 struct spdk_bdev *bdev; 334 335 bdev = TAILQ_NEXT(prev, internal.link); 336 if (bdev) { 337 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 338 } 339 340 return bdev; 341 } 342 343 static struct spdk_bdev * 344 _bdev_next_leaf(struct spdk_bdev *bdev) 345 { 346 while (bdev != NULL) { 347 if (bdev->internal.claim_module == NULL) { 348 return bdev; 349 } else { 350 bdev = TAILQ_NEXT(bdev, internal.link); 351 } 352 } 353 354 return bdev; 355 } 356 357 struct spdk_bdev * 358 spdk_bdev_first_leaf(void) 359 { 360 struct spdk_bdev *bdev; 361 362 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 363 364 if (bdev) { 365 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 366 } 367 368 return bdev; 369 } 370 371 struct spdk_bdev * 372 spdk_bdev_next_leaf(struct spdk_bdev *prev) 373 { 374 struct spdk_bdev *bdev; 375 376 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 377 378 if (bdev) { 379 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 380 } 381 382 return bdev; 383 } 384 385 struct spdk_bdev * 386 spdk_bdev_get_by_name(const char *bdev_name) 387 { 388 struct spdk_bdev_alias *tmp; 389 struct spdk_bdev *bdev = spdk_bdev_first(); 390 391 while (bdev != NULL) { 392 if (strcmp(bdev_name, bdev->name) == 0) { 393 return bdev; 394 } 395 396 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 397 if (strcmp(bdev_name, tmp->alias) == 0) { 398 return bdev; 399 } 400 } 401 402 bdev = spdk_bdev_next(bdev); 403 } 404 405 return NULL; 406 } 407 408 void 409 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 410 { 411 struct iovec *iovs; 412 413 iovs = bdev_io->u.bdev.iovs; 414 415 assert(iovs != NULL); 416 assert(bdev_io->u.bdev.iovcnt >= 1); 417 418 iovs[0].iov_base = buf; 419 iovs[0].iov_len = len; 420 } 421 422 static bool 423 _is_buf_allocated(struct iovec *iovs) 424 { 425 return iovs[0].iov_base != NULL; 426 } 427 428 static bool 429 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 430 { 431 int i; 432 uintptr_t iov_base; 433 434 if (spdk_likely(alignment == 1)) { 435 return true; 436 } 437 438 for (i = 0; i < iovcnt; i++) { 439 iov_base = (uintptr_t)iovs[i].iov_base; 440 if ((iov_base & (alignment - 1)) != 0) { 441 return false; 442 } 443 } 444 445 return true; 446 } 447 448 static void 449 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 450 { 451 int i; 452 size_t len; 453 454 for (i = 0; i < iovcnt; i++) { 455 len = spdk_min(iovs[i].iov_len, buf_len); 456 memcpy(buf, iovs[i].iov_base, len); 457 buf += len; 458 buf_len -= len; 459 } 460 } 461 462 static void 463 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 464 { 465 int i; 466 size_t len; 467 468 for (i = 0; i < iovcnt; i++) { 469 len = spdk_min(iovs[i].iov_len, buf_len); 470 memcpy(iovs[i].iov_base, buf, len); 471 buf += len; 472 buf_len -= len; 473 } 474 } 475 476 static void 477 _bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 478 { 479 /* save original iovec */ 480 bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs; 481 bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt; 482 /* set bounce iov */ 483 bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov; 484 bdev_io->u.bdev.iovcnt = 1; 485 /* set bounce buffer for this operation */ 486 bdev_io->u.bdev.iovs[0].iov_base = buf; 487 bdev_io->u.bdev.iovs[0].iov_len = len; 488 /* if this is write path, copy data from original buffer to bounce buffer */ 489 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 490 _copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt); 491 } 492 } 493 494 static void 495 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 496 { 497 struct spdk_mempool *pool; 498 struct spdk_bdev_io *tmp; 499 void *buf, *aligned_buf; 500 bdev_io_stailq_t *stailq; 501 struct spdk_bdev_mgmt_channel *ch; 502 uint64_t buf_len; 503 uint64_t alignment; 504 bool buf_allocated; 505 506 buf = bdev_io->internal.buf; 507 buf_len = bdev_io->internal.buf_len; 508 alignment = spdk_bdev_get_buf_align(bdev_io->bdev); 509 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 510 511 bdev_io->internal.buf = NULL; 512 513 if (buf_len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) { 514 pool = g_bdev_mgr.buf_small_pool; 515 stailq = &ch->need_buf_small; 516 } else { 517 pool = g_bdev_mgr.buf_large_pool; 518 stailq = &ch->need_buf_large; 519 } 520 521 if (STAILQ_EMPTY(stailq)) { 522 spdk_mempool_put(pool, buf); 523 } else { 524 tmp = STAILQ_FIRST(stailq); 525 526 alignment = spdk_bdev_get_buf_align(tmp->bdev); 527 buf_allocated = _is_buf_allocated(tmp->u.bdev.iovs); 528 529 aligned_buf = (void *)(((uintptr_t)buf + 530 (alignment - 1)) & ~(alignment - 1)); 531 if (buf_allocated) { 532 _bdev_io_set_bounce_buf(tmp, aligned_buf, tmp->internal.buf_len); 533 } else { 534 spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len); 535 } 536 537 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 538 tmp->internal.buf = buf; 539 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 540 } 541 } 542 543 static void 544 _bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io) 545 { 546 /* if this is read path, copy data from bounce buffer to original buffer */ 547 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 548 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 549 _copy_buf_to_iovs(bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt, 550 bdev_io->internal.bounce_iov.iov_base, bdev_io->internal.bounce_iov.iov_len); 551 } 552 /* set orignal buffer for this io */ 553 bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt; 554 bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs; 555 /* disable bouncing buffer for this io */ 556 bdev_io->internal.orig_iovcnt = 0; 557 bdev_io->internal.orig_iovs = NULL; 558 /* return bounce buffer to the pool */ 559 spdk_bdev_io_put_buf(bdev_io); 560 } 561 562 void 563 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 564 { 565 struct spdk_mempool *pool; 566 bdev_io_stailq_t *stailq; 567 void *buf, *aligned_buf; 568 struct spdk_bdev_mgmt_channel *mgmt_ch; 569 uint64_t alignment; 570 bool buf_allocated; 571 572 assert(cb != NULL); 573 assert(bdev_io->u.bdev.iovs != NULL); 574 575 alignment = spdk_bdev_get_buf_align(bdev_io->bdev); 576 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 577 578 if (buf_allocated && 579 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 580 /* Buffer already present and aligned */ 581 cb(bdev_io->internal.ch->channel, bdev_io); 582 return; 583 } 584 585 assert(len + alignment <= SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT); 586 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 587 588 bdev_io->internal.buf_len = len; 589 bdev_io->internal.get_buf_cb = cb; 590 591 if (len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) { 592 pool = g_bdev_mgr.buf_small_pool; 593 stailq = &mgmt_ch->need_buf_small; 594 } else { 595 pool = g_bdev_mgr.buf_large_pool; 596 stailq = &mgmt_ch->need_buf_large; 597 } 598 599 buf = spdk_mempool_get(pool); 600 601 if (!buf) { 602 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 603 } else { 604 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 605 606 if (buf_allocated) { 607 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 608 } else { 609 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 610 } 611 bdev_io->internal.buf = buf; 612 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 613 } 614 } 615 616 static int 617 spdk_bdev_module_get_max_ctx_size(void) 618 { 619 struct spdk_bdev_module *bdev_module; 620 int max_bdev_module_size = 0; 621 622 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 623 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 624 max_bdev_module_size = bdev_module->get_ctx_size(); 625 } 626 } 627 628 return max_bdev_module_size; 629 } 630 631 void 632 spdk_bdev_config_text(FILE *fp) 633 { 634 struct spdk_bdev_module *bdev_module; 635 636 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 637 if (bdev_module->config_text) { 638 bdev_module->config_text(fp); 639 } 640 } 641 } 642 643 static void 644 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 645 { 646 int i; 647 struct spdk_bdev_qos *qos = bdev->internal.qos; 648 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 649 650 if (!qos) { 651 return; 652 } 653 654 spdk_bdev_get_qos_rate_limits(bdev, limits); 655 656 spdk_json_write_object_begin(w); 657 spdk_json_write_named_string(w, "method", "set_bdev_qos_limit"); 658 spdk_json_write_name(w, "params"); 659 660 spdk_json_write_object_begin(w); 661 spdk_json_write_named_string(w, "name", bdev->name); 662 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 663 if (limits[i] > 0) { 664 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 665 } 666 } 667 spdk_json_write_object_end(w); 668 669 spdk_json_write_object_end(w); 670 } 671 672 void 673 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 674 { 675 struct spdk_bdev_module *bdev_module; 676 struct spdk_bdev *bdev; 677 678 assert(w != NULL); 679 680 spdk_json_write_array_begin(w); 681 682 spdk_json_write_object_begin(w); 683 spdk_json_write_named_string(w, "method", "set_bdev_options"); 684 spdk_json_write_name(w, "params"); 685 spdk_json_write_object_begin(w); 686 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 687 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 688 spdk_json_write_object_end(w); 689 spdk_json_write_object_end(w); 690 691 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 692 if (bdev_module->config_json) { 693 bdev_module->config_json(w); 694 } 695 } 696 697 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 698 spdk_bdev_qos_config_json(bdev, w); 699 700 if (bdev->fn_table->write_config_json) { 701 bdev->fn_table->write_config_json(bdev, w); 702 } 703 } 704 705 spdk_json_write_array_end(w); 706 } 707 708 static int 709 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 710 { 711 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 712 struct spdk_bdev_io *bdev_io; 713 uint32_t i; 714 715 STAILQ_INIT(&ch->need_buf_small); 716 STAILQ_INIT(&ch->need_buf_large); 717 718 STAILQ_INIT(&ch->per_thread_cache); 719 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 720 721 /* Pre-populate bdev_io cache to ensure this thread cannot be starved. */ 722 ch->per_thread_cache_count = 0; 723 for (i = 0; i < ch->bdev_io_cache_size; i++) { 724 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 725 assert(bdev_io != NULL); 726 ch->per_thread_cache_count++; 727 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 728 } 729 730 TAILQ_INIT(&ch->shared_resources); 731 TAILQ_INIT(&ch->io_wait_queue); 732 733 return 0; 734 } 735 736 static void 737 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf) 738 { 739 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 740 struct spdk_bdev_io *bdev_io; 741 742 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) { 743 SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n"); 744 } 745 746 if (!TAILQ_EMPTY(&ch->shared_resources)) { 747 SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n"); 748 } 749 750 while (!STAILQ_EMPTY(&ch->per_thread_cache)) { 751 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 752 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 753 ch->per_thread_cache_count--; 754 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 755 } 756 757 assert(ch->per_thread_cache_count == 0); 758 } 759 760 static void 761 spdk_bdev_init_complete(int rc) 762 { 763 spdk_bdev_init_cb cb_fn = g_init_cb_fn; 764 void *cb_arg = g_init_cb_arg; 765 struct spdk_bdev_module *m; 766 767 g_bdev_mgr.init_complete = true; 768 g_init_cb_fn = NULL; 769 g_init_cb_arg = NULL; 770 771 /* 772 * For modules that need to know when subsystem init is complete, 773 * inform them now. 774 */ 775 if (rc == 0) { 776 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 777 if (m->init_complete) { 778 m->init_complete(); 779 } 780 } 781 } 782 783 cb_fn(cb_arg, rc); 784 } 785 786 static void 787 spdk_bdev_module_action_complete(void) 788 { 789 struct spdk_bdev_module *m; 790 791 /* 792 * Don't finish bdev subsystem initialization if 793 * module pre-initialization is still in progress, or 794 * the subsystem been already initialized. 795 */ 796 if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) { 797 return; 798 } 799 800 /* 801 * Check all bdev modules for inits/examinations in progress. If any 802 * exist, return immediately since we cannot finish bdev subsystem 803 * initialization until all are completed. 804 */ 805 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 806 if (m->internal.action_in_progress > 0) { 807 return; 808 } 809 } 810 811 /* 812 * Modules already finished initialization - now that all 813 * the bdev modules have finished their asynchronous I/O 814 * processing, the entire bdev layer can be marked as complete. 815 */ 816 spdk_bdev_init_complete(0); 817 } 818 819 static void 820 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 821 { 822 assert(module->internal.action_in_progress > 0); 823 module->internal.action_in_progress--; 824 spdk_bdev_module_action_complete(); 825 } 826 827 void 828 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 829 { 830 spdk_bdev_module_action_done(module); 831 } 832 833 void 834 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 835 { 836 spdk_bdev_module_action_done(module); 837 } 838 839 /** The last initialized bdev module */ 840 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 841 842 static int 843 spdk_bdev_modules_init(void) 844 { 845 struct spdk_bdev_module *module; 846 int rc = 0; 847 848 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 849 g_resume_bdev_module = module; 850 rc = module->module_init(); 851 if (rc != 0) { 852 return rc; 853 } 854 } 855 856 g_resume_bdev_module = NULL; 857 return 0; 858 } 859 860 861 static void 862 spdk_bdev_init_failed_complete(void *cb_arg) 863 { 864 spdk_bdev_init_complete(-1); 865 } 866 867 static void 868 spdk_bdev_init_failed(void *cb_arg) 869 { 870 spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL); 871 } 872 873 void 874 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 875 { 876 struct spdk_conf_section *sp; 877 struct spdk_bdev_opts bdev_opts; 878 int32_t bdev_io_pool_size, bdev_io_cache_size; 879 int cache_size; 880 int rc = 0; 881 char mempool_name[32]; 882 883 assert(cb_fn != NULL); 884 885 sp = spdk_conf_find_section(NULL, "Bdev"); 886 if (sp != NULL) { 887 spdk_bdev_get_opts(&bdev_opts); 888 889 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 890 if (bdev_io_pool_size >= 0) { 891 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 892 } 893 894 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 895 if (bdev_io_cache_size >= 0) { 896 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 897 } 898 899 if (spdk_bdev_set_opts(&bdev_opts)) { 900 spdk_bdev_init_complete(-1); 901 return; 902 } 903 904 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 905 } 906 907 g_init_cb_fn = cb_fn; 908 g_init_cb_arg = cb_arg; 909 910 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 911 912 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 913 g_bdev_opts.bdev_io_pool_size, 914 sizeof(struct spdk_bdev_io) + 915 spdk_bdev_module_get_max_ctx_size(), 916 0, 917 SPDK_ENV_SOCKET_ID_ANY); 918 919 if (g_bdev_mgr.bdev_io_pool == NULL) { 920 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 921 spdk_bdev_init_complete(-1); 922 return; 923 } 924 925 /** 926 * Ensure no more than half of the total buffers end up local caches, by 927 * using spdk_thread_get_count() to determine how many local caches we need 928 * to account for. 929 */ 930 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 931 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 932 933 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 934 BUF_SMALL_POOL_SIZE, 935 SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 936 cache_size, 937 SPDK_ENV_SOCKET_ID_ANY); 938 if (!g_bdev_mgr.buf_small_pool) { 939 SPDK_ERRLOG("create rbuf small pool failed\n"); 940 spdk_bdev_init_complete(-1); 941 return; 942 } 943 944 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 945 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 946 947 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 948 BUF_LARGE_POOL_SIZE, 949 SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 950 cache_size, 951 SPDK_ENV_SOCKET_ID_ANY); 952 if (!g_bdev_mgr.buf_large_pool) { 953 SPDK_ERRLOG("create rbuf large pool failed\n"); 954 spdk_bdev_init_complete(-1); 955 return; 956 } 957 958 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 959 NULL); 960 if (!g_bdev_mgr.zero_buffer) { 961 SPDK_ERRLOG("create bdev zero buffer failed\n"); 962 spdk_bdev_init_complete(-1); 963 return; 964 } 965 966 #ifdef SPDK_CONFIG_VTUNE 967 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 968 #endif 969 970 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 971 spdk_bdev_mgmt_channel_destroy, 972 sizeof(struct spdk_bdev_mgmt_channel), 973 "bdev_mgr"); 974 975 rc = spdk_bdev_modules_init(); 976 g_bdev_mgr.module_init_complete = true; 977 if (rc != 0) { 978 SPDK_ERRLOG("bdev modules init failed\n"); 979 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 980 return; 981 } 982 983 spdk_bdev_module_action_complete(); 984 } 985 986 static void 987 spdk_bdev_mgr_unregister_cb(void *io_device) 988 { 989 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 990 991 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 992 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 993 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 994 g_bdev_opts.bdev_io_pool_size); 995 } 996 997 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 998 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 999 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 1000 BUF_SMALL_POOL_SIZE); 1001 assert(false); 1002 } 1003 1004 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 1005 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 1006 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 1007 BUF_LARGE_POOL_SIZE); 1008 assert(false); 1009 } 1010 1011 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1012 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1013 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1014 spdk_dma_free(g_bdev_mgr.zero_buffer); 1015 1016 cb_fn(g_fini_cb_arg); 1017 g_fini_cb_fn = NULL; 1018 g_fini_cb_arg = NULL; 1019 g_bdev_mgr.init_complete = false; 1020 g_bdev_mgr.module_init_complete = false; 1021 } 1022 1023 static void 1024 spdk_bdev_module_finish_iter(void *arg) 1025 { 1026 struct spdk_bdev_module *bdev_module; 1027 1028 /* Start iterating from the last touched module */ 1029 if (!g_resume_bdev_module) { 1030 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 1031 } else { 1032 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 1033 internal.tailq); 1034 } 1035 1036 while (bdev_module) { 1037 if (bdev_module->async_fini) { 1038 /* Save our place so we can resume later. We must 1039 * save the variable here, before calling module_fini() 1040 * below, because in some cases the module may immediately 1041 * call spdk_bdev_module_finish_done() and re-enter 1042 * this function to continue iterating. */ 1043 g_resume_bdev_module = bdev_module; 1044 } 1045 1046 if (bdev_module->module_fini) { 1047 bdev_module->module_fini(); 1048 } 1049 1050 if (bdev_module->async_fini) { 1051 return; 1052 } 1053 1054 bdev_module = TAILQ_PREV(bdev_module, bdev_module_list, 1055 internal.tailq); 1056 } 1057 1058 g_resume_bdev_module = NULL; 1059 spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb); 1060 } 1061 1062 void 1063 spdk_bdev_module_finish_done(void) 1064 { 1065 if (spdk_get_thread() != g_fini_thread) { 1066 spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL); 1067 } else { 1068 spdk_bdev_module_finish_iter(NULL); 1069 } 1070 } 1071 1072 static void 1073 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno) 1074 { 1075 struct spdk_bdev *bdev = cb_arg; 1076 1077 if (bdeverrno && bdev) { 1078 SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n", 1079 bdev->name); 1080 1081 /* 1082 * Since the call to spdk_bdev_unregister() failed, we have no way to free this 1083 * bdev; try to continue by manually removing this bdev from the list and continue 1084 * with the next bdev in the list. 1085 */ 1086 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 1087 } 1088 1089 if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) { 1090 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n"); 1091 /* 1092 * Bdev module finish need to be deferred as we might be in the middle of some context 1093 * (like bdev part free) that will use this bdev (or private bdev driver ctx data) 1094 * after returning. 1095 */ 1096 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL); 1097 return; 1098 } 1099 1100 /* 1101 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem 1102 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity 1103 * to detect clean shutdown as opposed to run-time hot removal of the underlying 1104 * base bdevs. 1105 * 1106 * Also, walk the list in the reverse order. 1107 */ 1108 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1109 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1110 if (bdev->internal.claim_module != NULL) { 1111 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n", 1112 bdev->name, bdev->internal.claim_module->name); 1113 continue; 1114 } 1115 1116 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name); 1117 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 1118 return; 1119 } 1120 1121 /* 1122 * If any bdev fails to unclaim underlying bdev properly, we may face the 1123 * case of bdev list consisting of claimed bdevs only (if claims are managed 1124 * correctly, this would mean there's a loop in the claims graph which is 1125 * clearly impossible). Warn and unregister last bdev on the list then. 1126 */ 1127 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1128 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1129 SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1130 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 1131 return; 1132 } 1133 } 1134 1135 void 1136 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1137 { 1138 struct spdk_bdev_module *m; 1139 1140 assert(cb_fn != NULL); 1141 1142 g_fini_thread = spdk_get_thread(); 1143 1144 g_fini_cb_fn = cb_fn; 1145 g_fini_cb_arg = cb_arg; 1146 1147 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1148 if (m->fini_start) { 1149 m->fini_start(); 1150 } 1151 } 1152 1153 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0); 1154 } 1155 1156 static struct spdk_bdev_io * 1157 spdk_bdev_get_io(struct spdk_bdev_channel *channel) 1158 { 1159 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1160 struct spdk_bdev_io *bdev_io; 1161 1162 if (ch->per_thread_cache_count > 0) { 1163 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1164 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1165 ch->per_thread_cache_count--; 1166 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1167 /* 1168 * Don't try to look for bdev_ios in the global pool if there are 1169 * waiters on bdev_ios - we don't want this caller to jump the line. 1170 */ 1171 bdev_io = NULL; 1172 } else { 1173 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1174 } 1175 1176 return bdev_io; 1177 } 1178 1179 void 1180 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1181 { 1182 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1183 1184 assert(bdev_io != NULL); 1185 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1186 1187 if (bdev_io->internal.buf != NULL) { 1188 spdk_bdev_io_put_buf(bdev_io); 1189 } 1190 1191 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1192 ch->per_thread_cache_count++; 1193 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1194 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1195 struct spdk_bdev_io_wait_entry *entry; 1196 1197 entry = TAILQ_FIRST(&ch->io_wait_queue); 1198 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1199 entry->cb_fn(entry->cb_arg); 1200 } 1201 } else { 1202 /* We should never have a full cache with entries on the io wait queue. */ 1203 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1204 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1205 } 1206 } 1207 1208 static bool 1209 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1210 { 1211 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1212 1213 switch (limit) { 1214 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1215 return true; 1216 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1217 return false; 1218 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1219 default: 1220 return false; 1221 } 1222 } 1223 1224 static bool 1225 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1226 { 1227 switch (bdev_io->type) { 1228 case SPDK_BDEV_IO_TYPE_NVME_IO: 1229 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1230 case SPDK_BDEV_IO_TYPE_READ: 1231 case SPDK_BDEV_IO_TYPE_WRITE: 1232 case SPDK_BDEV_IO_TYPE_UNMAP: 1233 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1234 return true; 1235 default: 1236 return false; 1237 } 1238 } 1239 1240 static uint64_t 1241 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1242 { 1243 struct spdk_bdev *bdev = bdev_io->bdev; 1244 1245 switch (bdev_io->type) { 1246 case SPDK_BDEV_IO_TYPE_NVME_IO: 1247 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1248 return bdev_io->u.nvme_passthru.nbytes; 1249 case SPDK_BDEV_IO_TYPE_READ: 1250 case SPDK_BDEV_IO_TYPE_WRITE: 1251 case SPDK_BDEV_IO_TYPE_UNMAP: 1252 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1253 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1254 default: 1255 return 0; 1256 } 1257 } 1258 1259 static bool 1260 _spdk_bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1261 { 1262 if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) { 1263 return true; 1264 } else { 1265 return false; 1266 } 1267 } 1268 1269 static void 1270 _spdk_bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1271 { 1272 limit->remaining_this_timeslice--; 1273 } 1274 1275 static void 1276 _spdk_bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1277 { 1278 limit->remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(io); 1279 } 1280 1281 static void 1282 _spdk_bdev_qos_set_ops(struct spdk_bdev_qos *qos) 1283 { 1284 int i; 1285 1286 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1287 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1288 qos->rate_limits[i].queue_io = NULL; 1289 qos->rate_limits[i].update_quota = NULL; 1290 continue; 1291 } 1292 1293 switch (i) { 1294 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1295 qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io; 1296 qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_iops_update_quota; 1297 break; 1298 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1299 qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io; 1300 qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_bps_update_quota; 1301 break; 1302 default: 1303 break; 1304 } 1305 } 1306 } 1307 1308 static int 1309 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1310 { 1311 struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL; 1312 struct spdk_bdev *bdev = ch->bdev; 1313 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1314 int i, submitted_ios = 0; 1315 1316 TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) { 1317 if (_spdk_bdev_qos_io_to_limit(bdev_io) == true) { 1318 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1319 if (!qos->rate_limits[i].queue_io) { 1320 continue; 1321 } 1322 1323 if (qos->rate_limits[i].queue_io(&qos->rate_limits[i], 1324 bdev_io) == true) { 1325 return submitted_ios; 1326 } 1327 } 1328 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1329 if (!qos->rate_limits[i].update_quota) { 1330 continue; 1331 } 1332 1333 qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io); 1334 } 1335 } 1336 1337 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1338 ch->io_outstanding++; 1339 shared_resource->io_outstanding++; 1340 bdev->fn_table->submit_request(ch->channel, bdev_io); 1341 submitted_ios++; 1342 } 1343 1344 return submitted_ios; 1345 } 1346 1347 static void 1348 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1349 { 1350 int rc; 1351 1352 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1353 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1354 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1355 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1356 &bdev_io->internal.waitq_entry); 1357 if (rc != 0) { 1358 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1359 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1360 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1361 } 1362 } 1363 1364 static bool 1365 _spdk_bdev_io_type_can_split(uint8_t type) 1366 { 1367 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1368 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1369 1370 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1371 * UNMAP could be split, but these types of I/O are typically much larger 1372 * in size (sometimes the size of the entire block device), and the bdev 1373 * module can more efficiently split these types of I/O. Plus those types 1374 * of I/O do not have a payload, which makes the splitting process simpler. 1375 */ 1376 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1377 return true; 1378 } else { 1379 return false; 1380 } 1381 } 1382 1383 static bool 1384 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1385 { 1386 uint64_t start_stripe, end_stripe; 1387 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1388 1389 if (io_boundary == 0) { 1390 return false; 1391 } 1392 1393 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1394 return false; 1395 } 1396 1397 start_stripe = bdev_io->u.bdev.offset_blocks; 1398 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1399 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1400 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1401 start_stripe >>= spdk_u32log2(io_boundary); 1402 end_stripe >>= spdk_u32log2(io_boundary); 1403 } else { 1404 start_stripe /= io_boundary; 1405 end_stripe /= io_boundary; 1406 } 1407 return (start_stripe != end_stripe); 1408 } 1409 1410 static uint32_t 1411 _to_next_boundary(uint64_t offset, uint32_t boundary) 1412 { 1413 return (boundary - (offset % boundary)); 1414 } 1415 1416 static void 1417 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1418 1419 static void 1420 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1421 { 1422 struct spdk_bdev_io *bdev_io = _bdev_io; 1423 uint64_t current_offset, remaining; 1424 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1425 struct iovec *parent_iov, *iov; 1426 uint64_t parent_iov_offset, iov_len; 1427 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1428 int rc; 1429 1430 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1431 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1432 blocklen = bdev_io->bdev->blocklen; 1433 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1434 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1435 1436 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1437 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1438 if (parent_iov_offset < parent_iov->iov_len) { 1439 break; 1440 } 1441 parent_iov_offset -= parent_iov->iov_len; 1442 } 1443 1444 child_iovcnt = 0; 1445 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1446 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1447 to_next_boundary = spdk_min(remaining, to_next_boundary); 1448 to_next_boundary_bytes = to_next_boundary * blocklen; 1449 iov = &bdev_io->child_iov[child_iovcnt]; 1450 iovcnt = 0; 1451 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1452 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1453 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1454 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1455 to_next_boundary_bytes -= iov_len; 1456 1457 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1458 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1459 1460 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1461 parent_iov_offset += iov_len; 1462 } else { 1463 parent_iovpos++; 1464 parent_iov_offset = 0; 1465 } 1466 child_iovcnt++; 1467 iovcnt++; 1468 } 1469 1470 if (to_next_boundary_bytes > 0) { 1471 /* We had to stop this child I/O early because we ran out of 1472 * child_iov space. Make sure the iovs collected are valid and 1473 * then adjust to_next_boundary before starting the child I/O. 1474 */ 1475 if ((to_next_boundary_bytes % blocklen) != 0) { 1476 SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n", 1477 to_next_boundary_bytes, blocklen); 1478 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1479 if (bdev_io->u.bdev.split_outstanding == 0) { 1480 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1481 } 1482 return; 1483 } 1484 to_next_boundary -= to_next_boundary_bytes / blocklen; 1485 } 1486 1487 bdev_io->u.bdev.split_outstanding++; 1488 1489 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1490 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1491 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1492 iov, iovcnt, current_offset, to_next_boundary, 1493 _spdk_bdev_io_split_done, bdev_io); 1494 } else { 1495 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1496 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1497 iov, iovcnt, current_offset, to_next_boundary, 1498 _spdk_bdev_io_split_done, bdev_io); 1499 } 1500 1501 if (rc == 0) { 1502 current_offset += to_next_boundary; 1503 remaining -= to_next_boundary; 1504 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1505 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1506 } else { 1507 bdev_io->u.bdev.split_outstanding--; 1508 if (rc == -ENOMEM) { 1509 if (bdev_io->u.bdev.split_outstanding == 0) { 1510 /* No I/O is outstanding. Hence we should wait here. */ 1511 _spdk_bdev_queue_io_wait_with_cb(bdev_io, 1512 _spdk_bdev_io_split_with_payload); 1513 } 1514 } else { 1515 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1516 if (bdev_io->u.bdev.split_outstanding == 0) { 1517 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1518 } 1519 } 1520 1521 return; 1522 } 1523 } 1524 } 1525 1526 static void 1527 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1528 { 1529 struct spdk_bdev_io *parent_io = cb_arg; 1530 1531 spdk_bdev_free_io(bdev_io); 1532 1533 if (!success) { 1534 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1535 } 1536 parent_io->u.bdev.split_outstanding--; 1537 if (parent_io->u.bdev.split_outstanding != 0) { 1538 return; 1539 } 1540 1541 /* 1542 * Parent I/O finishes when all blocks are consumed or there is any failure of 1543 * child I/O and no outstanding child I/O. 1544 */ 1545 if (parent_io->u.bdev.split_remaining_num_blocks == 0 || 1546 parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) { 1547 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1548 parent_io->internal.caller_ctx); 1549 return; 1550 } 1551 1552 /* 1553 * Continue with the splitting process. This function will complete the parent I/O if the 1554 * splitting is done. 1555 */ 1556 _spdk_bdev_io_split_with_payload(parent_io); 1557 } 1558 1559 static void 1560 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1561 { 1562 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1563 1564 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1565 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1566 bdev_io->u.bdev.split_outstanding = 0; 1567 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1568 1569 _spdk_bdev_io_split_with_payload(bdev_io); 1570 } 1571 1572 static void 1573 _spdk_bdev_io_submit(void *ctx) 1574 { 1575 struct spdk_bdev_io *bdev_io = ctx; 1576 struct spdk_bdev *bdev = bdev_io->bdev; 1577 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1578 struct spdk_io_channel *ch = bdev_ch->channel; 1579 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1580 uint64_t tsc; 1581 1582 tsc = spdk_get_ticks(); 1583 bdev_io->internal.submit_tsc = tsc; 1584 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1585 bdev_ch->io_outstanding++; 1586 shared_resource->io_outstanding++; 1587 bdev_io->internal.in_submit_request = true; 1588 if (spdk_likely(bdev_ch->flags == 0)) { 1589 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1590 bdev->fn_table->submit_request(ch, bdev_io); 1591 } else { 1592 bdev_ch->io_outstanding--; 1593 shared_resource->io_outstanding--; 1594 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1595 } 1596 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1597 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1598 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1599 bdev_ch->io_outstanding--; 1600 shared_resource->io_outstanding--; 1601 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1602 _spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 1603 } else { 1604 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1605 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1606 } 1607 bdev_io->internal.in_submit_request = false; 1608 } 1609 1610 static void 1611 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1612 { 1613 struct spdk_bdev *bdev = bdev_io->bdev; 1614 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1615 1616 assert(thread != NULL); 1617 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1618 1619 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1620 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1621 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split, 1622 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1623 } else { 1624 _spdk_bdev_io_split(NULL, bdev_io); 1625 } 1626 return; 1627 } 1628 1629 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1630 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1631 _spdk_bdev_io_submit(bdev_io); 1632 } else { 1633 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1634 bdev_io->internal.ch = bdev->internal.qos->ch; 1635 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1636 } 1637 } else { 1638 _spdk_bdev_io_submit(bdev_io); 1639 } 1640 } 1641 1642 static void 1643 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1644 { 1645 struct spdk_bdev *bdev = bdev_io->bdev; 1646 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1647 struct spdk_io_channel *ch = bdev_ch->channel; 1648 1649 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1650 1651 bdev_io->internal.in_submit_request = true; 1652 bdev->fn_table->submit_request(ch, bdev_io); 1653 bdev_io->internal.in_submit_request = false; 1654 } 1655 1656 static void 1657 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1658 struct spdk_bdev *bdev, void *cb_arg, 1659 spdk_bdev_io_completion_cb cb) 1660 { 1661 bdev_io->bdev = bdev; 1662 bdev_io->internal.caller_ctx = cb_arg; 1663 bdev_io->internal.cb = cb; 1664 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1665 bdev_io->internal.in_submit_request = false; 1666 bdev_io->internal.buf = NULL; 1667 bdev_io->internal.io_submit_ch = NULL; 1668 bdev_io->internal.orig_iovs = NULL; 1669 bdev_io->internal.orig_iovcnt = 0; 1670 } 1671 1672 static bool 1673 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1674 { 1675 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1676 } 1677 1678 bool 1679 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1680 { 1681 bool supported; 1682 1683 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1684 1685 if (!supported) { 1686 switch (io_type) { 1687 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1688 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1689 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1690 break; 1691 default: 1692 break; 1693 } 1694 } 1695 1696 return supported; 1697 } 1698 1699 int 1700 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1701 { 1702 if (bdev->fn_table->dump_info_json) { 1703 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1704 } 1705 1706 return 0; 1707 } 1708 1709 static void 1710 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1711 { 1712 uint32_t max_per_timeslice = 0; 1713 int i; 1714 1715 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1716 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1717 qos->rate_limits[i].max_per_timeslice = 0; 1718 continue; 1719 } 1720 1721 max_per_timeslice = qos->rate_limits[i].limit * 1722 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1723 1724 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1725 qos->rate_limits[i].min_per_timeslice); 1726 1727 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1728 } 1729 1730 _spdk_bdev_qos_set_ops(qos); 1731 } 1732 1733 static int 1734 spdk_bdev_channel_poll_qos(void *arg) 1735 { 1736 struct spdk_bdev_qos *qos = arg; 1737 uint64_t now = spdk_get_ticks(); 1738 int i; 1739 1740 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1741 /* We received our callback earlier than expected - return 1742 * immediately and wait to do accounting until at least one 1743 * timeslice has actually expired. This should never happen 1744 * with a well-behaved timer implementation. 1745 */ 1746 return 0; 1747 } 1748 1749 /* Reset for next round of rate limiting */ 1750 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1751 /* We may have allowed the IOs or bytes to slightly overrun in the last 1752 * timeslice. remaining_this_timeslice is signed, so if it's negative 1753 * here, we'll account for the overrun so that the next timeslice will 1754 * be appropriately reduced. 1755 */ 1756 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1757 qos->rate_limits[i].remaining_this_timeslice = 0; 1758 } 1759 } 1760 1761 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1762 qos->last_timeslice += qos->timeslice_size; 1763 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1764 qos->rate_limits[i].remaining_this_timeslice += 1765 qos->rate_limits[i].max_per_timeslice; 1766 } 1767 } 1768 1769 return _spdk_bdev_qos_io_submit(qos->ch, qos); 1770 } 1771 1772 static void 1773 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1774 { 1775 struct spdk_bdev_shared_resource *shared_resource; 1776 1777 spdk_put_io_channel(ch->channel); 1778 1779 shared_resource = ch->shared_resource; 1780 1781 assert(ch->io_outstanding == 0); 1782 assert(shared_resource->ref > 0); 1783 shared_resource->ref--; 1784 if (shared_resource->ref == 0) { 1785 assert(shared_resource->io_outstanding == 0); 1786 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1787 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1788 free(shared_resource); 1789 } 1790 } 1791 1792 /* Caller must hold bdev->internal.mutex. */ 1793 static void 1794 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1795 { 1796 struct spdk_bdev_qos *qos = bdev->internal.qos; 1797 int i; 1798 1799 /* Rate limiting on this bdev enabled */ 1800 if (qos) { 1801 if (qos->ch == NULL) { 1802 struct spdk_io_channel *io_ch; 1803 1804 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1805 bdev->name, spdk_get_thread()); 1806 1807 /* No qos channel has been selected, so set one up */ 1808 1809 /* Take another reference to ch */ 1810 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1811 assert(io_ch != NULL); 1812 qos->ch = ch; 1813 1814 qos->thread = spdk_io_channel_get_thread(io_ch); 1815 1816 TAILQ_INIT(&qos->queued); 1817 1818 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1819 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1820 qos->rate_limits[i].min_per_timeslice = 1821 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1822 } else { 1823 qos->rate_limits[i].min_per_timeslice = 1824 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1825 } 1826 1827 if (qos->rate_limits[i].limit == 0) { 1828 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1829 } 1830 } 1831 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1832 qos->timeslice_size = 1833 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1834 qos->last_timeslice = spdk_get_ticks(); 1835 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1836 qos, 1837 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1838 } 1839 1840 ch->flags |= BDEV_CH_QOS_ENABLED; 1841 } 1842 } 1843 1844 static int 1845 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1846 { 1847 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1848 struct spdk_bdev_channel *ch = ctx_buf; 1849 struct spdk_io_channel *mgmt_io_ch; 1850 struct spdk_bdev_mgmt_channel *mgmt_ch; 1851 struct spdk_bdev_shared_resource *shared_resource; 1852 1853 ch->bdev = bdev; 1854 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1855 if (!ch->channel) { 1856 return -1; 1857 } 1858 1859 assert(ch->histogram == NULL); 1860 if (bdev->internal.histogram_enabled) { 1861 ch->histogram = spdk_histogram_data_alloc(); 1862 if (ch->histogram == NULL) { 1863 SPDK_ERRLOG("Could not allocate histogram\n"); 1864 } 1865 } 1866 1867 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1868 if (!mgmt_io_ch) { 1869 spdk_put_io_channel(ch->channel); 1870 return -1; 1871 } 1872 1873 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1874 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1875 if (shared_resource->shared_ch == ch->channel) { 1876 spdk_put_io_channel(mgmt_io_ch); 1877 shared_resource->ref++; 1878 break; 1879 } 1880 } 1881 1882 if (shared_resource == NULL) { 1883 shared_resource = calloc(1, sizeof(*shared_resource)); 1884 if (shared_resource == NULL) { 1885 spdk_put_io_channel(ch->channel); 1886 spdk_put_io_channel(mgmt_io_ch); 1887 return -1; 1888 } 1889 1890 shared_resource->mgmt_ch = mgmt_ch; 1891 shared_resource->io_outstanding = 0; 1892 TAILQ_INIT(&shared_resource->nomem_io); 1893 shared_resource->nomem_threshold = 0; 1894 shared_resource->shared_ch = ch->channel; 1895 shared_resource->ref = 1; 1896 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1897 } 1898 1899 memset(&ch->stat, 0, sizeof(ch->stat)); 1900 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1901 ch->io_outstanding = 0; 1902 TAILQ_INIT(&ch->queued_resets); 1903 ch->flags = 0; 1904 ch->shared_resource = shared_resource; 1905 1906 #ifdef SPDK_CONFIG_VTUNE 1907 { 1908 char *name; 1909 __itt_init_ittlib(NULL, 0); 1910 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1911 if (!name) { 1912 _spdk_bdev_channel_destroy_resource(ch); 1913 return -1; 1914 } 1915 ch->handle = __itt_string_handle_create(name); 1916 free(name); 1917 ch->start_tsc = spdk_get_ticks(); 1918 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1919 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1920 } 1921 #endif 1922 1923 pthread_mutex_lock(&bdev->internal.mutex); 1924 _spdk_bdev_enable_qos(bdev, ch); 1925 pthread_mutex_unlock(&bdev->internal.mutex); 1926 1927 return 0; 1928 } 1929 1930 /* 1931 * Abort I/O that are waiting on a data buffer. These types of I/O are 1932 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1933 */ 1934 static void 1935 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1936 { 1937 bdev_io_stailq_t tmp; 1938 struct spdk_bdev_io *bdev_io; 1939 1940 STAILQ_INIT(&tmp); 1941 1942 while (!STAILQ_EMPTY(queue)) { 1943 bdev_io = STAILQ_FIRST(queue); 1944 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1945 if (bdev_io->internal.ch == ch) { 1946 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1947 } else { 1948 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1949 } 1950 } 1951 1952 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1953 } 1954 1955 /* 1956 * Abort I/O that are queued waiting for submission. These types of I/O are 1957 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1958 */ 1959 static void 1960 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1961 { 1962 struct spdk_bdev_io *bdev_io, *tmp; 1963 1964 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1965 if (bdev_io->internal.ch == ch) { 1966 TAILQ_REMOVE(queue, bdev_io, internal.link); 1967 /* 1968 * spdk_bdev_io_complete() assumes that the completed I/O had 1969 * been submitted to the bdev module. Since in this case it 1970 * hadn't, bump io_outstanding to account for the decrement 1971 * that spdk_bdev_io_complete() will do. 1972 */ 1973 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1974 ch->io_outstanding++; 1975 ch->shared_resource->io_outstanding++; 1976 } 1977 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1978 } 1979 } 1980 } 1981 1982 static void 1983 spdk_bdev_qos_channel_destroy(void *cb_arg) 1984 { 1985 struct spdk_bdev_qos *qos = cb_arg; 1986 1987 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1988 spdk_poller_unregister(&qos->poller); 1989 1990 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1991 1992 free(qos); 1993 } 1994 1995 static int 1996 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1997 { 1998 int i; 1999 2000 /* 2001 * Cleanly shutting down the QoS poller is tricky, because 2002 * during the asynchronous operation the user could open 2003 * a new descriptor and create a new channel, spawning 2004 * a new QoS poller. 2005 * 2006 * The strategy is to create a new QoS structure here and swap it 2007 * in. The shutdown path then continues to refer to the old one 2008 * until it completes and then releases it. 2009 */ 2010 struct spdk_bdev_qos *new_qos, *old_qos; 2011 2012 old_qos = bdev->internal.qos; 2013 2014 new_qos = calloc(1, sizeof(*new_qos)); 2015 if (!new_qos) { 2016 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2017 return -ENOMEM; 2018 } 2019 2020 /* Copy the old QoS data into the newly allocated structure */ 2021 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2022 2023 /* Zero out the key parts of the QoS structure */ 2024 new_qos->ch = NULL; 2025 new_qos->thread = NULL; 2026 new_qos->poller = NULL; 2027 TAILQ_INIT(&new_qos->queued); 2028 /* 2029 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2030 * It will be used later for the new QoS structure. 2031 */ 2032 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2033 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2034 new_qos->rate_limits[i].min_per_timeslice = 0; 2035 new_qos->rate_limits[i].max_per_timeslice = 0; 2036 } 2037 2038 bdev->internal.qos = new_qos; 2039 2040 if (old_qos->thread == NULL) { 2041 free(old_qos); 2042 } else { 2043 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 2044 old_qos); 2045 } 2046 2047 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2048 * been destroyed yet. The destruction path will end up waiting for the final 2049 * channel to be put before it releases resources. */ 2050 2051 return 0; 2052 } 2053 2054 static void 2055 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2056 { 2057 total->bytes_read += add->bytes_read; 2058 total->num_read_ops += add->num_read_ops; 2059 total->bytes_written += add->bytes_written; 2060 total->num_write_ops += add->num_write_ops; 2061 total->bytes_unmapped += add->bytes_unmapped; 2062 total->num_unmap_ops += add->num_unmap_ops; 2063 total->read_latency_ticks += add->read_latency_ticks; 2064 total->write_latency_ticks += add->write_latency_ticks; 2065 total->unmap_latency_ticks += add->unmap_latency_ticks; 2066 } 2067 2068 static void 2069 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 2070 { 2071 struct spdk_bdev_channel *ch = ctx_buf; 2072 struct spdk_bdev_mgmt_channel *mgmt_ch; 2073 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2074 2075 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2076 spdk_get_thread()); 2077 2078 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */ 2079 pthread_mutex_lock(&ch->bdev->internal.mutex); 2080 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2081 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2082 2083 mgmt_ch = shared_resource->mgmt_ch; 2084 2085 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2086 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2087 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2088 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2089 2090 if (ch->histogram) { 2091 spdk_histogram_data_free(ch->histogram); 2092 } 2093 2094 _spdk_bdev_channel_destroy_resource(ch); 2095 } 2096 2097 int 2098 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2099 { 2100 struct spdk_bdev_alias *tmp; 2101 2102 if (alias == NULL) { 2103 SPDK_ERRLOG("Empty alias passed\n"); 2104 return -EINVAL; 2105 } 2106 2107 if (spdk_bdev_get_by_name(alias)) { 2108 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2109 return -EEXIST; 2110 } 2111 2112 tmp = calloc(1, sizeof(*tmp)); 2113 if (tmp == NULL) { 2114 SPDK_ERRLOG("Unable to allocate alias\n"); 2115 return -ENOMEM; 2116 } 2117 2118 tmp->alias = strdup(alias); 2119 if (tmp->alias == NULL) { 2120 free(tmp); 2121 SPDK_ERRLOG("Unable to allocate alias\n"); 2122 return -ENOMEM; 2123 } 2124 2125 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2126 2127 return 0; 2128 } 2129 2130 int 2131 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2132 { 2133 struct spdk_bdev_alias *tmp; 2134 2135 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2136 if (strcmp(alias, tmp->alias) == 0) { 2137 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2138 free(tmp->alias); 2139 free(tmp); 2140 return 0; 2141 } 2142 } 2143 2144 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2145 2146 return -ENOENT; 2147 } 2148 2149 void 2150 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2151 { 2152 struct spdk_bdev_alias *p, *tmp; 2153 2154 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2155 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2156 free(p->alias); 2157 free(p); 2158 } 2159 } 2160 2161 struct spdk_io_channel * 2162 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2163 { 2164 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 2165 } 2166 2167 const char * 2168 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2169 { 2170 return bdev->name; 2171 } 2172 2173 const char * 2174 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2175 { 2176 return bdev->product_name; 2177 } 2178 2179 const struct spdk_bdev_aliases_list * 2180 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2181 { 2182 return &bdev->aliases; 2183 } 2184 2185 uint32_t 2186 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2187 { 2188 return bdev->blocklen; 2189 } 2190 2191 uint64_t 2192 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2193 { 2194 return bdev->blockcnt; 2195 } 2196 2197 const char * 2198 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2199 { 2200 return qos_rpc_type[type]; 2201 } 2202 2203 void 2204 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2205 { 2206 int i; 2207 2208 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2209 2210 pthread_mutex_lock(&bdev->internal.mutex); 2211 if (bdev->internal.qos) { 2212 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2213 if (bdev->internal.qos->rate_limits[i].limit != 2214 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2215 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2216 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2217 /* Change from Byte to Megabyte which is user visible. */ 2218 limits[i] = limits[i] / 1024 / 1024; 2219 } 2220 } 2221 } 2222 } 2223 pthread_mutex_unlock(&bdev->internal.mutex); 2224 } 2225 2226 size_t 2227 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2228 { 2229 return 1 << bdev->required_alignment; 2230 } 2231 2232 uint32_t 2233 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2234 { 2235 return bdev->optimal_io_boundary; 2236 } 2237 2238 bool 2239 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2240 { 2241 return bdev->write_cache; 2242 } 2243 2244 const struct spdk_uuid * 2245 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2246 { 2247 return &bdev->uuid; 2248 } 2249 2250 uint64_t 2251 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2252 { 2253 return bdev->internal.measured_queue_depth; 2254 } 2255 2256 uint64_t 2257 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2258 { 2259 return bdev->internal.period; 2260 } 2261 2262 uint64_t 2263 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2264 { 2265 return bdev->internal.weighted_io_time; 2266 } 2267 2268 uint64_t 2269 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2270 { 2271 return bdev->internal.io_time; 2272 } 2273 2274 static void 2275 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2276 { 2277 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2278 2279 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2280 2281 if (bdev->internal.measured_queue_depth) { 2282 bdev->internal.io_time += bdev->internal.period; 2283 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2284 } 2285 } 2286 2287 static void 2288 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2289 { 2290 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2291 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2292 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2293 2294 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2295 spdk_for_each_channel_continue(i, 0); 2296 } 2297 2298 static int 2299 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2300 { 2301 struct spdk_bdev *bdev = ctx; 2302 bdev->internal.temporary_queue_depth = 0; 2303 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2304 _calculate_measured_qd_cpl); 2305 return 0; 2306 } 2307 2308 void 2309 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2310 { 2311 bdev->internal.period = period; 2312 2313 if (bdev->internal.qd_poller != NULL) { 2314 spdk_poller_unregister(&bdev->internal.qd_poller); 2315 bdev->internal.measured_queue_depth = UINT64_MAX; 2316 } 2317 2318 if (period != 0) { 2319 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2320 period); 2321 } 2322 } 2323 2324 int 2325 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2326 { 2327 int ret; 2328 2329 pthread_mutex_lock(&bdev->internal.mutex); 2330 2331 /* bdev has open descriptors */ 2332 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2333 bdev->blockcnt > size) { 2334 ret = -EBUSY; 2335 } else { 2336 bdev->blockcnt = size; 2337 ret = 0; 2338 } 2339 2340 pthread_mutex_unlock(&bdev->internal.mutex); 2341 2342 return ret; 2343 } 2344 2345 /* 2346 * Convert I/O offset and length from bytes to blocks. 2347 * 2348 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 2349 */ 2350 static uint64_t 2351 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2352 uint64_t num_bytes, uint64_t *num_blocks) 2353 { 2354 uint32_t block_size = bdev->blocklen; 2355 uint8_t shift_cnt; 2356 2357 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 2358 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 2359 shift_cnt = spdk_u32log2(block_size); 2360 *offset_blocks = offset_bytes >> shift_cnt; 2361 *num_blocks = num_bytes >> shift_cnt; 2362 return (offset_bytes - (*offset_blocks << shift_cnt)) | 2363 (num_bytes - (*num_blocks << shift_cnt)); 2364 } else { 2365 *offset_blocks = offset_bytes / block_size; 2366 *num_blocks = num_bytes / block_size; 2367 return (offset_bytes % block_size) | (num_bytes % block_size); 2368 } 2369 } 2370 2371 static bool 2372 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2373 { 2374 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2375 * has been an overflow and hence the offset has been wrapped around */ 2376 if (offset_blocks + num_blocks < offset_blocks) { 2377 return false; 2378 } 2379 2380 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2381 if (offset_blocks + num_blocks > bdev->blockcnt) { 2382 return false; 2383 } 2384 2385 return true; 2386 } 2387 2388 int 2389 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2390 void *buf, uint64_t offset, uint64_t nbytes, 2391 spdk_bdev_io_completion_cb cb, void *cb_arg) 2392 { 2393 uint64_t offset_blocks, num_blocks; 2394 2395 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2396 return -EINVAL; 2397 } 2398 2399 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2400 } 2401 2402 int 2403 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2404 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2405 spdk_bdev_io_completion_cb cb, void *cb_arg) 2406 { 2407 struct spdk_bdev *bdev = desc->bdev; 2408 struct spdk_bdev_io *bdev_io; 2409 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2410 2411 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2412 return -EINVAL; 2413 } 2414 2415 bdev_io = spdk_bdev_get_io(channel); 2416 if (!bdev_io) { 2417 return -ENOMEM; 2418 } 2419 2420 bdev_io->internal.ch = channel; 2421 bdev_io->internal.desc = desc; 2422 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2423 bdev_io->u.bdev.iovs = &bdev_io->iov; 2424 bdev_io->u.bdev.iovs[0].iov_base = buf; 2425 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2426 bdev_io->u.bdev.iovcnt = 1; 2427 bdev_io->u.bdev.num_blocks = num_blocks; 2428 bdev_io->u.bdev.offset_blocks = offset_blocks; 2429 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2430 2431 spdk_bdev_io_submit(bdev_io); 2432 return 0; 2433 } 2434 2435 int 2436 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2437 struct iovec *iov, int iovcnt, 2438 uint64_t offset, uint64_t nbytes, 2439 spdk_bdev_io_completion_cb cb, void *cb_arg) 2440 { 2441 uint64_t offset_blocks, num_blocks; 2442 2443 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2444 return -EINVAL; 2445 } 2446 2447 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2448 } 2449 2450 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2451 struct iovec *iov, int iovcnt, 2452 uint64_t offset_blocks, uint64_t num_blocks, 2453 spdk_bdev_io_completion_cb cb, void *cb_arg) 2454 { 2455 struct spdk_bdev *bdev = desc->bdev; 2456 struct spdk_bdev_io *bdev_io; 2457 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2458 2459 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2460 return -EINVAL; 2461 } 2462 2463 bdev_io = spdk_bdev_get_io(channel); 2464 if (!bdev_io) { 2465 return -ENOMEM; 2466 } 2467 2468 bdev_io->internal.ch = channel; 2469 bdev_io->internal.desc = desc; 2470 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2471 bdev_io->u.bdev.iovs = iov; 2472 bdev_io->u.bdev.iovcnt = iovcnt; 2473 bdev_io->u.bdev.num_blocks = num_blocks; 2474 bdev_io->u.bdev.offset_blocks = offset_blocks; 2475 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2476 2477 spdk_bdev_io_submit(bdev_io); 2478 return 0; 2479 } 2480 2481 int 2482 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2483 void *buf, uint64_t offset, uint64_t nbytes, 2484 spdk_bdev_io_completion_cb cb, void *cb_arg) 2485 { 2486 uint64_t offset_blocks, num_blocks; 2487 2488 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2489 return -EINVAL; 2490 } 2491 2492 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2493 } 2494 2495 int 2496 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2497 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2498 spdk_bdev_io_completion_cb cb, void *cb_arg) 2499 { 2500 struct spdk_bdev *bdev = desc->bdev; 2501 struct spdk_bdev_io *bdev_io; 2502 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2503 2504 if (!desc->write) { 2505 return -EBADF; 2506 } 2507 2508 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2509 return -EINVAL; 2510 } 2511 2512 bdev_io = spdk_bdev_get_io(channel); 2513 if (!bdev_io) { 2514 return -ENOMEM; 2515 } 2516 2517 bdev_io->internal.ch = channel; 2518 bdev_io->internal.desc = desc; 2519 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2520 bdev_io->u.bdev.iovs = &bdev_io->iov; 2521 bdev_io->u.bdev.iovs[0].iov_base = buf; 2522 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2523 bdev_io->u.bdev.iovcnt = 1; 2524 bdev_io->u.bdev.num_blocks = num_blocks; 2525 bdev_io->u.bdev.offset_blocks = offset_blocks; 2526 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2527 2528 spdk_bdev_io_submit(bdev_io); 2529 return 0; 2530 } 2531 2532 int 2533 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2534 struct iovec *iov, int iovcnt, 2535 uint64_t offset, uint64_t len, 2536 spdk_bdev_io_completion_cb cb, void *cb_arg) 2537 { 2538 uint64_t offset_blocks, num_blocks; 2539 2540 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2541 return -EINVAL; 2542 } 2543 2544 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2545 } 2546 2547 int 2548 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2549 struct iovec *iov, int iovcnt, 2550 uint64_t offset_blocks, uint64_t num_blocks, 2551 spdk_bdev_io_completion_cb cb, void *cb_arg) 2552 { 2553 struct spdk_bdev *bdev = desc->bdev; 2554 struct spdk_bdev_io *bdev_io; 2555 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2556 2557 if (!desc->write) { 2558 return -EBADF; 2559 } 2560 2561 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2562 return -EINVAL; 2563 } 2564 2565 bdev_io = spdk_bdev_get_io(channel); 2566 if (!bdev_io) { 2567 return -ENOMEM; 2568 } 2569 2570 bdev_io->internal.ch = channel; 2571 bdev_io->internal.desc = desc; 2572 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2573 bdev_io->u.bdev.iovs = iov; 2574 bdev_io->u.bdev.iovcnt = iovcnt; 2575 bdev_io->u.bdev.num_blocks = num_blocks; 2576 bdev_io->u.bdev.offset_blocks = offset_blocks; 2577 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2578 2579 spdk_bdev_io_submit(bdev_io); 2580 return 0; 2581 } 2582 2583 int 2584 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2585 uint64_t offset, uint64_t len, 2586 spdk_bdev_io_completion_cb cb, void *cb_arg) 2587 { 2588 uint64_t offset_blocks, num_blocks; 2589 2590 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2591 return -EINVAL; 2592 } 2593 2594 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2595 } 2596 2597 int 2598 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2599 uint64_t offset_blocks, uint64_t num_blocks, 2600 spdk_bdev_io_completion_cb cb, void *cb_arg) 2601 { 2602 struct spdk_bdev *bdev = desc->bdev; 2603 struct spdk_bdev_io *bdev_io; 2604 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2605 2606 if (!desc->write) { 2607 return -EBADF; 2608 } 2609 2610 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2611 return -EINVAL; 2612 } 2613 2614 bdev_io = spdk_bdev_get_io(channel); 2615 2616 if (!bdev_io) { 2617 return -ENOMEM; 2618 } 2619 2620 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2621 bdev_io->internal.ch = channel; 2622 bdev_io->internal.desc = desc; 2623 bdev_io->u.bdev.offset_blocks = offset_blocks; 2624 bdev_io->u.bdev.num_blocks = num_blocks; 2625 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2626 2627 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2628 spdk_bdev_io_submit(bdev_io); 2629 return 0; 2630 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2631 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2632 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2633 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2634 _spdk_bdev_write_zero_buffer_next(bdev_io); 2635 return 0; 2636 } else { 2637 spdk_bdev_free_io(bdev_io); 2638 return -ENOTSUP; 2639 } 2640 } 2641 2642 int 2643 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2644 uint64_t offset, uint64_t nbytes, 2645 spdk_bdev_io_completion_cb cb, void *cb_arg) 2646 { 2647 uint64_t offset_blocks, num_blocks; 2648 2649 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2650 return -EINVAL; 2651 } 2652 2653 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2654 } 2655 2656 int 2657 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2658 uint64_t offset_blocks, uint64_t num_blocks, 2659 spdk_bdev_io_completion_cb cb, void *cb_arg) 2660 { 2661 struct spdk_bdev *bdev = desc->bdev; 2662 struct spdk_bdev_io *bdev_io; 2663 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2664 2665 if (!desc->write) { 2666 return -EBADF; 2667 } 2668 2669 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2670 return -EINVAL; 2671 } 2672 2673 if (num_blocks == 0) { 2674 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2675 return -EINVAL; 2676 } 2677 2678 bdev_io = spdk_bdev_get_io(channel); 2679 if (!bdev_io) { 2680 return -ENOMEM; 2681 } 2682 2683 bdev_io->internal.ch = channel; 2684 bdev_io->internal.desc = desc; 2685 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2686 2687 bdev_io->u.bdev.iovs = &bdev_io->iov; 2688 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2689 bdev_io->u.bdev.iovs[0].iov_len = 0; 2690 bdev_io->u.bdev.iovcnt = 1; 2691 2692 bdev_io->u.bdev.offset_blocks = offset_blocks; 2693 bdev_io->u.bdev.num_blocks = num_blocks; 2694 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2695 2696 spdk_bdev_io_submit(bdev_io); 2697 return 0; 2698 } 2699 2700 int 2701 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2702 uint64_t offset, uint64_t length, 2703 spdk_bdev_io_completion_cb cb, void *cb_arg) 2704 { 2705 uint64_t offset_blocks, num_blocks; 2706 2707 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2708 return -EINVAL; 2709 } 2710 2711 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2712 } 2713 2714 int 2715 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2716 uint64_t offset_blocks, uint64_t num_blocks, 2717 spdk_bdev_io_completion_cb cb, void *cb_arg) 2718 { 2719 struct spdk_bdev *bdev = desc->bdev; 2720 struct spdk_bdev_io *bdev_io; 2721 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2722 2723 if (!desc->write) { 2724 return -EBADF; 2725 } 2726 2727 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2728 return -EINVAL; 2729 } 2730 2731 bdev_io = spdk_bdev_get_io(channel); 2732 if (!bdev_io) { 2733 return -ENOMEM; 2734 } 2735 2736 bdev_io->internal.ch = channel; 2737 bdev_io->internal.desc = desc; 2738 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2739 bdev_io->u.bdev.iovs = NULL; 2740 bdev_io->u.bdev.iovcnt = 0; 2741 bdev_io->u.bdev.offset_blocks = offset_blocks; 2742 bdev_io->u.bdev.num_blocks = num_blocks; 2743 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2744 2745 spdk_bdev_io_submit(bdev_io); 2746 return 0; 2747 } 2748 2749 static void 2750 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2751 { 2752 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2753 struct spdk_bdev_io *bdev_io; 2754 2755 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2756 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2757 spdk_bdev_io_submit_reset(bdev_io); 2758 } 2759 2760 static void 2761 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2762 { 2763 struct spdk_io_channel *ch; 2764 struct spdk_bdev_channel *channel; 2765 struct spdk_bdev_mgmt_channel *mgmt_channel; 2766 struct spdk_bdev_shared_resource *shared_resource; 2767 bdev_io_tailq_t tmp_queued; 2768 2769 TAILQ_INIT(&tmp_queued); 2770 2771 ch = spdk_io_channel_iter_get_channel(i); 2772 channel = spdk_io_channel_get_ctx(ch); 2773 shared_resource = channel->shared_resource; 2774 mgmt_channel = shared_resource->mgmt_ch; 2775 2776 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2777 2778 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2779 /* The QoS object is always valid and readable while 2780 * the channel flag is set, so the lock here should not 2781 * be necessary. We're not in the fast path though, so 2782 * just take it anyway. */ 2783 pthread_mutex_lock(&channel->bdev->internal.mutex); 2784 if (channel->bdev->internal.qos->ch == channel) { 2785 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2786 } 2787 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2788 } 2789 2790 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2791 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2792 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2793 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2794 2795 spdk_for_each_channel_continue(i, 0); 2796 } 2797 2798 static void 2799 _spdk_bdev_start_reset(void *ctx) 2800 { 2801 struct spdk_bdev_channel *ch = ctx; 2802 2803 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2804 ch, _spdk_bdev_reset_dev); 2805 } 2806 2807 static void 2808 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2809 { 2810 struct spdk_bdev *bdev = ch->bdev; 2811 2812 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2813 2814 pthread_mutex_lock(&bdev->internal.mutex); 2815 if (bdev->internal.reset_in_progress == NULL) { 2816 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2817 /* 2818 * Take a channel reference for the target bdev for the life of this 2819 * reset. This guards against the channel getting destroyed while 2820 * spdk_for_each_channel() calls related to this reset IO are in 2821 * progress. We will release the reference when this reset is 2822 * completed. 2823 */ 2824 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2825 _spdk_bdev_start_reset(ch); 2826 } 2827 pthread_mutex_unlock(&bdev->internal.mutex); 2828 } 2829 2830 int 2831 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2832 spdk_bdev_io_completion_cb cb, void *cb_arg) 2833 { 2834 struct spdk_bdev *bdev = desc->bdev; 2835 struct spdk_bdev_io *bdev_io; 2836 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2837 2838 bdev_io = spdk_bdev_get_io(channel); 2839 if (!bdev_io) { 2840 return -ENOMEM; 2841 } 2842 2843 bdev_io->internal.ch = channel; 2844 bdev_io->internal.desc = desc; 2845 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2846 bdev_io->u.reset.ch_ref = NULL; 2847 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2848 2849 pthread_mutex_lock(&bdev->internal.mutex); 2850 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2851 pthread_mutex_unlock(&bdev->internal.mutex); 2852 2853 _spdk_bdev_channel_start_reset(channel); 2854 2855 return 0; 2856 } 2857 2858 void 2859 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2860 struct spdk_bdev_io_stat *stat) 2861 { 2862 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2863 2864 *stat = channel->stat; 2865 } 2866 2867 static void 2868 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2869 { 2870 void *io_device = spdk_io_channel_iter_get_io_device(i); 2871 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2872 2873 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2874 bdev_iostat_ctx->cb_arg, 0); 2875 free(bdev_iostat_ctx); 2876 } 2877 2878 static void 2879 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2880 { 2881 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2882 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2883 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2884 2885 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2886 spdk_for_each_channel_continue(i, 0); 2887 } 2888 2889 void 2890 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2891 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2892 { 2893 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2894 2895 assert(bdev != NULL); 2896 assert(stat != NULL); 2897 assert(cb != NULL); 2898 2899 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2900 if (bdev_iostat_ctx == NULL) { 2901 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2902 cb(bdev, stat, cb_arg, -ENOMEM); 2903 return; 2904 } 2905 2906 bdev_iostat_ctx->stat = stat; 2907 bdev_iostat_ctx->cb = cb; 2908 bdev_iostat_ctx->cb_arg = cb_arg; 2909 2910 /* Start with the statistics from previously deleted channels. */ 2911 pthread_mutex_lock(&bdev->internal.mutex); 2912 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2913 pthread_mutex_unlock(&bdev->internal.mutex); 2914 2915 /* Then iterate and add the statistics from each existing channel. */ 2916 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2917 _spdk_bdev_get_each_channel_stat, 2918 bdev_iostat_ctx, 2919 _spdk_bdev_get_device_stat_done); 2920 } 2921 2922 int 2923 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2924 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2925 spdk_bdev_io_completion_cb cb, void *cb_arg) 2926 { 2927 struct spdk_bdev *bdev = desc->bdev; 2928 struct spdk_bdev_io *bdev_io; 2929 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2930 2931 if (!desc->write) { 2932 return -EBADF; 2933 } 2934 2935 bdev_io = spdk_bdev_get_io(channel); 2936 if (!bdev_io) { 2937 return -ENOMEM; 2938 } 2939 2940 bdev_io->internal.ch = channel; 2941 bdev_io->internal.desc = desc; 2942 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2943 bdev_io->u.nvme_passthru.cmd = *cmd; 2944 bdev_io->u.nvme_passthru.buf = buf; 2945 bdev_io->u.nvme_passthru.nbytes = nbytes; 2946 bdev_io->u.nvme_passthru.md_buf = NULL; 2947 bdev_io->u.nvme_passthru.md_len = 0; 2948 2949 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2950 2951 spdk_bdev_io_submit(bdev_io); 2952 return 0; 2953 } 2954 2955 int 2956 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2957 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2958 spdk_bdev_io_completion_cb cb, void *cb_arg) 2959 { 2960 struct spdk_bdev *bdev = desc->bdev; 2961 struct spdk_bdev_io *bdev_io; 2962 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2963 2964 if (!desc->write) { 2965 /* 2966 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2967 * to easily determine if the command is a read or write, but for now just 2968 * do not allow io_passthru with a read-only descriptor. 2969 */ 2970 return -EBADF; 2971 } 2972 2973 bdev_io = spdk_bdev_get_io(channel); 2974 if (!bdev_io) { 2975 return -ENOMEM; 2976 } 2977 2978 bdev_io->internal.ch = channel; 2979 bdev_io->internal.desc = desc; 2980 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2981 bdev_io->u.nvme_passthru.cmd = *cmd; 2982 bdev_io->u.nvme_passthru.buf = buf; 2983 bdev_io->u.nvme_passthru.nbytes = nbytes; 2984 bdev_io->u.nvme_passthru.md_buf = NULL; 2985 bdev_io->u.nvme_passthru.md_len = 0; 2986 2987 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2988 2989 spdk_bdev_io_submit(bdev_io); 2990 return 0; 2991 } 2992 2993 int 2994 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2995 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2996 spdk_bdev_io_completion_cb cb, void *cb_arg) 2997 { 2998 struct spdk_bdev *bdev = desc->bdev; 2999 struct spdk_bdev_io *bdev_io; 3000 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3001 3002 if (!desc->write) { 3003 /* 3004 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3005 * to easily determine if the command is a read or write, but for now just 3006 * do not allow io_passthru with a read-only descriptor. 3007 */ 3008 return -EBADF; 3009 } 3010 3011 bdev_io = spdk_bdev_get_io(channel); 3012 if (!bdev_io) { 3013 return -ENOMEM; 3014 } 3015 3016 bdev_io->internal.ch = channel; 3017 bdev_io->internal.desc = desc; 3018 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 3019 bdev_io->u.nvme_passthru.cmd = *cmd; 3020 bdev_io->u.nvme_passthru.buf = buf; 3021 bdev_io->u.nvme_passthru.nbytes = nbytes; 3022 bdev_io->u.nvme_passthru.md_buf = md_buf; 3023 bdev_io->u.nvme_passthru.md_len = md_len; 3024 3025 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3026 3027 spdk_bdev_io_submit(bdev_io); 3028 return 0; 3029 } 3030 3031 int 3032 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3033 struct spdk_bdev_io_wait_entry *entry) 3034 { 3035 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3036 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 3037 3038 if (bdev != entry->bdev) { 3039 SPDK_ERRLOG("bdevs do not match\n"); 3040 return -EINVAL; 3041 } 3042 3043 if (mgmt_ch->per_thread_cache_count > 0) { 3044 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 3045 return -EINVAL; 3046 } 3047 3048 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 3049 return 0; 3050 } 3051 3052 static void 3053 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 3054 { 3055 struct spdk_bdev *bdev = bdev_ch->bdev; 3056 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3057 struct spdk_bdev_io *bdev_io; 3058 3059 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3060 /* 3061 * Allow some more I/O to complete before retrying the nomem_io queue. 3062 * Some drivers (such as nvme) cannot immediately take a new I/O in 3063 * the context of a completion, because the resources for the I/O are 3064 * not released until control returns to the bdev poller. Also, we 3065 * may require several small I/O to complete before a larger I/O 3066 * (that requires splitting) can be submitted. 3067 */ 3068 return; 3069 } 3070 3071 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3072 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3073 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3074 bdev_io->internal.ch->io_outstanding++; 3075 shared_resource->io_outstanding++; 3076 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3077 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 3078 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3079 break; 3080 } 3081 } 3082 } 3083 3084 static inline void 3085 _spdk_bdev_io_complete(void *ctx) 3086 { 3087 struct spdk_bdev_io *bdev_io = ctx; 3088 uint64_t tsc, tsc_diff; 3089 3090 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3091 /* 3092 * Send the completion to the thread that originally submitted the I/O, 3093 * which may not be the current thread in the case of QoS. 3094 */ 3095 if (bdev_io->internal.io_submit_ch) { 3096 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3097 bdev_io->internal.io_submit_ch = NULL; 3098 } 3099 3100 /* 3101 * Defer completion to avoid potential infinite recursion if the 3102 * user's completion callback issues a new I/O. 3103 */ 3104 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3105 _spdk_bdev_io_complete, bdev_io); 3106 return; 3107 } 3108 3109 tsc = spdk_get_ticks(); 3110 tsc_diff = tsc - bdev_io->internal.submit_tsc; 3111 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 3112 3113 if (bdev_io->internal.ch->histogram) { 3114 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 3115 } 3116 3117 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3118 switch (bdev_io->type) { 3119 case SPDK_BDEV_IO_TYPE_READ: 3120 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3121 bdev_io->internal.ch->stat.num_read_ops++; 3122 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 3123 break; 3124 case SPDK_BDEV_IO_TYPE_WRITE: 3125 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3126 bdev_io->internal.ch->stat.num_write_ops++; 3127 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 3128 break; 3129 case SPDK_BDEV_IO_TYPE_UNMAP: 3130 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3131 bdev_io->internal.ch->stat.num_unmap_ops++; 3132 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 3133 default: 3134 break; 3135 } 3136 } 3137 3138 #ifdef SPDK_CONFIG_VTUNE 3139 uint64_t now_tsc = spdk_get_ticks(); 3140 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 3141 uint64_t data[5]; 3142 3143 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 3144 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 3145 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 3146 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 3147 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 3148 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 3149 3150 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3151 __itt_metadata_u64, 5, data); 3152 3153 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3154 bdev_io->internal.ch->start_tsc = now_tsc; 3155 } 3156 #endif 3157 3158 assert(bdev_io->internal.cb != NULL); 3159 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 3160 3161 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3162 bdev_io->internal.caller_ctx); 3163 } 3164 3165 static void 3166 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3167 { 3168 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3169 3170 if (bdev_io->u.reset.ch_ref != NULL) { 3171 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3172 bdev_io->u.reset.ch_ref = NULL; 3173 } 3174 3175 _spdk_bdev_io_complete(bdev_io); 3176 } 3177 3178 static void 3179 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3180 { 3181 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3182 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3183 3184 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3185 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3186 _spdk_bdev_channel_start_reset(ch); 3187 } 3188 3189 spdk_for_each_channel_continue(i, 0); 3190 } 3191 3192 void 3193 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3194 { 3195 struct spdk_bdev *bdev = bdev_io->bdev; 3196 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3197 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3198 3199 bdev_io->internal.status = status; 3200 3201 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3202 bool unlock_channels = false; 3203 3204 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3205 SPDK_ERRLOG("NOMEM returned for reset\n"); 3206 } 3207 pthread_mutex_lock(&bdev->internal.mutex); 3208 if (bdev_io == bdev->internal.reset_in_progress) { 3209 bdev->internal.reset_in_progress = NULL; 3210 unlock_channels = true; 3211 } 3212 pthread_mutex_unlock(&bdev->internal.mutex); 3213 3214 if (unlock_channels) { 3215 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3216 bdev_io, _spdk_bdev_reset_complete); 3217 return; 3218 } 3219 } else { 3220 if (spdk_unlikely(bdev_io->internal.orig_iovcnt > 0)) { 3221 _bdev_io_unset_bounce_buf(bdev_io); 3222 } 3223 3224 assert(bdev_ch->io_outstanding > 0); 3225 assert(shared_resource->io_outstanding > 0); 3226 bdev_ch->io_outstanding--; 3227 shared_resource->io_outstanding--; 3228 3229 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3230 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3231 /* 3232 * Wait for some of the outstanding I/O to complete before we 3233 * retry any of the nomem_io. Normally we will wait for 3234 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3235 * depth channels we will instead wait for half to complete. 3236 */ 3237 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3238 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3239 return; 3240 } 3241 3242 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3243 _spdk_bdev_ch_retry_io(bdev_ch); 3244 } 3245 } 3246 3247 _spdk_bdev_io_complete(bdev_io); 3248 } 3249 3250 void 3251 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3252 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3253 { 3254 if (sc == SPDK_SCSI_STATUS_GOOD) { 3255 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3256 } else { 3257 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3258 bdev_io->internal.error.scsi.sc = sc; 3259 bdev_io->internal.error.scsi.sk = sk; 3260 bdev_io->internal.error.scsi.asc = asc; 3261 bdev_io->internal.error.scsi.ascq = ascq; 3262 } 3263 3264 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3265 } 3266 3267 void 3268 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3269 int *sc, int *sk, int *asc, int *ascq) 3270 { 3271 assert(sc != NULL); 3272 assert(sk != NULL); 3273 assert(asc != NULL); 3274 assert(ascq != NULL); 3275 3276 switch (bdev_io->internal.status) { 3277 case SPDK_BDEV_IO_STATUS_SUCCESS: 3278 *sc = SPDK_SCSI_STATUS_GOOD; 3279 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3280 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3281 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3282 break; 3283 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3284 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3285 break; 3286 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3287 *sc = bdev_io->internal.error.scsi.sc; 3288 *sk = bdev_io->internal.error.scsi.sk; 3289 *asc = bdev_io->internal.error.scsi.asc; 3290 *ascq = bdev_io->internal.error.scsi.ascq; 3291 break; 3292 default: 3293 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3294 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3295 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3296 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3297 break; 3298 } 3299 } 3300 3301 void 3302 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3303 { 3304 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3305 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3306 } else { 3307 bdev_io->internal.error.nvme.sct = sct; 3308 bdev_io->internal.error.nvme.sc = sc; 3309 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3310 } 3311 3312 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3313 } 3314 3315 void 3316 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3317 { 3318 assert(sct != NULL); 3319 assert(sc != NULL); 3320 3321 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3322 *sct = bdev_io->internal.error.nvme.sct; 3323 *sc = bdev_io->internal.error.nvme.sc; 3324 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3325 *sct = SPDK_NVME_SCT_GENERIC; 3326 *sc = SPDK_NVME_SC_SUCCESS; 3327 } else { 3328 *sct = SPDK_NVME_SCT_GENERIC; 3329 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3330 } 3331 } 3332 3333 struct spdk_thread * 3334 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3335 { 3336 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3337 } 3338 3339 static void 3340 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3341 { 3342 uint64_t min_qos_set; 3343 int i; 3344 3345 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3346 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3347 break; 3348 } 3349 } 3350 3351 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3352 SPDK_ERRLOG("Invalid rate limits set.\n"); 3353 return; 3354 } 3355 3356 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3357 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3358 continue; 3359 } 3360 3361 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3362 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3363 } else { 3364 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3365 } 3366 3367 if (limits[i] == 0 || limits[i] % min_qos_set) { 3368 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3369 limits[i], bdev->name, min_qos_set); 3370 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3371 return; 3372 } 3373 } 3374 3375 if (!bdev->internal.qos) { 3376 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3377 if (!bdev->internal.qos) { 3378 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3379 return; 3380 } 3381 } 3382 3383 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3384 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3385 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3386 bdev->name, i, limits[i]); 3387 } 3388 3389 return; 3390 } 3391 3392 static void 3393 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3394 { 3395 struct spdk_conf_section *sp = NULL; 3396 const char *val = NULL; 3397 int i = 0, j = 0; 3398 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3399 bool config_qos = false; 3400 3401 sp = spdk_conf_find_section(NULL, "QoS"); 3402 if (!sp) { 3403 return; 3404 } 3405 3406 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3407 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3408 3409 i = 0; 3410 while (true) { 3411 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3412 if (!val) { 3413 break; 3414 } 3415 3416 if (strcmp(bdev->name, val) != 0) { 3417 i++; 3418 continue; 3419 } 3420 3421 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3422 if (val) { 3423 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3424 limits[j] = strtoull(val, NULL, 10); 3425 } else { 3426 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3427 } 3428 config_qos = true; 3429 } 3430 3431 break; 3432 } 3433 3434 j++; 3435 } 3436 3437 if (config_qos == true) { 3438 _spdk_bdev_qos_config_limit(bdev, limits); 3439 } 3440 3441 return; 3442 } 3443 3444 static int 3445 spdk_bdev_init(struct spdk_bdev *bdev) 3446 { 3447 char *bdev_name; 3448 3449 assert(bdev->module != NULL); 3450 3451 if (!bdev->name) { 3452 SPDK_ERRLOG("Bdev name is NULL\n"); 3453 return -EINVAL; 3454 } 3455 3456 if (spdk_bdev_get_by_name(bdev->name)) { 3457 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3458 return -EEXIST; 3459 } 3460 3461 /* Users often register their own I/O devices using the bdev name. In 3462 * order to avoid conflicts, prepend bdev_. */ 3463 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3464 if (!bdev_name) { 3465 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3466 return -ENOMEM; 3467 } 3468 3469 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3470 bdev->internal.measured_queue_depth = UINT64_MAX; 3471 bdev->internal.claim_module = NULL; 3472 bdev->internal.qd_poller = NULL; 3473 bdev->internal.qos = NULL; 3474 3475 if (spdk_bdev_get_buf_align(bdev) > 1) { 3476 if (bdev->split_on_optimal_io_boundary) { 3477 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 3478 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 3479 } else { 3480 bdev->split_on_optimal_io_boundary = true; 3481 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 3482 } 3483 } 3484 3485 TAILQ_INIT(&bdev->internal.open_descs); 3486 3487 TAILQ_INIT(&bdev->aliases); 3488 3489 bdev->internal.reset_in_progress = NULL; 3490 3491 _spdk_bdev_qos_config(bdev); 3492 3493 spdk_io_device_register(__bdev_to_io_dev(bdev), 3494 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3495 sizeof(struct spdk_bdev_channel), 3496 bdev_name); 3497 3498 free(bdev_name); 3499 3500 pthread_mutex_init(&bdev->internal.mutex, NULL); 3501 return 0; 3502 } 3503 3504 static void 3505 spdk_bdev_destroy_cb(void *io_device) 3506 { 3507 int rc; 3508 struct spdk_bdev *bdev; 3509 spdk_bdev_unregister_cb cb_fn; 3510 void *cb_arg; 3511 3512 bdev = __bdev_from_io_dev(io_device); 3513 cb_fn = bdev->internal.unregister_cb; 3514 cb_arg = bdev->internal.unregister_ctx; 3515 3516 rc = bdev->fn_table->destruct(bdev->ctxt); 3517 if (rc < 0) { 3518 SPDK_ERRLOG("destruct failed\n"); 3519 } 3520 if (rc <= 0 && cb_fn != NULL) { 3521 cb_fn(cb_arg, rc); 3522 } 3523 } 3524 3525 3526 static void 3527 spdk_bdev_fini(struct spdk_bdev *bdev) 3528 { 3529 pthread_mutex_destroy(&bdev->internal.mutex); 3530 3531 free(bdev->internal.qos); 3532 3533 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3534 } 3535 3536 static void 3537 spdk_bdev_start(struct spdk_bdev *bdev) 3538 { 3539 struct spdk_bdev_module *module; 3540 uint32_t action; 3541 3542 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3543 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3544 3545 /* Examine configuration before initializing I/O */ 3546 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3547 if (module->examine_config) { 3548 action = module->internal.action_in_progress; 3549 module->internal.action_in_progress++; 3550 module->examine_config(bdev); 3551 if (action != module->internal.action_in_progress) { 3552 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3553 module->name); 3554 } 3555 } 3556 } 3557 3558 if (bdev->internal.claim_module) { 3559 return; 3560 } 3561 3562 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3563 if (module->examine_disk) { 3564 module->internal.action_in_progress++; 3565 module->examine_disk(bdev); 3566 } 3567 } 3568 } 3569 3570 int 3571 spdk_bdev_register(struct spdk_bdev *bdev) 3572 { 3573 int rc = spdk_bdev_init(bdev); 3574 3575 if (rc == 0) { 3576 spdk_bdev_start(bdev); 3577 } 3578 3579 return rc; 3580 } 3581 3582 int 3583 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3584 { 3585 int rc; 3586 3587 rc = spdk_bdev_init(vbdev); 3588 if (rc) { 3589 return rc; 3590 } 3591 3592 spdk_bdev_start(vbdev); 3593 return 0; 3594 } 3595 3596 void 3597 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3598 { 3599 if (bdev->internal.unregister_cb != NULL) { 3600 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3601 } 3602 } 3603 3604 static void 3605 _remove_notify(void *arg) 3606 { 3607 struct spdk_bdev_desc *desc = arg; 3608 3609 desc->remove_scheduled = false; 3610 3611 if (desc->closed) { 3612 free(desc); 3613 } else { 3614 desc->remove_cb(desc->remove_ctx); 3615 } 3616 } 3617 3618 void 3619 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3620 { 3621 struct spdk_bdev_desc *desc, *tmp; 3622 bool do_destruct = true; 3623 struct spdk_thread *thread; 3624 3625 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3626 3627 thread = spdk_get_thread(); 3628 if (!thread) { 3629 /* The user called this from a non-SPDK thread. */ 3630 if (cb_fn != NULL) { 3631 cb_fn(cb_arg, -ENOTSUP); 3632 } 3633 return; 3634 } 3635 3636 pthread_mutex_lock(&bdev->internal.mutex); 3637 3638 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3639 bdev->internal.unregister_cb = cb_fn; 3640 bdev->internal.unregister_ctx = cb_arg; 3641 3642 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3643 if (desc->remove_cb) { 3644 do_destruct = false; 3645 /* 3646 * Defer invocation of the remove_cb to a separate message that will 3647 * run later on its thread. This ensures this context unwinds and 3648 * we don't recursively unregister this bdev again if the remove_cb 3649 * immediately closes its descriptor. 3650 */ 3651 if (!desc->remove_scheduled) { 3652 /* Avoid scheduling removal of the same descriptor multiple times. */ 3653 desc->remove_scheduled = true; 3654 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3655 } 3656 } 3657 } 3658 3659 if (!do_destruct) { 3660 pthread_mutex_unlock(&bdev->internal.mutex); 3661 return; 3662 } 3663 3664 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3665 pthread_mutex_unlock(&bdev->internal.mutex); 3666 3667 spdk_bdev_fini(bdev); 3668 } 3669 3670 int 3671 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3672 void *remove_ctx, struct spdk_bdev_desc **_desc) 3673 { 3674 struct spdk_bdev_desc *desc; 3675 struct spdk_thread *thread; 3676 3677 thread = spdk_get_thread(); 3678 if (!thread) { 3679 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3680 return -ENOTSUP; 3681 } 3682 3683 desc = calloc(1, sizeof(*desc)); 3684 if (desc == NULL) { 3685 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3686 return -ENOMEM; 3687 } 3688 3689 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3690 spdk_get_thread()); 3691 3692 desc->bdev = bdev; 3693 desc->thread = thread; 3694 desc->remove_cb = remove_cb; 3695 desc->remove_ctx = remove_ctx; 3696 desc->write = write; 3697 *_desc = desc; 3698 3699 pthread_mutex_lock(&bdev->internal.mutex); 3700 3701 if (write && bdev->internal.claim_module) { 3702 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3703 bdev->name, bdev->internal.claim_module->name); 3704 pthread_mutex_unlock(&bdev->internal.mutex); 3705 free(desc); 3706 *_desc = NULL; 3707 return -EPERM; 3708 } 3709 3710 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3711 3712 pthread_mutex_unlock(&bdev->internal.mutex); 3713 3714 return 0; 3715 } 3716 3717 void 3718 spdk_bdev_close(struct spdk_bdev_desc *desc) 3719 { 3720 struct spdk_bdev *bdev = desc->bdev; 3721 bool do_unregister = false; 3722 3723 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3724 spdk_get_thread()); 3725 3726 assert(desc->thread == spdk_get_thread()); 3727 3728 pthread_mutex_lock(&bdev->internal.mutex); 3729 3730 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3731 3732 desc->closed = true; 3733 3734 if (!desc->remove_scheduled) { 3735 free(desc); 3736 } 3737 3738 /* If no more descriptors, kill QoS channel */ 3739 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3740 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3741 bdev->name, spdk_get_thread()); 3742 3743 if (spdk_bdev_qos_destroy(bdev)) { 3744 /* There isn't anything we can do to recover here. Just let the 3745 * old QoS poller keep running. The QoS handling won't change 3746 * cores when the user allocates a new channel, but it won't break. */ 3747 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3748 } 3749 } 3750 3751 spdk_bdev_set_qd_sampling_period(bdev, 0); 3752 3753 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3754 do_unregister = true; 3755 } 3756 pthread_mutex_unlock(&bdev->internal.mutex); 3757 3758 if (do_unregister == true) { 3759 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3760 } 3761 } 3762 3763 int 3764 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3765 struct spdk_bdev_module *module) 3766 { 3767 if (bdev->internal.claim_module != NULL) { 3768 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3769 bdev->internal.claim_module->name); 3770 return -EPERM; 3771 } 3772 3773 if (desc && !desc->write) { 3774 desc->write = true; 3775 } 3776 3777 bdev->internal.claim_module = module; 3778 return 0; 3779 } 3780 3781 void 3782 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3783 { 3784 assert(bdev->internal.claim_module != NULL); 3785 bdev->internal.claim_module = NULL; 3786 } 3787 3788 struct spdk_bdev * 3789 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3790 { 3791 return desc->bdev; 3792 } 3793 3794 void 3795 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3796 { 3797 struct iovec *iovs; 3798 int iovcnt; 3799 3800 if (bdev_io == NULL) { 3801 return; 3802 } 3803 3804 switch (bdev_io->type) { 3805 case SPDK_BDEV_IO_TYPE_READ: 3806 iovs = bdev_io->u.bdev.iovs; 3807 iovcnt = bdev_io->u.bdev.iovcnt; 3808 break; 3809 case SPDK_BDEV_IO_TYPE_WRITE: 3810 iovs = bdev_io->u.bdev.iovs; 3811 iovcnt = bdev_io->u.bdev.iovcnt; 3812 break; 3813 default: 3814 iovs = NULL; 3815 iovcnt = 0; 3816 break; 3817 } 3818 3819 if (iovp) { 3820 *iovp = iovs; 3821 } 3822 if (iovcntp) { 3823 *iovcntp = iovcnt; 3824 } 3825 } 3826 3827 void 3828 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3829 { 3830 3831 if (spdk_bdev_module_list_find(bdev_module->name)) { 3832 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3833 assert(false); 3834 } 3835 3836 if (bdev_module->async_init) { 3837 bdev_module->internal.action_in_progress = 1; 3838 } 3839 3840 /* 3841 * Modules with examine callbacks must be initialized first, so they are 3842 * ready to handle examine callbacks from later modules that will 3843 * register physical bdevs. 3844 */ 3845 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3846 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3847 } else { 3848 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3849 } 3850 } 3851 3852 struct spdk_bdev_module * 3853 spdk_bdev_module_list_find(const char *name) 3854 { 3855 struct spdk_bdev_module *bdev_module; 3856 3857 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3858 if (strcmp(name, bdev_module->name) == 0) { 3859 break; 3860 } 3861 } 3862 3863 return bdev_module; 3864 } 3865 3866 static void 3867 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3868 { 3869 struct spdk_bdev_io *bdev_io = _bdev_io; 3870 uint64_t num_bytes, num_blocks; 3871 int rc; 3872 3873 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3874 bdev_io->u.bdev.split_remaining_num_blocks, 3875 ZERO_BUFFER_SIZE); 3876 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3877 3878 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3879 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3880 g_bdev_mgr.zero_buffer, 3881 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3882 _spdk_bdev_write_zero_buffer_done, bdev_io); 3883 if (rc == 0) { 3884 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3885 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3886 } else if (rc == -ENOMEM) { 3887 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 3888 } else { 3889 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3890 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3891 } 3892 } 3893 3894 static void 3895 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3896 { 3897 struct spdk_bdev_io *parent_io = cb_arg; 3898 3899 spdk_bdev_free_io(bdev_io); 3900 3901 if (!success) { 3902 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3903 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3904 return; 3905 } 3906 3907 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3908 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3909 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3910 return; 3911 } 3912 3913 _spdk_bdev_write_zero_buffer_next(parent_io); 3914 } 3915 3916 struct set_qos_limit_ctx { 3917 void (*cb_fn)(void *cb_arg, int status); 3918 void *cb_arg; 3919 struct spdk_bdev *bdev; 3920 }; 3921 3922 static void 3923 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3924 { 3925 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3926 ctx->bdev->internal.qos_mod_in_progress = false; 3927 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3928 3929 ctx->cb_fn(ctx->cb_arg, status); 3930 free(ctx); 3931 } 3932 3933 static void 3934 _spdk_bdev_disable_qos_done(void *cb_arg) 3935 { 3936 struct set_qos_limit_ctx *ctx = cb_arg; 3937 struct spdk_bdev *bdev = ctx->bdev; 3938 struct spdk_bdev_io *bdev_io; 3939 struct spdk_bdev_qos *qos; 3940 3941 pthread_mutex_lock(&bdev->internal.mutex); 3942 qos = bdev->internal.qos; 3943 bdev->internal.qos = NULL; 3944 pthread_mutex_unlock(&bdev->internal.mutex); 3945 3946 while (!TAILQ_EMPTY(&qos->queued)) { 3947 /* Send queued I/O back to their original thread for resubmission. */ 3948 bdev_io = TAILQ_FIRST(&qos->queued); 3949 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3950 3951 if (bdev_io->internal.io_submit_ch) { 3952 /* 3953 * Channel was changed when sending it to the QoS thread - change it back 3954 * before sending it back to the original thread. 3955 */ 3956 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3957 bdev_io->internal.io_submit_ch = NULL; 3958 } 3959 3960 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3961 _spdk_bdev_io_submit, bdev_io); 3962 } 3963 3964 if (qos->thread != NULL) { 3965 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3966 spdk_poller_unregister(&qos->poller); 3967 } 3968 3969 free(qos); 3970 3971 _spdk_bdev_set_qos_limit_done(ctx, 0); 3972 } 3973 3974 static void 3975 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3976 { 3977 void *io_device = spdk_io_channel_iter_get_io_device(i); 3978 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3979 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3980 struct spdk_thread *thread; 3981 3982 pthread_mutex_lock(&bdev->internal.mutex); 3983 thread = bdev->internal.qos->thread; 3984 pthread_mutex_unlock(&bdev->internal.mutex); 3985 3986 if (thread != NULL) { 3987 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3988 } else { 3989 _spdk_bdev_disable_qos_done(ctx); 3990 } 3991 } 3992 3993 static void 3994 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3995 { 3996 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3997 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3998 3999 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 4000 4001 spdk_for_each_channel_continue(i, 0); 4002 } 4003 4004 static void 4005 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg) 4006 { 4007 struct set_qos_limit_ctx *ctx = cb_arg; 4008 struct spdk_bdev *bdev = ctx->bdev; 4009 4010 pthread_mutex_lock(&bdev->internal.mutex); 4011 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 4012 pthread_mutex_unlock(&bdev->internal.mutex); 4013 4014 _spdk_bdev_set_qos_limit_done(ctx, 0); 4015 } 4016 4017 static void 4018 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 4019 { 4020 void *io_device = spdk_io_channel_iter_get_io_device(i); 4021 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4022 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4023 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4024 4025 pthread_mutex_lock(&bdev->internal.mutex); 4026 _spdk_bdev_enable_qos(bdev, bdev_ch); 4027 pthread_mutex_unlock(&bdev->internal.mutex); 4028 spdk_for_each_channel_continue(i, 0); 4029 } 4030 4031 static void 4032 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 4033 { 4034 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4035 4036 _spdk_bdev_set_qos_limit_done(ctx, status); 4037 } 4038 4039 static void 4040 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 4041 { 4042 int i; 4043 4044 assert(bdev->internal.qos != NULL); 4045 4046 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4047 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4048 bdev->internal.qos->rate_limits[i].limit = limits[i]; 4049 4050 if (limits[i] == 0) { 4051 bdev->internal.qos->rate_limits[i].limit = 4052 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 4053 } 4054 } 4055 } 4056 } 4057 4058 void 4059 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 4060 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 4061 { 4062 struct set_qos_limit_ctx *ctx; 4063 uint32_t limit_set_complement; 4064 uint64_t min_limit_per_sec; 4065 int i; 4066 bool disable_rate_limit = true; 4067 4068 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4069 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4070 continue; 4071 } 4072 4073 if (limits[i] > 0) { 4074 disable_rate_limit = false; 4075 } 4076 4077 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 4078 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 4079 } else { 4080 /* Change from megabyte to byte rate limit */ 4081 limits[i] = limits[i] * 1024 * 1024; 4082 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 4083 } 4084 4085 limit_set_complement = limits[i] % min_limit_per_sec; 4086 if (limit_set_complement) { 4087 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 4088 limits[i], min_limit_per_sec); 4089 limits[i] += min_limit_per_sec - limit_set_complement; 4090 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 4091 } 4092 } 4093 4094 ctx = calloc(1, sizeof(*ctx)); 4095 if (ctx == NULL) { 4096 cb_fn(cb_arg, -ENOMEM); 4097 return; 4098 } 4099 4100 ctx->cb_fn = cb_fn; 4101 ctx->cb_arg = cb_arg; 4102 ctx->bdev = bdev; 4103 4104 pthread_mutex_lock(&bdev->internal.mutex); 4105 if (bdev->internal.qos_mod_in_progress) { 4106 pthread_mutex_unlock(&bdev->internal.mutex); 4107 free(ctx); 4108 cb_fn(cb_arg, -EAGAIN); 4109 return; 4110 } 4111 bdev->internal.qos_mod_in_progress = true; 4112 4113 if (disable_rate_limit == true && bdev->internal.qos) { 4114 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4115 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 4116 (bdev->internal.qos->rate_limits[i].limit > 0 && 4117 bdev->internal.qos->rate_limits[i].limit != 4118 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 4119 disable_rate_limit = false; 4120 break; 4121 } 4122 } 4123 } 4124 4125 if (disable_rate_limit == false) { 4126 if (bdev->internal.qos == NULL) { 4127 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 4128 if (!bdev->internal.qos) { 4129 pthread_mutex_unlock(&bdev->internal.mutex); 4130 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 4131 free(ctx); 4132 cb_fn(cb_arg, -ENOMEM); 4133 return; 4134 } 4135 } 4136 4137 if (bdev->internal.qos->thread == NULL) { 4138 /* Enabling */ 4139 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4140 4141 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4142 _spdk_bdev_enable_qos_msg, ctx, 4143 _spdk_bdev_enable_qos_done); 4144 } else { 4145 /* Updating */ 4146 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4147 4148 spdk_thread_send_msg(bdev->internal.qos->thread, 4149 _spdk_bdev_update_qos_rate_limit_msg, ctx); 4150 } 4151 } else { 4152 if (bdev->internal.qos != NULL) { 4153 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4154 4155 /* Disabling */ 4156 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4157 _spdk_bdev_disable_qos_msg, ctx, 4158 _spdk_bdev_disable_qos_msg_done); 4159 } else { 4160 pthread_mutex_unlock(&bdev->internal.mutex); 4161 _spdk_bdev_set_qos_limit_done(ctx, 0); 4162 return; 4163 } 4164 } 4165 4166 pthread_mutex_unlock(&bdev->internal.mutex); 4167 } 4168 4169 struct spdk_bdev_histogram_ctx { 4170 spdk_bdev_histogram_status_cb cb_fn; 4171 void *cb_arg; 4172 struct spdk_bdev *bdev; 4173 int status; 4174 }; 4175 4176 static void 4177 _spdk_bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 4178 { 4179 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4180 4181 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4182 ctx->bdev->internal.histogram_in_progress = false; 4183 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4184 ctx->cb_fn(ctx->cb_arg, ctx->status); 4185 free(ctx); 4186 } 4187 4188 static void 4189 _spdk_bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 4190 { 4191 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4192 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4193 4194 if (ch->histogram != NULL) { 4195 spdk_histogram_data_free(ch->histogram); 4196 ch->histogram = NULL; 4197 } 4198 spdk_for_each_channel_continue(i, 0); 4199 } 4200 4201 static void 4202 _spdk_bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 4203 { 4204 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4205 4206 if (status != 0) { 4207 ctx->status = status; 4208 ctx->bdev->internal.histogram_enabled = false; 4209 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), _spdk_bdev_histogram_disable_channel, ctx, 4210 _spdk_bdev_histogram_disable_channel_cb); 4211 } else { 4212 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4213 ctx->bdev->internal.histogram_in_progress = false; 4214 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4215 ctx->cb_fn(ctx->cb_arg, ctx->status); 4216 free(ctx); 4217 } 4218 } 4219 4220 static void 4221 _spdk_bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 4222 { 4223 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4224 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4225 int status = 0; 4226 4227 if (ch->histogram == NULL) { 4228 ch->histogram = spdk_histogram_data_alloc(); 4229 if (ch->histogram == NULL) { 4230 status = -ENOMEM; 4231 } 4232 } 4233 4234 spdk_for_each_channel_continue(i, status); 4235 } 4236 4237 void 4238 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 4239 void *cb_arg, bool enable) 4240 { 4241 struct spdk_bdev_histogram_ctx *ctx; 4242 4243 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 4244 if (ctx == NULL) { 4245 cb_fn(cb_arg, -ENOMEM); 4246 return; 4247 } 4248 4249 ctx->bdev = bdev; 4250 ctx->status = 0; 4251 ctx->cb_fn = cb_fn; 4252 ctx->cb_arg = cb_arg; 4253 4254 pthread_mutex_lock(&bdev->internal.mutex); 4255 if (bdev->internal.histogram_in_progress) { 4256 pthread_mutex_unlock(&bdev->internal.mutex); 4257 free(ctx); 4258 cb_fn(cb_arg, -EAGAIN); 4259 return; 4260 } 4261 4262 bdev->internal.histogram_in_progress = true; 4263 pthread_mutex_unlock(&bdev->internal.mutex); 4264 4265 bdev->internal.histogram_enabled = enable; 4266 4267 if (enable) { 4268 /* Allocate histogram for each channel */ 4269 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_enable_channel, ctx, 4270 _spdk_bdev_histogram_enable_channel_cb); 4271 } else { 4272 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_disable_channel, ctx, 4273 _spdk_bdev_histogram_disable_channel_cb); 4274 } 4275 } 4276 4277 struct spdk_bdev_histogram_data_ctx { 4278 spdk_bdev_histogram_data_cb cb_fn; 4279 void *cb_arg; 4280 struct spdk_bdev *bdev; 4281 /** merged histogram data from all channels */ 4282 struct spdk_histogram_data *histogram; 4283 }; 4284 4285 static void 4286 _spdk_bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 4287 { 4288 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4289 4290 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 4291 free(ctx); 4292 } 4293 4294 static void 4295 _spdk_bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 4296 { 4297 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4298 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4299 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4300 int status = 0; 4301 4302 if (ch->histogram == NULL) { 4303 status = -EFAULT; 4304 } else { 4305 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 4306 } 4307 4308 spdk_for_each_channel_continue(i, status); 4309 } 4310 4311 void 4312 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 4313 spdk_bdev_histogram_data_cb cb_fn, 4314 void *cb_arg) 4315 { 4316 struct spdk_bdev_histogram_data_ctx *ctx; 4317 4318 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 4319 if (ctx == NULL) { 4320 cb_fn(cb_arg, -ENOMEM, NULL); 4321 return; 4322 } 4323 4324 ctx->bdev = bdev; 4325 ctx->cb_fn = cb_fn; 4326 ctx->cb_arg = cb_arg; 4327 4328 ctx->histogram = histogram; 4329 4330 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_get_channel, ctx, 4331 _spdk_bdev_histogram_get_channel_cb); 4332 } 4333 4334 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 4335 4336 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 4337 { 4338 spdk_trace_register_owner(OWNER_BDEV, 'b'); 4339 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 4340 spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV, 4341 OBJECT_BDEV_IO, 1, 0, "type: "); 4342 spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV, 4343 OBJECT_BDEV_IO, 0, 0, ""); 4344 } 4345