1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/util.h" 48 #include "spdk/trace.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 #include "spdk/string.h" 53 54 #ifdef SPDK_CONFIG_VTUNE 55 #include "ittnotify.h" 56 #include "ittnotify_types.h" 57 int __itt_init_ittlib(const char *, __itt_group_id); 58 #endif 59 60 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 61 #define SPDK_BDEV_IO_CACHE_SIZE 256 62 #define BUF_SMALL_POOL_SIZE 8192 63 #define BUF_LARGE_POOL_SIZE 1024 64 #define NOMEM_THRESHOLD_COUNT 8 65 #define ZERO_BUFFER_SIZE 0x100000 66 67 #define OWNER_BDEV 0x2 68 69 #define OBJECT_BDEV_IO 0x2 70 71 #define TRACE_GROUP_BDEV 0x3 72 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 73 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 74 75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 * 1024 * 1024) 80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 81 82 #define SPDK_BDEV_POOL_ALIGNMENT 512 83 84 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"}; 85 static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"}; 86 87 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 88 89 struct spdk_bdev_mgr { 90 struct spdk_mempool *bdev_io_pool; 91 92 struct spdk_mempool *buf_small_pool; 93 struct spdk_mempool *buf_large_pool; 94 95 void *zero_buffer; 96 97 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 98 99 struct spdk_bdev_list bdevs; 
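	/*
	 * init_complete is set once spdk_bdev_init_complete() has run, while
	 * module_init_complete is set as soon as every registered module's
	 * module_init() has returned. spdk_bdev_module_action_complete() checks
	 * both before declaring the bdev layer fully initialized.
	 */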
	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb g_init_cb_fn = NULL;
static void *g_init_cb_arg = NULL;

static spdk_bdev_fini_cb g_fini_cb_fn = NULL;
static void *g_fini_cb_arg = NULL;
static struct spdk_thread *g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;
};

struct spdk_bdev_qos {
	/** Rate limits, one per rate limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t per_thread_cache_count;
	uint32_t bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * queue their I/O awaiting retry here. This makes it possible to retry sending
 * I/O to one bdev after I/O from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
206 */ 207 bdev_io_tailq_t nomem_io; 208 209 /* 210 * Threshold which io_outstanding must drop to before retrying nomem_io. 211 */ 212 uint64_t nomem_threshold; 213 214 /* I/O channel allocated by a bdev module */ 215 struct spdk_io_channel *shared_ch; 216 217 /* Refcount of bdev channels using this resource */ 218 uint32_t ref; 219 220 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 221 }; 222 223 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 224 #define BDEV_CH_QOS_ENABLED (1 << 1) 225 226 struct spdk_bdev_channel { 227 struct spdk_bdev *bdev; 228 229 /* The channel for the underlying device */ 230 struct spdk_io_channel *channel; 231 232 /* Per io_device per thread data */ 233 struct spdk_bdev_shared_resource *shared_resource; 234 235 struct spdk_bdev_io_stat stat; 236 237 /* 238 * Count of I/O submitted through this channel and waiting for completion. 239 * Incremented before submit_request() is called on an spdk_bdev_io. 240 */ 241 uint64_t io_outstanding; 242 243 bdev_io_tailq_t queued_resets; 244 245 uint32_t flags; 246 247 #ifdef SPDK_CONFIG_VTUNE 248 uint64_t start_tsc; 249 uint64_t interval_tsc; 250 __itt_string_handle *handle; 251 struct spdk_bdev_io_stat prev_stat; 252 #endif 253 254 }; 255 256 struct spdk_bdev_desc { 257 struct spdk_bdev *bdev; 258 struct spdk_thread *thread; 259 spdk_bdev_remove_cb_t remove_cb; 260 void *remove_ctx; 261 bool remove_scheduled; 262 bool closed; 263 bool write; 264 TAILQ_ENTRY(spdk_bdev_desc) link; 265 }; 266 267 struct spdk_bdev_iostat_ctx { 268 struct spdk_bdev_io_stat *stat; 269 spdk_bdev_get_device_stat_cb cb; 270 void *cb_arg; 271 }; 272 273 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 274 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 275 276 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 277 void *cb_arg); 278 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 279 280 void 281 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 282 { 283 *opts = g_bdev_opts; 284 } 285 286 int 287 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 288 { 289 uint32_t min_pool_size; 290 291 /* 292 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 293 * initialization. A second mgmt_ch will be created on the same thread when the application starts 294 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
295 */ 296 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 297 if (opts->bdev_io_pool_size < min_pool_size) { 298 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 299 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 300 spdk_thread_get_count()); 301 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 302 return -1; 303 } 304 305 g_bdev_opts = *opts; 306 return 0; 307 } 308 309 struct spdk_bdev * 310 spdk_bdev_first(void) 311 { 312 struct spdk_bdev *bdev; 313 314 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 315 if (bdev) { 316 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 317 } 318 319 return bdev; 320 } 321 322 struct spdk_bdev * 323 spdk_bdev_next(struct spdk_bdev *prev) 324 { 325 struct spdk_bdev *bdev; 326 327 bdev = TAILQ_NEXT(prev, internal.link); 328 if (bdev) { 329 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 330 } 331 332 return bdev; 333 } 334 335 static struct spdk_bdev * 336 _bdev_next_leaf(struct spdk_bdev *bdev) 337 { 338 while (bdev != NULL) { 339 if (bdev->internal.claim_module == NULL) { 340 return bdev; 341 } else { 342 bdev = TAILQ_NEXT(bdev, internal.link); 343 } 344 } 345 346 return bdev; 347 } 348 349 struct spdk_bdev * 350 spdk_bdev_first_leaf(void) 351 { 352 struct spdk_bdev *bdev; 353 354 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 355 356 if (bdev) { 357 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 358 } 359 360 return bdev; 361 } 362 363 struct spdk_bdev * 364 spdk_bdev_next_leaf(struct spdk_bdev *prev) 365 { 366 struct spdk_bdev *bdev; 367 368 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 369 370 if (bdev) { 371 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 372 } 373 374 return bdev; 375 } 376 377 struct spdk_bdev * 378 spdk_bdev_get_by_name(const char *bdev_name) 379 { 380 struct spdk_bdev_alias *tmp; 381 struct spdk_bdev *bdev = spdk_bdev_first(); 382 383 while (bdev != NULL) { 384 if (strcmp(bdev_name, bdev->name) == 0) { 385 return bdev; 386 } 387 388 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 389 if (strcmp(bdev_name, tmp->alias) == 0) { 390 return bdev; 391 } 392 } 393 394 bdev = spdk_bdev_next(bdev); 395 } 396 397 return NULL; 398 } 399 400 void 401 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 402 { 403 struct iovec *iovs; 404 405 iovs = bdev_io->u.bdev.iovs; 406 407 assert(iovs != NULL); 408 assert(bdev_io->u.bdev.iovcnt >= 1); 409 410 iovs[0].iov_base = buf; 411 iovs[0].iov_len = len; 412 } 413 414 static bool 415 _is_buf_allocated(struct iovec *iovs) 416 { 417 return iovs[0].iov_base != NULL; 418 } 419 420 static bool 421 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 422 { 423 int i; 424 uintptr_t iov_base; 425 426 if (spdk_likely(alignment == 1)) { 427 return true; 428 } 429 430 for (i = 0; i < iovcnt; i++) { 431 iov_base = (uintptr_t)iovs[i].iov_base; 432 if ((iov_base & (alignment - 1)) != 0) { 433 return false; 434 } 435 } 436 437 return true; 438 } 439 440 static void 441 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 442 { 443 int i; 444 size_t len; 445 446 for (i = 0; i < iovcnt; i++) { 447 len = spdk_min(iovs[i].iov_len, buf_len); 448 memcpy(buf, iovs[i].iov_base, len); 449 buf += len; 450 buf_len -= len; 451 } 452 } 453 454 static void 455 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, 
		  void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is the write path, copy data from the original buffer to the bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf, *aligned_buf;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len;
	uint64_t alignment;
	bool buf_allocated;

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);

		alignment = spdk_bdev_get_buf_align(tmp->bdev);
		buf_allocated = _is_buf_allocated(tmp->u.bdev.iovs);

		aligned_buf = (void *)(((uintptr_t)buf +
					(alignment - 1)) & ~(alignment - 1));
		if (buf_allocated) {
			_bdev_io_set_bounce_buf(tmp, aligned_buf, tmp->internal.buf_len);
		} else {
			spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
		}

		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		tmp->internal.buf = buf;
		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
	}
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	/* if this is the read path, copy data from the bounce buffer to the original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base, bdev_io->internal.bounce_iov.iov_len);
	}
	/* set the original buffer for this I/O */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable the bounce buffer for this I/O */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;
	/* return the bounce buffer to the pool */
	spdk_bdev_io_put_buf(bdev_io);
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	void *buf, *aligned_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment;
	bool buf_allocated;

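	/*
	 * Illustrative usage sketch (not part of the original code): callers that
	 * may not have a data buffer yet, such as the READ submission path, invoke
	 * this as
	 *
	 *     spdk_bdev_io_get_buf(bdev_io, my_get_buf_cb,
	 *                          bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
	 *
	 * where my_get_buf_cb is a hypothetical spdk_bdev_io_get_buf_cb. If the I/O
	 * already carries a sufficiently aligned buffer, the callback runs
	 * immediately; otherwise a buffer is taken from the small/large pool, or the
	 * I/O is queued on need_buf_small/need_buf_large until one is returned.
	 */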
assert(cb != NULL); 565 assert(bdev_io->u.bdev.iovs != NULL); 566 567 alignment = spdk_bdev_get_buf_align(bdev_io->bdev); 568 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 569 570 if (buf_allocated && 571 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 572 /* Buffer already present and aligned */ 573 cb(bdev_io->internal.ch->channel, bdev_io); 574 return; 575 } 576 577 assert(len + alignment <= SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT); 578 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 579 580 bdev_io->internal.buf_len = len; 581 bdev_io->internal.get_buf_cb = cb; 582 583 if (len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) { 584 pool = g_bdev_mgr.buf_small_pool; 585 stailq = &mgmt_ch->need_buf_small; 586 } else { 587 pool = g_bdev_mgr.buf_large_pool; 588 stailq = &mgmt_ch->need_buf_large; 589 } 590 591 buf = spdk_mempool_get(pool); 592 593 if (!buf) { 594 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 595 } else { 596 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 597 598 if (buf_allocated) { 599 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 600 } else { 601 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 602 } 603 bdev_io->internal.buf = buf; 604 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 605 } 606 } 607 608 static int 609 spdk_bdev_module_get_max_ctx_size(void) 610 { 611 struct spdk_bdev_module *bdev_module; 612 int max_bdev_module_size = 0; 613 614 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 615 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 616 max_bdev_module_size = bdev_module->get_ctx_size(); 617 } 618 } 619 620 return max_bdev_module_size; 621 } 622 623 void 624 spdk_bdev_config_text(FILE *fp) 625 { 626 struct spdk_bdev_module *bdev_module; 627 628 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 629 if (bdev_module->config_text) { 630 bdev_module->config_text(fp); 631 } 632 } 633 } 634 635 static void 636 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 637 { 638 int i; 639 struct spdk_bdev_qos *qos = bdev->internal.qos; 640 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 641 642 if (!qos) { 643 return; 644 } 645 646 spdk_bdev_get_qos_rate_limits(bdev, limits); 647 648 spdk_json_write_object_begin(w); 649 spdk_json_write_named_string(w, "method", "set_bdev_qos_limit"); 650 spdk_json_write_name(w, "params"); 651 652 spdk_json_write_object_begin(w); 653 spdk_json_write_named_string(w, "name", bdev->name); 654 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 655 if (limits[i] > 0) { 656 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 657 } 658 } 659 spdk_json_write_object_end(w); 660 661 spdk_json_write_object_end(w); 662 } 663 664 void 665 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 666 { 667 struct spdk_bdev_module *bdev_module; 668 struct spdk_bdev *bdev; 669 670 assert(w != NULL); 671 672 spdk_json_write_array_begin(w); 673 674 spdk_json_write_object_begin(w); 675 spdk_json_write_named_string(w, "method", "set_bdev_options"); 676 spdk_json_write_name(w, "params"); 677 spdk_json_write_object_begin(w); 678 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 679 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 680 spdk_json_write_object_end(w); 681 spdk_json_write_object_end(w); 
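	/*
	 * Illustrative example of the object emitted above, assuming the default
	 * option values defined in this file:
	 *
	 *   { "method": "set_bdev_options",
	 *     "params": { "bdev_io_pool_size": 65536, "bdev_io_cache_size": 256 } }
	 */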

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		spdk_bdev_qos_config_json(bdev, w);

		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}
	}

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		rc = module->module_init();
		if (rc != 0) {
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}

static void
spdk_bdev_init_failed_complete(void *cb_arg)
{
	spdk_bdev_init_complete(-1);
}

static void
spdk_bdev_init_failed(void *cb_arg)
{
	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
921 */ 922 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 923 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 924 925 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 926 BUF_SMALL_POOL_SIZE, 927 SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 928 cache_size, 929 SPDK_ENV_SOCKET_ID_ANY); 930 if (!g_bdev_mgr.buf_small_pool) { 931 SPDK_ERRLOG("create rbuf small pool failed\n"); 932 spdk_bdev_init_complete(-1); 933 return; 934 } 935 936 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 937 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 938 939 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 940 BUF_LARGE_POOL_SIZE, 941 SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 942 cache_size, 943 SPDK_ENV_SOCKET_ID_ANY); 944 if (!g_bdev_mgr.buf_large_pool) { 945 SPDK_ERRLOG("create rbuf large pool failed\n"); 946 spdk_bdev_init_complete(-1); 947 return; 948 } 949 950 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 951 NULL); 952 if (!g_bdev_mgr.zero_buffer) { 953 SPDK_ERRLOG("create bdev zero buffer failed\n"); 954 spdk_bdev_init_complete(-1); 955 return; 956 } 957 958 #ifdef SPDK_CONFIG_VTUNE 959 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 960 #endif 961 962 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 963 spdk_bdev_mgmt_channel_destroy, 964 sizeof(struct spdk_bdev_mgmt_channel), 965 "bdev_mgr"); 966 967 rc = spdk_bdev_modules_init(); 968 g_bdev_mgr.module_init_complete = true; 969 if (rc != 0) { 970 SPDK_ERRLOG("bdev modules init failed\n"); 971 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 972 return; 973 } 974 975 spdk_bdev_module_action_complete(); 976 } 977 978 static void 979 spdk_bdev_mgr_unregister_cb(void *io_device) 980 { 981 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 982 983 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 984 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 985 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 986 g_bdev_opts.bdev_io_pool_size); 987 } 988 989 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 990 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 991 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 992 BUF_SMALL_POOL_SIZE); 993 assert(false); 994 } 995 996 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 997 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 998 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 999 BUF_LARGE_POOL_SIZE); 1000 assert(false); 1001 } 1002 1003 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1004 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1005 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1006 spdk_dma_free(g_bdev_mgr.zero_buffer); 1007 1008 cb_fn(g_fini_cb_arg); 1009 g_fini_cb_fn = NULL; 1010 g_fini_cb_arg = NULL; 1011 g_bdev_mgr.init_complete = false; 1012 g_bdev_mgr.module_init_complete = false; 1013 } 1014 1015 static void 1016 spdk_bdev_module_finish_iter(void *arg) 1017 { 1018 struct spdk_bdev_module *bdev_module; 1019 1020 /* Start iterating from the last touched module */ 1021 if (!g_resume_bdev_module) { 1022 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 1023 } else { 1024 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 1025 internal.tailq); 1026 } 1027 1028 while (bdev_module) { 1029 if (bdev_module->async_fini) { 1030 /* Save our place so we can 
			 * resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and moving
		 * on to the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim its underlying bdev properly, we may face the
	 * case of a bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister the last bdev on the list then.
1118 */ 1119 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1120 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1121 SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1122 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 1123 return; 1124 } 1125 } 1126 1127 void 1128 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1129 { 1130 struct spdk_bdev_module *m; 1131 1132 assert(cb_fn != NULL); 1133 1134 g_fini_thread = spdk_get_thread(); 1135 1136 g_fini_cb_fn = cb_fn; 1137 g_fini_cb_arg = cb_arg; 1138 1139 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1140 if (m->fini_start) { 1141 m->fini_start(); 1142 } 1143 } 1144 1145 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0); 1146 } 1147 1148 static struct spdk_bdev_io * 1149 spdk_bdev_get_io(struct spdk_bdev_channel *channel) 1150 { 1151 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1152 struct spdk_bdev_io *bdev_io; 1153 1154 if (ch->per_thread_cache_count > 0) { 1155 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1156 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1157 ch->per_thread_cache_count--; 1158 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1159 /* 1160 * Don't try to look for bdev_ios in the global pool if there are 1161 * waiters on bdev_ios - we don't want this caller to jump the line. 1162 */ 1163 bdev_io = NULL; 1164 } else { 1165 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1166 } 1167 1168 return bdev_io; 1169 } 1170 1171 void 1172 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1173 { 1174 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1175 1176 assert(bdev_io != NULL); 1177 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1178 1179 if (bdev_io->internal.buf != NULL) { 1180 spdk_bdev_io_put_buf(bdev_io); 1181 } 1182 1183 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1184 ch->per_thread_cache_count++; 1185 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1186 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1187 struct spdk_bdev_io_wait_entry *entry; 1188 1189 entry = TAILQ_FIRST(&ch->io_wait_queue); 1190 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1191 entry->cb_fn(entry->cb_arg); 1192 } 1193 } else { 1194 /* We should never have a full cache with entries on the io wait queue. 
*/ 1195 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1196 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1197 } 1198 } 1199 1200 static bool 1201 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1202 { 1203 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1204 1205 switch (limit) { 1206 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1207 return true; 1208 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1209 return false; 1210 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1211 default: 1212 return false; 1213 } 1214 } 1215 1216 static bool 1217 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1218 { 1219 switch (bdev_io->type) { 1220 case SPDK_BDEV_IO_TYPE_NVME_IO: 1221 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1222 case SPDK_BDEV_IO_TYPE_READ: 1223 case SPDK_BDEV_IO_TYPE_WRITE: 1224 case SPDK_BDEV_IO_TYPE_UNMAP: 1225 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1226 return true; 1227 default: 1228 return false; 1229 } 1230 } 1231 1232 static uint64_t 1233 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1234 { 1235 struct spdk_bdev *bdev = bdev_io->bdev; 1236 1237 switch (bdev_io->type) { 1238 case SPDK_BDEV_IO_TYPE_NVME_IO: 1239 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1240 return bdev_io->u.nvme_passthru.nbytes; 1241 case SPDK_BDEV_IO_TYPE_READ: 1242 case SPDK_BDEV_IO_TYPE_WRITE: 1243 case SPDK_BDEV_IO_TYPE_UNMAP: 1244 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1245 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1246 default: 1247 return 0; 1248 } 1249 } 1250 1251 static void 1252 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte) 1253 { 1254 int i; 1255 1256 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1257 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1258 continue; 1259 } 1260 1261 switch (i) { 1262 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1263 qos->rate_limits[i].remaining_this_timeslice--; 1264 break; 1265 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1266 qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte; 1267 break; 1268 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1269 default: 1270 break; 1271 } 1272 } 1273 } 1274 1275 static int 1276 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1277 { 1278 struct spdk_bdev_io *bdev_io = NULL; 1279 struct spdk_bdev *bdev = ch->bdev; 1280 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1281 int i, submitted_ios = 0; 1282 bool to_limit_io; 1283 uint64_t io_size_in_byte; 1284 1285 while (!TAILQ_EMPTY(&qos->queued)) { 1286 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1287 if (qos->rate_limits[i].max_per_timeslice > 0 && 1288 (qos->rate_limits[i].remaining_this_timeslice <= 0)) { 1289 return submitted_ios; 1290 } 1291 } 1292 1293 bdev_io = TAILQ_FIRST(&qos->queued); 1294 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1295 ch->io_outstanding++; 1296 shared_resource->io_outstanding++; 1297 to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io); 1298 if (to_limit_io == true) { 1299 io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io); 1300 _spdk_bdev_qos_update_per_io(qos, io_size_in_byte); 1301 } 1302 bdev->fn_table->submit_request(ch->channel, bdev_io); 1303 submitted_ios++; 1304 } 1305 1306 return submitted_ios; 1307 } 1308 1309 static void 1310 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1311 { 1312 int rc; 1313 1314 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1315 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1316 
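	/* The bdev_io itself doubles as the wait-queue context, so the retry callback
	 * (e.g. _spdk_bdev_io_split_with_payload) can recover the original request
	 * once a bdev_io becomes available again. */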
bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1317 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1318 &bdev_io->internal.waitq_entry); 1319 if (rc != 0) { 1320 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1321 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1322 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1323 } 1324 } 1325 1326 static bool 1327 _spdk_bdev_io_type_can_split(uint8_t type) 1328 { 1329 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1330 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1331 1332 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1333 * UNMAP could be split, but these types of I/O are typically much larger 1334 * in size (sometimes the size of the entire block device), and the bdev 1335 * module can more efficiently split these types of I/O. Plus those types 1336 * of I/O do not have a payload, which makes the splitting process simpler. 1337 */ 1338 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1339 return true; 1340 } else { 1341 return false; 1342 } 1343 } 1344 1345 static bool 1346 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1347 { 1348 uint64_t start_stripe, end_stripe; 1349 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1350 1351 if (io_boundary == 0) { 1352 return false; 1353 } 1354 1355 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1356 return false; 1357 } 1358 1359 start_stripe = bdev_io->u.bdev.offset_blocks; 1360 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1361 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1362 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1363 start_stripe >>= spdk_u32log2(io_boundary); 1364 end_stripe >>= spdk_u32log2(io_boundary); 1365 } else { 1366 start_stripe /= io_boundary; 1367 end_stripe /= io_boundary; 1368 } 1369 return (start_stripe != end_stripe); 1370 } 1371 1372 static uint32_t 1373 _to_next_boundary(uint64_t offset, uint32_t boundary) 1374 { 1375 return (boundary - (offset % boundary)); 1376 } 1377 1378 static void 1379 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1380 1381 static void 1382 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1383 { 1384 struct spdk_bdev_io *bdev_io = _bdev_io; 1385 uint64_t current_offset, remaining; 1386 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1387 struct iovec *parent_iov, *iov; 1388 uint64_t parent_iov_offset, iov_len; 1389 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1390 int rc; 1391 1392 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1393 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1394 blocklen = bdev_io->bdev->blocklen; 1395 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1396 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1397 1398 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1399 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1400 if (parent_iov_offset < parent_iov->iov_len) { 1401 break; 1402 } 1403 parent_iov_offset -= parent_iov->iov_len; 1404 } 1405 1406 child_iovcnt = 0; 1407 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1408 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1409 to_next_boundary = spdk_min(remaining, to_next_boundary); 1410 to_next_boundary_bytes = to_next_boundary * blocklen; 1411 iov = 
&bdev_io->child_iov[child_iovcnt]; 1412 iovcnt = 0; 1413 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1414 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1415 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1416 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1417 to_next_boundary_bytes -= iov_len; 1418 1419 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1420 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1421 1422 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1423 parent_iov_offset += iov_len; 1424 } else { 1425 parent_iovpos++; 1426 parent_iov_offset = 0; 1427 } 1428 child_iovcnt++; 1429 iovcnt++; 1430 } 1431 1432 if (to_next_boundary_bytes > 0) { 1433 /* We had to stop this child I/O early because we ran out of 1434 * child_iov space. Make sure the iovs collected are valid and 1435 * then adjust to_next_boundary before starting the child I/O. 1436 */ 1437 if ((to_next_boundary_bytes % blocklen) != 0) { 1438 SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n", 1439 to_next_boundary_bytes, blocklen); 1440 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1441 if (bdev_io->u.bdev.split_outstanding == 0) { 1442 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1443 } 1444 return; 1445 } 1446 to_next_boundary -= to_next_boundary_bytes / blocklen; 1447 } 1448 1449 bdev_io->u.bdev.split_outstanding++; 1450 1451 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1452 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1453 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1454 iov, iovcnt, current_offset, to_next_boundary, 1455 _spdk_bdev_io_split_done, bdev_io); 1456 } else { 1457 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1458 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1459 iov, iovcnt, current_offset, to_next_boundary, 1460 _spdk_bdev_io_split_done, bdev_io); 1461 } 1462 1463 if (rc == 0) { 1464 current_offset += to_next_boundary; 1465 remaining -= to_next_boundary; 1466 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1467 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1468 } else { 1469 bdev_io->u.bdev.split_outstanding--; 1470 if (rc == -ENOMEM) { 1471 if (bdev_io->u.bdev.split_outstanding == 0) { 1472 /* No I/O is outstanding. Hence we should wait here. */ 1473 _spdk_bdev_queue_io_wait_with_cb(bdev_io, 1474 _spdk_bdev_io_split_with_payload); 1475 } 1476 } else { 1477 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1478 if (bdev_io->u.bdev.split_outstanding == 0) { 1479 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1480 } 1481 } 1482 1483 return; 1484 } 1485 } 1486 } 1487 1488 static void 1489 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1490 { 1491 struct spdk_bdev_io *parent_io = cb_arg; 1492 1493 spdk_bdev_free_io(bdev_io); 1494 1495 if (!success) { 1496 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1497 } 1498 parent_io->u.bdev.split_outstanding--; 1499 if (parent_io->u.bdev.split_outstanding != 0) { 1500 return; 1501 } 1502 1503 /* 1504 * Parent I/O finishes when all blocks are consumed or there is any failure of 1505 * child I/O and no outstanding child I/O. 
1506 */ 1507 if (parent_io->u.bdev.split_remaining_num_blocks == 0 || 1508 parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) { 1509 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1510 parent_io->internal.caller_ctx); 1511 return; 1512 } 1513 1514 /* 1515 * Continue with the splitting process. This function will complete the parent I/O if the 1516 * splitting is done. 1517 */ 1518 _spdk_bdev_io_split_with_payload(parent_io); 1519 } 1520 1521 static void 1522 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1523 { 1524 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1525 1526 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1527 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1528 bdev_io->u.bdev.split_outstanding = 0; 1529 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1530 1531 _spdk_bdev_io_split_with_payload(bdev_io); 1532 } 1533 1534 static void 1535 _spdk_bdev_io_submit(void *ctx) 1536 { 1537 struct spdk_bdev_io *bdev_io = ctx; 1538 struct spdk_bdev *bdev = bdev_io->bdev; 1539 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1540 struct spdk_io_channel *ch = bdev_ch->channel; 1541 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1542 uint64_t tsc; 1543 1544 tsc = spdk_get_ticks(); 1545 bdev_io->internal.submit_tsc = tsc; 1546 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1547 bdev_ch->io_outstanding++; 1548 shared_resource->io_outstanding++; 1549 bdev_io->internal.in_submit_request = true; 1550 if (spdk_likely(bdev_ch->flags == 0)) { 1551 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1552 bdev->fn_table->submit_request(ch, bdev_io); 1553 } else { 1554 bdev_ch->io_outstanding--; 1555 shared_resource->io_outstanding--; 1556 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1557 } 1558 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1559 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1560 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1561 bdev_ch->io_outstanding--; 1562 shared_resource->io_outstanding--; 1563 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1564 _spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 1565 } else { 1566 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1567 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1568 } 1569 bdev_io->internal.in_submit_request = false; 1570 } 1571 1572 static void 1573 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1574 { 1575 struct spdk_bdev *bdev = bdev_io->bdev; 1576 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1577 1578 assert(thread != NULL); 1579 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1580 1581 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1582 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1583 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split, 1584 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1585 } else { 1586 _spdk_bdev_io_split(NULL, bdev_io); 1587 } 1588 return; 1589 } 1590 1591 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1592 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1593 _spdk_bdev_io_submit(bdev_io); 1594 } else { 1595 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1596 bdev_io->internal.ch = 
bdev->internal.qos->ch; 1597 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1598 } 1599 } else { 1600 _spdk_bdev_io_submit(bdev_io); 1601 } 1602 } 1603 1604 static void 1605 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1606 { 1607 struct spdk_bdev *bdev = bdev_io->bdev; 1608 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1609 struct spdk_io_channel *ch = bdev_ch->channel; 1610 1611 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1612 1613 bdev_io->internal.in_submit_request = true; 1614 bdev->fn_table->submit_request(ch, bdev_io); 1615 bdev_io->internal.in_submit_request = false; 1616 } 1617 1618 static void 1619 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1620 struct spdk_bdev *bdev, void *cb_arg, 1621 spdk_bdev_io_completion_cb cb) 1622 { 1623 bdev_io->bdev = bdev; 1624 bdev_io->internal.caller_ctx = cb_arg; 1625 bdev_io->internal.cb = cb; 1626 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1627 bdev_io->internal.in_submit_request = false; 1628 bdev_io->internal.buf = NULL; 1629 bdev_io->internal.io_submit_ch = NULL; 1630 bdev_io->internal.orig_iovs = NULL; 1631 bdev_io->internal.orig_iovcnt = 0; 1632 } 1633 1634 static bool 1635 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1636 { 1637 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1638 } 1639 1640 bool 1641 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1642 { 1643 bool supported; 1644 1645 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1646 1647 if (!supported) { 1648 switch (io_type) { 1649 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1650 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1651 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1652 break; 1653 default: 1654 break; 1655 } 1656 } 1657 1658 return supported; 1659 } 1660 1661 int 1662 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1663 { 1664 if (bdev->fn_table->dump_info_json) { 1665 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1666 } 1667 1668 return 0; 1669 } 1670 1671 static void 1672 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1673 { 1674 uint32_t max_per_timeslice = 0; 1675 int i; 1676 1677 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1678 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1679 qos->rate_limits[i].max_per_timeslice = 0; 1680 continue; 1681 } 1682 1683 max_per_timeslice = qos->rate_limits[i].limit * 1684 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1685 1686 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1687 qos->rate_limits[i].min_per_timeslice); 1688 1689 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1690 } 1691 } 1692 1693 static int 1694 spdk_bdev_channel_poll_qos(void *arg) 1695 { 1696 struct spdk_bdev_qos *qos = arg; 1697 uint64_t now = spdk_get_ticks(); 1698 int i; 1699 1700 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1701 /* We received our callback earlier than expected - return 1702 * immediately and wait to do accounting until at least one 1703 * timeslice has actually expired. This should never happen 1704 * with a well-behaved timer implementation. 
1705 */ 1706 return 0; 1707 } 1708 1709 /* Reset for next round of rate limiting */ 1710 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1711 /* We may have allowed the IOs or bytes to slightly overrun in the last 1712 * timeslice. remaining_this_timeslice is signed, so if it's negative 1713 * here, we'll account for the overrun so that the next timeslice will 1714 * be appropriately reduced. 1715 */ 1716 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1717 qos->rate_limits[i].remaining_this_timeslice = 0; 1718 } 1719 } 1720 1721 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1722 qos->last_timeslice += qos->timeslice_size; 1723 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1724 qos->rate_limits[i].remaining_this_timeslice += 1725 qos->rate_limits[i].max_per_timeslice; 1726 } 1727 } 1728 1729 return _spdk_bdev_qos_io_submit(qos->ch, qos); 1730 } 1731 1732 static void 1733 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1734 { 1735 struct spdk_bdev_shared_resource *shared_resource; 1736 1737 spdk_put_io_channel(ch->channel); 1738 1739 shared_resource = ch->shared_resource; 1740 1741 assert(ch->io_outstanding == 0); 1742 assert(shared_resource->ref > 0); 1743 shared_resource->ref--; 1744 if (shared_resource->ref == 0) { 1745 assert(shared_resource->io_outstanding == 0); 1746 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1747 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1748 free(shared_resource); 1749 } 1750 } 1751 1752 /* Caller must hold bdev->internal.mutex. */ 1753 static void 1754 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1755 { 1756 struct spdk_bdev_qos *qos = bdev->internal.qos; 1757 int i; 1758 1759 /* Rate limiting on this bdev enabled */ 1760 if (qos) { 1761 if (qos->ch == NULL) { 1762 struct spdk_io_channel *io_ch; 1763 1764 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1765 bdev->name, spdk_get_thread()); 1766 1767 /* No qos channel has been selected, so set one up */ 1768 1769 /* Take another reference to ch */ 1770 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1771 assert(io_ch != NULL); 1772 qos->ch = ch; 1773 1774 qos->thread = spdk_io_channel_get_thread(io_ch); 1775 1776 TAILQ_INIT(&qos->queued); 1777 1778 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1779 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1780 qos->rate_limits[i].min_per_timeslice = 1781 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1782 } else { 1783 qos->rate_limits[i].min_per_timeslice = 1784 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1785 } 1786 1787 if (qos->rate_limits[i].limit == 0) { 1788 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1789 } 1790 } 1791 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1792 qos->timeslice_size = 1793 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1794 qos->last_timeslice = spdk_get_ticks(); 1795 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1796 qos, 1797 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1798 } 1799 1800 ch->flags |= BDEV_CH_QOS_ENABLED; 1801 } 1802 } 1803 1804 static int 1805 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1806 { 1807 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1808 struct spdk_bdev_channel *ch = ctx_buf; 1809 struct spdk_io_channel *mgmt_io_ch; 1810 struct spdk_bdev_mgmt_channel *mgmt_ch; 1811 struct spdk_bdev_shared_resource *shared_resource; 1812 
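	/*
	 * Channel creation below first takes the bdev module's own I/O channel and
	 * then finds or creates the spdk_bdev_shared_resource keyed on that module
	 * channel, so all bdev channels stacked on the same io_device share one
	 * nomem_io queue and one outstanding-I/O count.
	 */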
1813 ch->bdev = bdev; 1814 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1815 if (!ch->channel) { 1816 return -1; 1817 } 1818 1819 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1820 if (!mgmt_io_ch) { 1821 return -1; 1822 } 1823 1824 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1825 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1826 if (shared_resource->shared_ch == ch->channel) { 1827 spdk_put_io_channel(mgmt_io_ch); 1828 shared_resource->ref++; 1829 break; 1830 } 1831 } 1832 1833 if (shared_resource == NULL) { 1834 shared_resource = calloc(1, sizeof(*shared_resource)); 1835 if (shared_resource == NULL) { 1836 spdk_put_io_channel(mgmt_io_ch); 1837 return -1; 1838 } 1839 1840 shared_resource->mgmt_ch = mgmt_ch; 1841 shared_resource->io_outstanding = 0; 1842 TAILQ_INIT(&shared_resource->nomem_io); 1843 shared_resource->nomem_threshold = 0; 1844 shared_resource->shared_ch = ch->channel; 1845 shared_resource->ref = 1; 1846 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1847 } 1848 1849 memset(&ch->stat, 0, sizeof(ch->stat)); 1850 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1851 ch->io_outstanding = 0; 1852 TAILQ_INIT(&ch->queued_resets); 1853 ch->flags = 0; 1854 ch->shared_resource = shared_resource; 1855 1856 #ifdef SPDK_CONFIG_VTUNE 1857 { 1858 char *name; 1859 __itt_init_ittlib(NULL, 0); 1860 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1861 if (!name) { 1862 _spdk_bdev_channel_destroy_resource(ch); 1863 return -1; 1864 } 1865 ch->handle = __itt_string_handle_create(name); 1866 free(name); 1867 ch->start_tsc = spdk_get_ticks(); 1868 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1869 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1870 } 1871 #endif 1872 1873 pthread_mutex_lock(&bdev->internal.mutex); 1874 _spdk_bdev_enable_qos(bdev, ch); 1875 pthread_mutex_unlock(&bdev->internal.mutex); 1876 1877 return 0; 1878 } 1879 1880 /* 1881 * Abort I/O that are waiting on a data buffer. These types of I/O are 1882 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1883 */ 1884 static void 1885 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1886 { 1887 bdev_io_stailq_t tmp; 1888 struct spdk_bdev_io *bdev_io; 1889 1890 STAILQ_INIT(&tmp); 1891 1892 while (!STAILQ_EMPTY(queue)) { 1893 bdev_io = STAILQ_FIRST(queue); 1894 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1895 if (bdev_io->internal.ch == ch) { 1896 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1897 } else { 1898 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1899 } 1900 } 1901 1902 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1903 } 1904 1905 /* 1906 * Abort I/O that are queued waiting for submission. These types of I/O are 1907 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1908 */ 1909 static void 1910 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1911 { 1912 struct spdk_bdev_io *bdev_io, *tmp; 1913 1914 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1915 if (bdev_io->internal.ch == ch) { 1916 TAILQ_REMOVE(queue, bdev_io, internal.link); 1917 /* 1918 * spdk_bdev_io_complete() assumes that the completed I/O had 1919 * been submitted to the bdev module. Since in this case it 1920 * hadn't, bump io_outstanding to account for the decrement 1921 * that spdk_bdev_io_complete() will do. 
1922 */ 1923 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1924 ch->io_outstanding++; 1925 ch->shared_resource->io_outstanding++; 1926 } 1927 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1928 } 1929 } 1930 } 1931 1932 static void 1933 spdk_bdev_qos_channel_destroy(void *cb_arg) 1934 { 1935 struct spdk_bdev_qos *qos = cb_arg; 1936 1937 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1938 spdk_poller_unregister(&qos->poller); 1939 1940 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1941 1942 free(qos); 1943 } 1944 1945 static int 1946 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1947 { 1948 int i; 1949 1950 /* 1951 * Cleanly shutting down the QoS poller is tricky, because 1952 * during the asynchronous operation the user could open 1953 * a new descriptor and create a new channel, spawning 1954 * a new QoS poller. 1955 * 1956 * The strategy is to create a new QoS structure here and swap it 1957 * in. The shutdown path then continues to refer to the old one 1958 * until it completes and then releases it. 1959 */ 1960 struct spdk_bdev_qos *new_qos, *old_qos; 1961 1962 old_qos = bdev->internal.qos; 1963 1964 new_qos = calloc(1, sizeof(*new_qos)); 1965 if (!new_qos) { 1966 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1967 return -ENOMEM; 1968 } 1969 1970 /* Copy the old QoS data into the newly allocated structure */ 1971 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1972 1973 /* Zero out the key parts of the QoS structure */ 1974 new_qos->ch = NULL; 1975 new_qos->thread = NULL; 1976 new_qos->poller = NULL; 1977 TAILQ_INIT(&new_qos->queued); 1978 /* 1979 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 1980 * It will be used later for the new QoS structure. 1981 */ 1982 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1983 new_qos->rate_limits[i].remaining_this_timeslice = 0; 1984 new_qos->rate_limits[i].min_per_timeslice = 0; 1985 new_qos->rate_limits[i].max_per_timeslice = 0; 1986 } 1987 1988 bdev->internal.qos = new_qos; 1989 1990 if (old_qos->thread == NULL) { 1991 free(old_qos); 1992 } else { 1993 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1994 old_qos); 1995 } 1996 1997 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 1998 * been destroyed yet. The destruction path will end up waiting for the final 1999 * channel to be put before it releases resources. */ 2000 2001 return 0; 2002 } 2003 2004 static void 2005 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2006 { 2007 total->bytes_read += add->bytes_read; 2008 total->num_read_ops += add->num_read_ops; 2009 total->bytes_written += add->bytes_written; 2010 total->num_write_ops += add->num_write_ops; 2011 total->read_latency_ticks += add->read_latency_ticks; 2012 total->write_latency_ticks += add->write_latency_ticks; 2013 } 2014 2015 static void 2016 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 2017 { 2018 struct spdk_bdev_channel *ch = ctx_buf; 2019 struct spdk_bdev_mgmt_channel *mgmt_ch; 2020 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2021 2022 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2023 spdk_get_thread()); 2024 2025 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
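	 * spdk_bdev_get_device_stat() later uses internal.stat as the baseline to
	 * which the per-channel statistics of the remaining channels are added.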
*/ 2026 pthread_mutex_lock(&ch->bdev->internal.mutex); 2027 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2028 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2029 2030 mgmt_ch = shared_resource->mgmt_ch; 2031 2032 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2033 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2034 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2035 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2036 2037 _spdk_bdev_channel_destroy_resource(ch); 2038 } 2039 2040 int 2041 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2042 { 2043 struct spdk_bdev_alias *tmp; 2044 2045 if (alias == NULL) { 2046 SPDK_ERRLOG("Empty alias passed\n"); 2047 return -EINVAL; 2048 } 2049 2050 if (spdk_bdev_get_by_name(alias)) { 2051 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2052 return -EEXIST; 2053 } 2054 2055 tmp = calloc(1, sizeof(*tmp)); 2056 if (tmp == NULL) { 2057 SPDK_ERRLOG("Unable to allocate alias\n"); 2058 return -ENOMEM; 2059 } 2060 2061 tmp->alias = strdup(alias); 2062 if (tmp->alias == NULL) { 2063 free(tmp); 2064 SPDK_ERRLOG("Unable to allocate alias\n"); 2065 return -ENOMEM; 2066 } 2067 2068 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2069 2070 return 0; 2071 } 2072 2073 int 2074 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2075 { 2076 struct spdk_bdev_alias *tmp; 2077 2078 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2079 if (strcmp(alias, tmp->alias) == 0) { 2080 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2081 free(tmp->alias); 2082 free(tmp); 2083 return 0; 2084 } 2085 } 2086 2087 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2088 2089 return -ENOENT; 2090 } 2091 2092 void 2093 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2094 { 2095 struct spdk_bdev_alias *p, *tmp; 2096 2097 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2098 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2099 free(p->alias); 2100 free(p); 2101 } 2102 } 2103 2104 struct spdk_io_channel * 2105 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2106 { 2107 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 2108 } 2109 2110 const char * 2111 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2112 { 2113 return bdev->name; 2114 } 2115 2116 const char * 2117 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2118 { 2119 return bdev->product_name; 2120 } 2121 2122 const struct spdk_bdev_aliases_list * 2123 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2124 { 2125 return &bdev->aliases; 2126 } 2127 2128 uint32_t 2129 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2130 { 2131 return bdev->blocklen; 2132 } 2133 2134 uint64_t 2135 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2136 { 2137 return bdev->blockcnt; 2138 } 2139 2140 const char * 2141 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2142 { 2143 return qos_rpc_type[type]; 2144 } 2145 2146 void 2147 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2148 { 2149 int i; 2150 2151 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2152 2153 pthread_mutex_lock(&bdev->internal.mutex); 2154 if (bdev->internal.qos) { 2155 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2156 if (bdev->internal.qos->rate_limits[i].limit != 2157 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2158 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2159 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2160 /* Change from Byte to Megabyte which is user visible. 
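				 * (i.e. divide by 1024 * 1024; the limit is stored
				 * internally in bytes per second).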
*/ 2161 limits[i] = limits[i] / 1024 / 1024; 2162 } 2163 } 2164 } 2165 } 2166 pthread_mutex_unlock(&bdev->internal.mutex); 2167 } 2168 2169 size_t 2170 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2171 { 2172 return 1 << bdev->required_alignment; 2173 } 2174 2175 uint32_t 2176 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2177 { 2178 return bdev->optimal_io_boundary; 2179 } 2180 2181 bool 2182 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2183 { 2184 return bdev->write_cache; 2185 } 2186 2187 const struct spdk_uuid * 2188 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2189 { 2190 return &bdev->uuid; 2191 } 2192 2193 uint64_t 2194 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2195 { 2196 return bdev->internal.measured_queue_depth; 2197 } 2198 2199 uint64_t 2200 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2201 { 2202 return bdev->internal.period; 2203 } 2204 2205 uint64_t 2206 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2207 { 2208 return bdev->internal.weighted_io_time; 2209 } 2210 2211 uint64_t 2212 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2213 { 2214 return bdev->internal.io_time; 2215 } 2216 2217 static void 2218 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2219 { 2220 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2221 2222 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2223 2224 if (bdev->internal.measured_queue_depth) { 2225 bdev->internal.io_time += bdev->internal.period; 2226 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2227 } 2228 } 2229 2230 static void 2231 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2232 { 2233 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2234 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2235 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2236 2237 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2238 spdk_for_each_channel_continue(i, 0); 2239 } 2240 2241 static int 2242 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2243 { 2244 struct spdk_bdev *bdev = ctx; 2245 bdev->internal.temporary_queue_depth = 0; 2246 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2247 _calculate_measured_qd_cpl); 2248 return 0; 2249 } 2250 2251 void 2252 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2253 { 2254 bdev->internal.period = period; 2255 2256 if (bdev->internal.qd_poller != NULL) { 2257 spdk_poller_unregister(&bdev->internal.qd_poller); 2258 bdev->internal.measured_queue_depth = UINT64_MAX; 2259 } 2260 2261 if (period != 0) { 2262 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2263 period); 2264 } 2265 } 2266 2267 int 2268 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2269 { 2270 int ret; 2271 2272 pthread_mutex_lock(&bdev->internal.mutex); 2273 2274 /* bdev has open descriptors */ 2275 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2276 bdev->blockcnt > size) { 2277 ret = -EBUSY; 2278 } else { 2279 bdev->blockcnt = size; 2280 ret = 0; 2281 } 2282 2283 pthread_mutex_unlock(&bdev->internal.mutex); 2284 2285 return ret; 2286 } 2287 2288 /* 2289 * Convert I/O offset and length from bytes to blocks. 2290 * 2291 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
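 *
 * Illustrative example, assuming a 4096-byte block size: offset_bytes 8192
 * and num_bytes 4096 yield offset_blocks 2 and num_blocks 1 with a return
 * value of 0; num_bytes 4097 leaves a remainder, so the return value is
 * non-zero and callers treat it as -EINVAL. The power-of-two fast path below
 * produces the same result with shifts instead of divisions.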
2292 */ 2293 static uint64_t 2294 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2295 uint64_t num_bytes, uint64_t *num_blocks) 2296 { 2297 uint32_t block_size = bdev->blocklen; 2298 uint8_t shift_cnt; 2299 2300 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 2301 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 2302 shift_cnt = spdk_u32log2(block_size); 2303 *offset_blocks = offset_bytes >> shift_cnt; 2304 *num_blocks = num_bytes >> shift_cnt; 2305 return (offset_bytes - (*offset_blocks << shift_cnt)) | 2306 (num_bytes - (*num_blocks << shift_cnt)); 2307 } else { 2308 *offset_blocks = offset_bytes / block_size; 2309 *num_blocks = num_bytes / block_size; 2310 return (offset_bytes % block_size) | (num_bytes % block_size); 2311 } 2312 } 2313 2314 static bool 2315 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2316 { 2317 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2318 * has been an overflow and hence the offset has been wrapped around */ 2319 if (offset_blocks + num_blocks < offset_blocks) { 2320 return false; 2321 } 2322 2323 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2324 if (offset_blocks + num_blocks > bdev->blockcnt) { 2325 return false; 2326 } 2327 2328 return true; 2329 } 2330 2331 int 2332 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2333 void *buf, uint64_t offset, uint64_t nbytes, 2334 spdk_bdev_io_completion_cb cb, void *cb_arg) 2335 { 2336 uint64_t offset_blocks, num_blocks; 2337 2338 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2339 return -EINVAL; 2340 } 2341 2342 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2343 } 2344 2345 int 2346 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2347 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2348 spdk_bdev_io_completion_cb cb, void *cb_arg) 2349 { 2350 struct spdk_bdev *bdev = desc->bdev; 2351 struct spdk_bdev_io *bdev_io; 2352 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2353 2354 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2355 return -EINVAL; 2356 } 2357 2358 bdev_io = spdk_bdev_get_io(channel); 2359 if (!bdev_io) { 2360 return -ENOMEM; 2361 } 2362 2363 bdev_io->internal.ch = channel; 2364 bdev_io->internal.desc = desc; 2365 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2366 bdev_io->u.bdev.iovs = &bdev_io->iov; 2367 bdev_io->u.bdev.iovs[0].iov_base = buf; 2368 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2369 bdev_io->u.bdev.iovcnt = 1; 2370 bdev_io->u.bdev.num_blocks = num_blocks; 2371 bdev_io->u.bdev.offset_blocks = offset_blocks; 2372 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2373 2374 spdk_bdev_io_submit(bdev_io); 2375 return 0; 2376 } 2377 2378 int 2379 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2380 struct iovec *iov, int iovcnt, 2381 uint64_t offset, uint64_t nbytes, 2382 spdk_bdev_io_completion_cb cb, void *cb_arg) 2383 { 2384 uint64_t offset_blocks, num_blocks; 2385 2386 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2387 return -EINVAL; 2388 } 2389 2390 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2391 } 2392 2393 int spdk_bdev_readv_blocks(struct spdk_bdev_desc 
*desc, struct spdk_io_channel *ch, 2394 struct iovec *iov, int iovcnt, 2395 uint64_t offset_blocks, uint64_t num_blocks, 2396 spdk_bdev_io_completion_cb cb, void *cb_arg) 2397 { 2398 struct spdk_bdev *bdev = desc->bdev; 2399 struct spdk_bdev_io *bdev_io; 2400 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2401 2402 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2403 return -EINVAL; 2404 } 2405 2406 bdev_io = spdk_bdev_get_io(channel); 2407 if (!bdev_io) { 2408 return -ENOMEM; 2409 } 2410 2411 bdev_io->internal.ch = channel; 2412 bdev_io->internal.desc = desc; 2413 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2414 bdev_io->u.bdev.iovs = iov; 2415 bdev_io->u.bdev.iovcnt = iovcnt; 2416 bdev_io->u.bdev.num_blocks = num_blocks; 2417 bdev_io->u.bdev.offset_blocks = offset_blocks; 2418 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2419 2420 spdk_bdev_io_submit(bdev_io); 2421 return 0; 2422 } 2423 2424 int 2425 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2426 void *buf, uint64_t offset, uint64_t nbytes, 2427 spdk_bdev_io_completion_cb cb, void *cb_arg) 2428 { 2429 uint64_t offset_blocks, num_blocks; 2430 2431 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2432 return -EINVAL; 2433 } 2434 2435 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2436 } 2437 2438 int 2439 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2440 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2441 spdk_bdev_io_completion_cb cb, void *cb_arg) 2442 { 2443 struct spdk_bdev *bdev = desc->bdev; 2444 struct spdk_bdev_io *bdev_io; 2445 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2446 2447 if (!desc->write) { 2448 return -EBADF; 2449 } 2450 2451 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2452 return -EINVAL; 2453 } 2454 2455 bdev_io = spdk_bdev_get_io(channel); 2456 if (!bdev_io) { 2457 return -ENOMEM; 2458 } 2459 2460 bdev_io->internal.ch = channel; 2461 bdev_io->internal.desc = desc; 2462 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2463 bdev_io->u.bdev.iovs = &bdev_io->iov; 2464 bdev_io->u.bdev.iovs[0].iov_base = buf; 2465 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2466 bdev_io->u.bdev.iovcnt = 1; 2467 bdev_io->u.bdev.num_blocks = num_blocks; 2468 bdev_io->u.bdev.offset_blocks = offset_blocks; 2469 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2470 2471 spdk_bdev_io_submit(bdev_io); 2472 return 0; 2473 } 2474 2475 int 2476 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2477 struct iovec *iov, int iovcnt, 2478 uint64_t offset, uint64_t len, 2479 spdk_bdev_io_completion_cb cb, void *cb_arg) 2480 { 2481 uint64_t offset_blocks, num_blocks; 2482 2483 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2484 return -EINVAL; 2485 } 2486 2487 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2488 } 2489 2490 int 2491 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2492 struct iovec *iov, int iovcnt, 2493 uint64_t offset_blocks, uint64_t num_blocks, 2494 spdk_bdev_io_completion_cb cb, void *cb_arg) 2495 { 2496 struct spdk_bdev *bdev = desc->bdev; 2497 struct spdk_bdev_io *bdev_io; 2498 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2499 2500 if (!desc->write) { 2501 return -EBADF; 2502 } 2503 2504 if (!spdk_bdev_io_valid_blocks(bdev, 
offset_blocks, num_blocks)) { 2505 return -EINVAL; 2506 } 2507 2508 bdev_io = spdk_bdev_get_io(channel); 2509 if (!bdev_io) { 2510 return -ENOMEM; 2511 } 2512 2513 bdev_io->internal.ch = channel; 2514 bdev_io->internal.desc = desc; 2515 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2516 bdev_io->u.bdev.iovs = iov; 2517 bdev_io->u.bdev.iovcnt = iovcnt; 2518 bdev_io->u.bdev.num_blocks = num_blocks; 2519 bdev_io->u.bdev.offset_blocks = offset_blocks; 2520 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2521 2522 spdk_bdev_io_submit(bdev_io); 2523 return 0; 2524 } 2525 2526 int 2527 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2528 uint64_t offset, uint64_t len, 2529 spdk_bdev_io_completion_cb cb, void *cb_arg) 2530 { 2531 uint64_t offset_blocks, num_blocks; 2532 2533 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2534 return -EINVAL; 2535 } 2536 2537 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2538 } 2539 2540 int 2541 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2542 uint64_t offset_blocks, uint64_t num_blocks, 2543 spdk_bdev_io_completion_cb cb, void *cb_arg) 2544 { 2545 struct spdk_bdev *bdev = desc->bdev; 2546 struct spdk_bdev_io *bdev_io; 2547 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2548 2549 if (!desc->write) { 2550 return -EBADF; 2551 } 2552 2553 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2554 return -EINVAL; 2555 } 2556 2557 bdev_io = spdk_bdev_get_io(channel); 2558 2559 if (!bdev_io) { 2560 return -ENOMEM; 2561 } 2562 2563 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2564 bdev_io->internal.ch = channel; 2565 bdev_io->internal.desc = desc; 2566 bdev_io->u.bdev.offset_blocks = offset_blocks; 2567 bdev_io->u.bdev.num_blocks = num_blocks; 2568 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2569 2570 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2571 spdk_bdev_io_submit(bdev_io); 2572 return 0; 2573 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2574 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2575 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2576 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2577 _spdk_bdev_write_zero_buffer_next(bdev_io); 2578 return 0; 2579 } else { 2580 spdk_bdev_free_io(bdev_io); 2581 return -ENOTSUP; 2582 } 2583 } 2584 2585 int 2586 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2587 uint64_t offset, uint64_t nbytes, 2588 spdk_bdev_io_completion_cb cb, void *cb_arg) 2589 { 2590 uint64_t offset_blocks, num_blocks; 2591 2592 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2593 return -EINVAL; 2594 } 2595 2596 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2597 } 2598 2599 int 2600 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2601 uint64_t offset_blocks, uint64_t num_blocks, 2602 spdk_bdev_io_completion_cb cb, void *cb_arg) 2603 { 2604 struct spdk_bdev *bdev = desc->bdev; 2605 struct spdk_bdev_io *bdev_io; 2606 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2607 2608 if (!desc->write) { 2609 return -EBADF; 2610 } 2611 2612 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2613 return -EINVAL; 2614 } 2615 2616 if (num_blocks == 0) { 2617 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2618 return -EINVAL; 
2619 } 2620 2621 bdev_io = spdk_bdev_get_io(channel); 2622 if (!bdev_io) { 2623 return -ENOMEM; 2624 } 2625 2626 bdev_io->internal.ch = channel; 2627 bdev_io->internal.desc = desc; 2628 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2629 2630 bdev_io->u.bdev.iovs = &bdev_io->iov; 2631 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2632 bdev_io->u.bdev.iovs[0].iov_len = 0; 2633 bdev_io->u.bdev.iovcnt = 1; 2634 2635 bdev_io->u.bdev.offset_blocks = offset_blocks; 2636 bdev_io->u.bdev.num_blocks = num_blocks; 2637 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2638 2639 spdk_bdev_io_submit(bdev_io); 2640 return 0; 2641 } 2642 2643 int 2644 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2645 uint64_t offset, uint64_t length, 2646 spdk_bdev_io_completion_cb cb, void *cb_arg) 2647 { 2648 uint64_t offset_blocks, num_blocks; 2649 2650 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2651 return -EINVAL; 2652 } 2653 2654 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2655 } 2656 2657 int 2658 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2659 uint64_t offset_blocks, uint64_t num_blocks, 2660 spdk_bdev_io_completion_cb cb, void *cb_arg) 2661 { 2662 struct spdk_bdev *bdev = desc->bdev; 2663 struct spdk_bdev_io *bdev_io; 2664 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2665 2666 if (!desc->write) { 2667 return -EBADF; 2668 } 2669 2670 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2671 return -EINVAL; 2672 } 2673 2674 bdev_io = spdk_bdev_get_io(channel); 2675 if (!bdev_io) { 2676 return -ENOMEM; 2677 } 2678 2679 bdev_io->internal.ch = channel; 2680 bdev_io->internal.desc = desc; 2681 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2682 bdev_io->u.bdev.iovs = NULL; 2683 bdev_io->u.bdev.iovcnt = 0; 2684 bdev_io->u.bdev.offset_blocks = offset_blocks; 2685 bdev_io->u.bdev.num_blocks = num_blocks; 2686 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2687 2688 spdk_bdev_io_submit(bdev_io); 2689 return 0; 2690 } 2691 2692 static void 2693 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2694 { 2695 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2696 struct spdk_bdev_io *bdev_io; 2697 2698 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2699 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2700 spdk_bdev_io_submit_reset(bdev_io); 2701 } 2702 2703 static void 2704 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2705 { 2706 struct spdk_io_channel *ch; 2707 struct spdk_bdev_channel *channel; 2708 struct spdk_bdev_mgmt_channel *mgmt_channel; 2709 struct spdk_bdev_shared_resource *shared_resource; 2710 bdev_io_tailq_t tmp_queued; 2711 2712 TAILQ_INIT(&tmp_queued); 2713 2714 ch = spdk_io_channel_iter_get_channel(i); 2715 channel = spdk_io_channel_get_ctx(ch); 2716 shared_resource = channel->shared_resource; 2717 mgmt_channel = shared_resource->mgmt_ch; 2718 2719 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2720 2721 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2722 /* The QoS object is always valid and readable while 2723 * the channel flag is set, so the lock here should not 2724 * be necessary. We're not in the fast path though, so 2725 * just take it anyway. 
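	 * Only the channel that currently owns the QoS queue (qos->ch) swaps the
	 * queued I/O out here; they are failed below together with the other
	 * per-channel queues.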
*/ 2726 pthread_mutex_lock(&channel->bdev->internal.mutex); 2727 if (channel->bdev->internal.qos->ch == channel) { 2728 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2729 } 2730 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2731 } 2732 2733 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2734 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2735 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2736 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2737 2738 spdk_for_each_channel_continue(i, 0); 2739 } 2740 2741 static void 2742 _spdk_bdev_start_reset(void *ctx) 2743 { 2744 struct spdk_bdev_channel *ch = ctx; 2745 2746 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2747 ch, _spdk_bdev_reset_dev); 2748 } 2749 2750 static void 2751 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2752 { 2753 struct spdk_bdev *bdev = ch->bdev; 2754 2755 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2756 2757 pthread_mutex_lock(&bdev->internal.mutex); 2758 if (bdev->internal.reset_in_progress == NULL) { 2759 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2760 /* 2761 * Take a channel reference for the target bdev for the life of this 2762 * reset. This guards against the channel getting destroyed while 2763 * spdk_for_each_channel() calls related to this reset IO are in 2764 * progress. We will release the reference when this reset is 2765 * completed. 2766 */ 2767 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2768 _spdk_bdev_start_reset(ch); 2769 } 2770 pthread_mutex_unlock(&bdev->internal.mutex); 2771 } 2772 2773 int 2774 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2775 spdk_bdev_io_completion_cb cb, void *cb_arg) 2776 { 2777 struct spdk_bdev *bdev = desc->bdev; 2778 struct spdk_bdev_io *bdev_io; 2779 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2780 2781 bdev_io = spdk_bdev_get_io(channel); 2782 if (!bdev_io) { 2783 return -ENOMEM; 2784 } 2785 2786 bdev_io->internal.ch = channel; 2787 bdev_io->internal.desc = desc; 2788 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2789 bdev_io->u.reset.ch_ref = NULL; 2790 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2791 2792 pthread_mutex_lock(&bdev->internal.mutex); 2793 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2794 pthread_mutex_unlock(&bdev->internal.mutex); 2795 2796 _spdk_bdev_channel_start_reset(channel); 2797 2798 return 0; 2799 } 2800 2801 void 2802 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2803 struct spdk_bdev_io_stat *stat) 2804 { 2805 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2806 2807 *stat = channel->stat; 2808 } 2809 2810 static void 2811 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2812 { 2813 void *io_device = spdk_io_channel_iter_get_io_device(i); 2814 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2815 2816 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2817 bdev_iostat_ctx->cb_arg, 0); 2818 free(bdev_iostat_ctx); 2819 } 2820 2821 static void 2822 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2823 { 2824 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2825 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2826 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2827 2828 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2829 spdk_for_each_channel_continue(i, 0); 2830 } 2831 2832 void 2833 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2834 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2835 { 2836 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2837 2838 assert(bdev != NULL); 2839 assert(stat != NULL); 2840 assert(cb != NULL); 2841 2842 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2843 if (bdev_iostat_ctx == NULL) { 2844 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2845 cb(bdev, stat, cb_arg, -ENOMEM); 2846 return; 2847 } 2848 2849 bdev_iostat_ctx->stat = stat; 2850 bdev_iostat_ctx->cb = cb; 2851 bdev_iostat_ctx->cb_arg = cb_arg; 2852 2853 /* Start with the statistics from previously deleted channels. */ 2854 pthread_mutex_lock(&bdev->internal.mutex); 2855 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2856 pthread_mutex_unlock(&bdev->internal.mutex); 2857 2858 /* Then iterate and add the statistics from each existing channel. */ 2859 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2860 _spdk_bdev_get_each_channel_stat, 2861 bdev_iostat_ctx, 2862 _spdk_bdev_get_device_stat_done); 2863 } 2864 2865 int 2866 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2867 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2868 spdk_bdev_io_completion_cb cb, void *cb_arg) 2869 { 2870 struct spdk_bdev *bdev = desc->bdev; 2871 struct spdk_bdev_io *bdev_io; 2872 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2873 2874 if (!desc->write) { 2875 return -EBADF; 2876 } 2877 2878 bdev_io = spdk_bdev_get_io(channel); 2879 if (!bdev_io) { 2880 return -ENOMEM; 2881 } 2882 2883 bdev_io->internal.ch = channel; 2884 bdev_io->internal.desc = desc; 2885 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2886 bdev_io->u.nvme_passthru.cmd = *cmd; 2887 bdev_io->u.nvme_passthru.buf = buf; 2888 bdev_io->u.nvme_passthru.nbytes = nbytes; 2889 bdev_io->u.nvme_passthru.md_buf = NULL; 2890 bdev_io->u.nvme_passthru.md_len = 0; 2891 2892 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2893 2894 spdk_bdev_io_submit(bdev_io); 2895 return 0; 2896 } 2897 2898 int 2899 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2900 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2901 spdk_bdev_io_completion_cb cb, void *cb_arg) 2902 { 2903 struct spdk_bdev *bdev = desc->bdev; 2904 struct spdk_bdev_io *bdev_io; 2905 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2906 2907 if (!desc->write) { 2908 /* 2909 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2910 * to easily determine if the command is a read or write, but for now just 2911 * do not allow io_passthru with a read-only descriptor. 
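		 * Callers that need passthru therefore have to open the descriptor
		 * with write set to true in spdk_bdev_open().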
2912 */ 2913 return -EBADF; 2914 } 2915 2916 bdev_io = spdk_bdev_get_io(channel); 2917 if (!bdev_io) { 2918 return -ENOMEM; 2919 } 2920 2921 bdev_io->internal.ch = channel; 2922 bdev_io->internal.desc = desc; 2923 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2924 bdev_io->u.nvme_passthru.cmd = *cmd; 2925 bdev_io->u.nvme_passthru.buf = buf; 2926 bdev_io->u.nvme_passthru.nbytes = nbytes; 2927 bdev_io->u.nvme_passthru.md_buf = NULL; 2928 bdev_io->u.nvme_passthru.md_len = 0; 2929 2930 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2931 2932 spdk_bdev_io_submit(bdev_io); 2933 return 0; 2934 } 2935 2936 int 2937 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2938 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2939 spdk_bdev_io_completion_cb cb, void *cb_arg) 2940 { 2941 struct spdk_bdev *bdev = desc->bdev; 2942 struct spdk_bdev_io *bdev_io; 2943 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2944 2945 if (!desc->write) { 2946 /* 2947 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2948 * to easily determine if the command is a read or write, but for now just 2949 * do not allow io_passthru with a read-only descriptor. 2950 */ 2951 return -EBADF; 2952 } 2953 2954 bdev_io = spdk_bdev_get_io(channel); 2955 if (!bdev_io) { 2956 return -ENOMEM; 2957 } 2958 2959 bdev_io->internal.ch = channel; 2960 bdev_io->internal.desc = desc; 2961 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2962 bdev_io->u.nvme_passthru.cmd = *cmd; 2963 bdev_io->u.nvme_passthru.buf = buf; 2964 bdev_io->u.nvme_passthru.nbytes = nbytes; 2965 bdev_io->u.nvme_passthru.md_buf = md_buf; 2966 bdev_io->u.nvme_passthru.md_len = md_len; 2967 2968 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2969 2970 spdk_bdev_io_submit(bdev_io); 2971 return 0; 2972 } 2973 2974 int 2975 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2976 struct spdk_bdev_io_wait_entry *entry) 2977 { 2978 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2979 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2980 2981 if (bdev != entry->bdev) { 2982 SPDK_ERRLOG("bdevs do not match\n"); 2983 return -EINVAL; 2984 } 2985 2986 if (mgmt_ch->per_thread_cache_count > 0) { 2987 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2988 return -EINVAL; 2989 } 2990 2991 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2992 return 0; 2993 } 2994 2995 static void 2996 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2997 { 2998 struct spdk_bdev *bdev = bdev_ch->bdev; 2999 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3000 struct spdk_bdev_io *bdev_io; 3001 3002 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3003 /* 3004 * Allow some more I/O to complete before retrying the nomem_io queue. 3005 * Some drivers (such as nvme) cannot immediately take a new I/O in 3006 * the context of a completion, because the resources for the I/O are 3007 * not released until control returns to the bdev poller. Also, we 3008 * may require several small I/O to complete before a larger I/O 3009 * (that requires splitting) can be submitted. 
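	 * nomem_threshold is recalculated in spdk_bdev_io_complete() each time an
	 * I/O is queued with NOMEM; retries only begin once io_outstanding drops
	 * back to that threshold.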
3010 */ 3011 return; 3012 } 3013 3014 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3015 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3016 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3017 bdev_io->internal.ch->io_outstanding++; 3018 shared_resource->io_outstanding++; 3019 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3020 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 3021 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3022 break; 3023 } 3024 } 3025 } 3026 3027 static inline void 3028 _spdk_bdev_io_complete(void *ctx) 3029 { 3030 struct spdk_bdev_io *bdev_io = ctx; 3031 uint64_t tsc, tsc_diff; 3032 3033 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3034 /* 3035 * Send the completion to the thread that originally submitted the I/O, 3036 * which may not be the current thread in the case of QoS. 3037 */ 3038 if (bdev_io->internal.io_submit_ch) { 3039 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3040 bdev_io->internal.io_submit_ch = NULL; 3041 } 3042 3043 /* 3044 * Defer completion to avoid potential infinite recursion if the 3045 * user's completion callback issues a new I/O. 3046 */ 3047 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3048 _spdk_bdev_io_complete, bdev_io); 3049 return; 3050 } 3051 3052 tsc = spdk_get_ticks(); 3053 tsc_diff = tsc - bdev_io->internal.submit_tsc; 3054 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 3055 3056 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3057 switch (bdev_io->type) { 3058 case SPDK_BDEV_IO_TYPE_READ: 3059 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3060 bdev_io->internal.ch->stat.num_read_ops++; 3061 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 3062 break; 3063 case SPDK_BDEV_IO_TYPE_WRITE: 3064 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3065 bdev_io->internal.ch->stat.num_write_ops++; 3066 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 3067 break; 3068 default: 3069 break; 3070 } 3071 } 3072 3073 #ifdef SPDK_CONFIG_VTUNE 3074 uint64_t now_tsc = spdk_get_ticks(); 3075 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 3076 uint64_t data[5]; 3077 3078 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 3079 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 3080 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 3081 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 3082 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
3083 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 3084 3085 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3086 __itt_metadata_u64, 5, data); 3087 3088 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3089 bdev_io->internal.ch->start_tsc = now_tsc; 3090 } 3091 #endif 3092 3093 assert(bdev_io->internal.cb != NULL); 3094 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 3095 3096 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3097 bdev_io->internal.caller_ctx); 3098 } 3099 3100 static void 3101 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3102 { 3103 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3104 3105 if (bdev_io->u.reset.ch_ref != NULL) { 3106 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3107 bdev_io->u.reset.ch_ref = NULL; 3108 } 3109 3110 _spdk_bdev_io_complete(bdev_io); 3111 } 3112 3113 static void 3114 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3115 { 3116 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3117 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3118 3119 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3120 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3121 _spdk_bdev_channel_start_reset(ch); 3122 } 3123 3124 spdk_for_each_channel_continue(i, 0); 3125 } 3126 3127 void 3128 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3129 { 3130 struct spdk_bdev *bdev = bdev_io->bdev; 3131 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3132 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3133 3134 bdev_io->internal.status = status; 3135 3136 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3137 bool unlock_channels = false; 3138 3139 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3140 SPDK_ERRLOG("NOMEM returned for reset\n"); 3141 } 3142 pthread_mutex_lock(&bdev->internal.mutex); 3143 if (bdev_io == bdev->internal.reset_in_progress) { 3144 bdev->internal.reset_in_progress = NULL; 3145 unlock_channels = true; 3146 } 3147 pthread_mutex_unlock(&bdev->internal.mutex); 3148 3149 if (unlock_channels) { 3150 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3151 bdev_io, _spdk_bdev_reset_complete); 3152 return; 3153 } 3154 } else { 3155 if (spdk_unlikely(bdev_io->internal.orig_iovcnt > 0)) { 3156 _bdev_io_unset_bounce_buf(bdev_io); 3157 } 3158 3159 assert(bdev_ch->io_outstanding > 0); 3160 assert(shared_resource->io_outstanding > 0); 3161 bdev_ch->io_outstanding--; 3162 shared_resource->io_outstanding--; 3163 3164 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3165 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3166 /* 3167 * Wait for some of the outstanding I/O to complete before we 3168 * retry any of the nomem_io. Normally we will wait for 3169 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3170 * depth channels we will instead wait for half to complete. 
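			 * Illustrative numbers: with 100 I/O outstanding the threshold
			 * below is the larger of 50 and 100 - NOMEM_THRESHOLD_COUNT;
			 * with only 4 outstanding the count-based term drops below
			 * half, so the threshold falls back to half of them, i.e. 2.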
3171 */ 3172 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3173 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3174 return; 3175 } 3176 3177 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3178 _spdk_bdev_ch_retry_io(bdev_ch); 3179 } 3180 } 3181 3182 _spdk_bdev_io_complete(bdev_io); 3183 } 3184 3185 void 3186 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3187 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3188 { 3189 if (sc == SPDK_SCSI_STATUS_GOOD) { 3190 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3191 } else { 3192 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3193 bdev_io->internal.error.scsi.sc = sc; 3194 bdev_io->internal.error.scsi.sk = sk; 3195 bdev_io->internal.error.scsi.asc = asc; 3196 bdev_io->internal.error.scsi.ascq = ascq; 3197 } 3198 3199 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3200 } 3201 3202 void 3203 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3204 int *sc, int *sk, int *asc, int *ascq) 3205 { 3206 assert(sc != NULL); 3207 assert(sk != NULL); 3208 assert(asc != NULL); 3209 assert(ascq != NULL); 3210 3211 switch (bdev_io->internal.status) { 3212 case SPDK_BDEV_IO_STATUS_SUCCESS: 3213 *sc = SPDK_SCSI_STATUS_GOOD; 3214 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3215 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3216 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3217 break; 3218 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3219 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3220 break; 3221 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3222 *sc = bdev_io->internal.error.scsi.sc; 3223 *sk = bdev_io->internal.error.scsi.sk; 3224 *asc = bdev_io->internal.error.scsi.asc; 3225 *ascq = bdev_io->internal.error.scsi.ascq; 3226 break; 3227 default: 3228 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3229 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3230 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3231 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3232 break; 3233 } 3234 } 3235 3236 void 3237 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3238 { 3239 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3240 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3241 } else { 3242 bdev_io->internal.error.nvme.sct = sct; 3243 bdev_io->internal.error.nvme.sc = sc; 3244 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3245 } 3246 3247 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3248 } 3249 3250 void 3251 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3252 { 3253 assert(sct != NULL); 3254 assert(sc != NULL); 3255 3256 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3257 *sct = bdev_io->internal.error.nvme.sct; 3258 *sc = bdev_io->internal.error.nvme.sc; 3259 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3260 *sct = SPDK_NVME_SCT_GENERIC; 3261 *sc = SPDK_NVME_SC_SUCCESS; 3262 } else { 3263 *sct = SPDK_NVME_SCT_GENERIC; 3264 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3265 } 3266 } 3267 3268 struct spdk_thread * 3269 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3270 { 3271 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3272 } 3273 3274 static void 3275 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3276 { 3277 uint64_t min_qos_set; 3278 int i; 3279 3280 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3281 if (limits[i] != 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3282 break; 3283 } 3284 } 3285 3286 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3287 SPDK_ERRLOG("Invalid rate limits set.\n"); 3288 return; 3289 } 3290 3291 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3292 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3293 continue; 3294 } 3295 3296 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3297 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3298 } else { 3299 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3300 } 3301 3302 if (limits[i] == 0 || limits[i] % min_qos_set) { 3303 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3304 limits[i], bdev->name, min_qos_set); 3305 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3306 return; 3307 } 3308 } 3309 3310 if (!bdev->internal.qos) { 3311 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3312 if (!bdev->internal.qos) { 3313 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3314 return; 3315 } 3316 } 3317 3318 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3319 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3320 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3321 bdev->name, i, limits[i]); 3322 } 3323 3324 return; 3325 } 3326 3327 static void 3328 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3329 { 3330 struct spdk_conf_section *sp = NULL; 3331 const char *val = NULL; 3332 int i = 0, j = 0; 3333 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3334 bool config_qos = false; 3335 3336 sp = spdk_conf_find_section(NULL, "QoS"); 3337 if (!sp) { 3338 return; 3339 } 3340 3341 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3342 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3343 3344 i = 0; 3345 while (true) { 3346 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3347 if (!val) { 3348 break; 3349 } 3350 3351 if (strcmp(bdev->name, val) != 0) { 3352 i++; 3353 continue; 3354 } 3355 3356 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3357 if (val) { 3358 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3359 limits[j] = strtoull(val, NULL, 10); 3360 } else { 3361 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3362 } 3363 config_qos = true; 3364 } 3365 3366 break; 3367 } 3368 3369 j++; 3370 } 3371 3372 if (config_qos == true) { 3373 _spdk_bdev_qos_config_limit(bdev, limits); 3374 } 3375 3376 return; 3377 } 3378 3379 static int 3380 spdk_bdev_init(struct spdk_bdev *bdev) 3381 { 3382 char *bdev_name; 3383 3384 assert(bdev->module != NULL); 3385 3386 if (!bdev->name) { 3387 SPDK_ERRLOG("Bdev name is NULL\n"); 3388 return -EINVAL; 3389 } 3390 3391 if (spdk_bdev_get_by_name(bdev->name)) { 3392 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3393 return -EEXIST; 3394 } 3395 3396 /* Users often register their own I/O devices using the bdev name. In 3397 * order to avoid conflicts, prepend bdev_. 
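	 * For example, a bdev named "Malloc0" is registered below as the I/O
	 * device "bdev_Malloc0".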
*/ 3398 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3399 if (!bdev_name) { 3400 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3401 return -ENOMEM; 3402 } 3403 3404 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3405 bdev->internal.measured_queue_depth = UINT64_MAX; 3406 bdev->internal.claim_module = NULL; 3407 bdev->internal.qd_poller = NULL; 3408 bdev->internal.qos = NULL; 3409 3410 if (spdk_bdev_get_buf_align(bdev) > 1) { 3411 if (bdev->split_on_optimal_io_boundary) { 3412 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 3413 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 3414 } else { 3415 bdev->split_on_optimal_io_boundary = true; 3416 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 3417 } 3418 } 3419 3420 TAILQ_INIT(&bdev->internal.open_descs); 3421 3422 TAILQ_INIT(&bdev->aliases); 3423 3424 bdev->internal.reset_in_progress = NULL; 3425 3426 _spdk_bdev_qos_config(bdev); 3427 3428 spdk_io_device_register(__bdev_to_io_dev(bdev), 3429 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3430 sizeof(struct spdk_bdev_channel), 3431 bdev_name); 3432 3433 free(bdev_name); 3434 3435 pthread_mutex_init(&bdev->internal.mutex, NULL); 3436 return 0; 3437 } 3438 3439 static void 3440 spdk_bdev_destroy_cb(void *io_device) 3441 { 3442 int rc; 3443 struct spdk_bdev *bdev; 3444 spdk_bdev_unregister_cb cb_fn; 3445 void *cb_arg; 3446 3447 bdev = __bdev_from_io_dev(io_device); 3448 cb_fn = bdev->internal.unregister_cb; 3449 cb_arg = bdev->internal.unregister_ctx; 3450 3451 rc = bdev->fn_table->destruct(bdev->ctxt); 3452 if (rc < 0) { 3453 SPDK_ERRLOG("destruct failed\n"); 3454 } 3455 if (rc <= 0 && cb_fn != NULL) { 3456 cb_fn(cb_arg, rc); 3457 } 3458 } 3459 3460 3461 static void 3462 spdk_bdev_fini(struct spdk_bdev *bdev) 3463 { 3464 pthread_mutex_destroy(&bdev->internal.mutex); 3465 3466 free(bdev->internal.qos); 3467 3468 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3469 } 3470 3471 static void 3472 spdk_bdev_start(struct spdk_bdev *bdev) 3473 { 3474 struct spdk_bdev_module *module; 3475 uint32_t action; 3476 3477 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3478 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3479 3480 /* Examine configuration before initializing I/O */ 3481 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3482 if (module->examine_config) { 3483 action = module->internal.action_in_progress; 3484 module->internal.action_in_progress++; 3485 module->examine_config(bdev); 3486 if (action != module->internal.action_in_progress) { 3487 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3488 module->name); 3489 } 3490 } 3491 } 3492 3493 if (bdev->internal.claim_module) { 3494 return; 3495 } 3496 3497 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3498 if (module->examine_disk) { 3499 module->internal.action_in_progress++; 3500 module->examine_disk(bdev); 3501 } 3502 } 3503 } 3504 3505 int 3506 spdk_bdev_register(struct spdk_bdev *bdev) 3507 { 3508 int rc = spdk_bdev_init(bdev); 3509 3510 if (rc == 0) { 3511 spdk_bdev_start(bdev); 3512 } 3513 3514 return rc; 3515 } 3516 3517 int 3518 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3519 { 3520 int rc; 3521 3522 rc = spdk_bdev_init(vbdev); 3523 if (rc) { 3524 return rc; 3525 } 3526 3527 spdk_bdev_start(vbdev); 3528 return 0; 3529 } 3530 3531 void 3532 
spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3533 { 3534 if (bdev->internal.unregister_cb != NULL) { 3535 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3536 } 3537 } 3538 3539 static void 3540 _remove_notify(void *arg) 3541 { 3542 struct spdk_bdev_desc *desc = arg; 3543 3544 desc->remove_scheduled = false; 3545 3546 if (desc->closed) { 3547 free(desc); 3548 } else { 3549 desc->remove_cb(desc->remove_ctx); 3550 } 3551 } 3552 3553 void 3554 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3555 { 3556 struct spdk_bdev_desc *desc, *tmp; 3557 bool do_destruct = true; 3558 struct spdk_thread *thread; 3559 3560 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3561 3562 thread = spdk_get_thread(); 3563 if (!thread) { 3564 /* The user called this from a non-SPDK thread. */ 3565 if (cb_fn != NULL) { 3566 cb_fn(cb_arg, -ENOTSUP); 3567 } 3568 return; 3569 } 3570 3571 pthread_mutex_lock(&bdev->internal.mutex); 3572 3573 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3574 bdev->internal.unregister_cb = cb_fn; 3575 bdev->internal.unregister_ctx = cb_arg; 3576 3577 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3578 if (desc->remove_cb) { 3579 do_destruct = false; 3580 /* 3581 * Defer invocation of the remove_cb to a separate message that will 3582 * run later on its thread. This ensures this context unwinds and 3583 * we don't recursively unregister this bdev again if the remove_cb 3584 * immediately closes its descriptor. 3585 */ 3586 if (!desc->remove_scheduled) { 3587 /* Avoid scheduling removal of the same descriptor multiple times. */ 3588 desc->remove_scheduled = true; 3589 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3590 } 3591 } 3592 } 3593 3594 if (!do_destruct) { 3595 pthread_mutex_unlock(&bdev->internal.mutex); 3596 return; 3597 } 3598 3599 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3600 pthread_mutex_unlock(&bdev->internal.mutex); 3601 3602 spdk_bdev_fini(bdev); 3603 } 3604 3605 int 3606 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3607 void *remove_ctx, struct spdk_bdev_desc **_desc) 3608 { 3609 struct spdk_bdev_desc *desc; 3610 struct spdk_thread *thread; 3611 3612 thread = spdk_get_thread(); 3613 if (!thread) { 3614 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3615 return -ENOTSUP; 3616 } 3617 3618 desc = calloc(1, sizeof(*desc)); 3619 if (desc == NULL) { 3620 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3621 return -ENOMEM; 3622 } 3623 3624 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3625 spdk_get_thread()); 3626 3627 desc->bdev = bdev; 3628 desc->thread = thread; 3629 desc->remove_cb = remove_cb; 3630 desc->remove_ctx = remove_ctx; 3631 desc->write = write; 3632 *_desc = desc; 3633 3634 pthread_mutex_lock(&bdev->internal.mutex); 3635 3636 if (write && bdev->internal.claim_module) { 3637 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3638 bdev->name, bdev->internal.claim_module->name); 3639 pthread_mutex_unlock(&bdev->internal.mutex); 3640 free(desc); 3641 *_desc = NULL; 3642 return -EPERM; 3643 } 3644 3645 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3646 3647 pthread_mutex_unlock(&bdev->internal.mutex); 3648 3649 return 0; 3650 } 3651 3652 void 3653 spdk_bdev_close(struct spdk_bdev_desc *desc) 3654 { 3655 struct spdk_bdev *bdev = desc->bdev; 3656 bool do_unregister = false; 
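	/*
	 * Must be called on the same thread that opened the descriptor. The
	 * descriptor is freed here unless a remove notification is still
	 * scheduled, in which case _remove_notify() frees it later. Closing the
	 * last descriptor also tears down any QoS state and, if the bdev is
	 * already being unregistered, triggers the deferred unregister below.
	 */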
3657 3658 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3659 spdk_get_thread()); 3660 3661 assert(desc->thread == spdk_get_thread()); 3662 3663 pthread_mutex_lock(&bdev->internal.mutex); 3664 3665 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3666 3667 desc->closed = true; 3668 3669 if (!desc->remove_scheduled) { 3670 free(desc); 3671 } 3672 3673 /* If no more descriptors, kill QoS channel */ 3674 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3675 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3676 bdev->name, spdk_get_thread()); 3677 3678 if (spdk_bdev_qos_destroy(bdev)) { 3679 /* There isn't anything we can do to recover here. Just let the 3680 * old QoS poller keep running. The QoS handling won't change 3681 * cores when the user allocates a new channel, but it won't break. */ 3682 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3683 } 3684 } 3685 3686 spdk_bdev_set_qd_sampling_period(bdev, 0); 3687 3688 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3689 do_unregister = true; 3690 } 3691 pthread_mutex_unlock(&bdev->internal.mutex); 3692 3693 if (do_unregister == true) { 3694 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3695 } 3696 } 3697 3698 int 3699 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3700 struct spdk_bdev_module *module) 3701 { 3702 if (bdev->internal.claim_module != NULL) { 3703 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3704 bdev->internal.claim_module->name); 3705 return -EPERM; 3706 } 3707 3708 if (desc && !desc->write) { 3709 desc->write = true; 3710 } 3711 3712 bdev->internal.claim_module = module; 3713 return 0; 3714 } 3715 3716 void 3717 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3718 { 3719 assert(bdev->internal.claim_module != NULL); 3720 bdev->internal.claim_module = NULL; 3721 } 3722 3723 struct spdk_bdev * 3724 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3725 { 3726 return desc->bdev; 3727 } 3728 3729 void 3730 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3731 { 3732 struct iovec *iovs; 3733 int iovcnt; 3734 3735 if (bdev_io == NULL) { 3736 return; 3737 } 3738 3739 switch (bdev_io->type) { 3740 case SPDK_BDEV_IO_TYPE_READ: 3741 iovs = bdev_io->u.bdev.iovs; 3742 iovcnt = bdev_io->u.bdev.iovcnt; 3743 break; 3744 case SPDK_BDEV_IO_TYPE_WRITE: 3745 iovs = bdev_io->u.bdev.iovs; 3746 iovcnt = bdev_io->u.bdev.iovcnt; 3747 break; 3748 default: 3749 iovs = NULL; 3750 iovcnt = 0; 3751 break; 3752 } 3753 3754 if (iovp) { 3755 *iovp = iovs; 3756 } 3757 if (iovcntp) { 3758 *iovcntp = iovcnt; 3759 } 3760 } 3761 3762 void 3763 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3764 { 3765 3766 if (spdk_bdev_module_list_find(bdev_module->name)) { 3767 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3768 assert(false); 3769 } 3770 3771 if (bdev_module->async_init) { 3772 bdev_module->internal.action_in_progress = 1; 3773 } 3774 3775 /* 3776 * Modules with examine callbacks must be initialized first, so they are 3777 * ready to handle examine callbacks from later modules that will 3778 * register physical bdevs. 
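	 * Inserting those modules at the head of the list below is what gives
	 * them the earlier position in the initialization and examine order.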
3779 */ 3780 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3781 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3782 } else { 3783 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3784 } 3785 } 3786 3787 struct spdk_bdev_module * 3788 spdk_bdev_module_list_find(const char *name) 3789 { 3790 struct spdk_bdev_module *bdev_module; 3791 3792 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3793 if (strcmp(name, bdev_module->name) == 0) { 3794 break; 3795 } 3796 } 3797 3798 return bdev_module; 3799 } 3800 3801 static void 3802 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3803 { 3804 struct spdk_bdev_io *bdev_io = _bdev_io; 3805 uint64_t num_bytes, num_blocks; 3806 int rc; 3807 3808 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3809 bdev_io->u.bdev.split_remaining_num_blocks, 3810 ZERO_BUFFER_SIZE); 3811 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3812 3813 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3814 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3815 g_bdev_mgr.zero_buffer, 3816 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3817 _spdk_bdev_write_zero_buffer_done, bdev_io); 3818 if (rc == 0) { 3819 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3820 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3821 } else if (rc == -ENOMEM) { 3822 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 3823 } else { 3824 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3825 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3826 } 3827 } 3828 3829 static void 3830 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3831 { 3832 struct spdk_bdev_io *parent_io = cb_arg; 3833 3834 spdk_bdev_free_io(bdev_io); 3835 3836 if (!success) { 3837 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3838 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3839 return; 3840 } 3841 3842 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3843 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3844 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3845 return; 3846 } 3847 3848 _spdk_bdev_write_zero_buffer_next(parent_io); 3849 } 3850 3851 struct set_qos_limit_ctx { 3852 void (*cb_fn)(void *cb_arg, int status); 3853 void *cb_arg; 3854 struct spdk_bdev *bdev; 3855 }; 3856 3857 static void 3858 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3859 { 3860 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3861 ctx->bdev->internal.qos_mod_in_progress = false; 3862 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3863 3864 ctx->cb_fn(ctx->cb_arg, status); 3865 free(ctx); 3866 } 3867 3868 static void 3869 _spdk_bdev_disable_qos_done(void *cb_arg) 3870 { 3871 struct set_qos_limit_ctx *ctx = cb_arg; 3872 struct spdk_bdev *bdev = ctx->bdev; 3873 struct spdk_bdev_io *bdev_io; 3874 struct spdk_bdev_qos *qos; 3875 3876 pthread_mutex_lock(&bdev->internal.mutex); 3877 qos = bdev->internal.qos; 3878 bdev->internal.qos = NULL; 3879 pthread_mutex_unlock(&bdev->internal.mutex); 3880 3881 while (!TAILQ_EMPTY(&qos->queued)) { 3882 /* Send queued I/O back to their original thread for resubmission. 
static void
_spdk_bdev_write_zero_buffer_next(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t num_bytes, num_blocks;
	int rc;

	num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) *
			     bdev_io->u.bdev.split_remaining_num_blocks,
			     ZERO_BUFFER_SIZE);
	num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev);

	rc = spdk_bdev_write_blocks(bdev_io->internal.desc,
				    spdk_io_channel_from_ctx(bdev_io->internal.ch),
				    g_bdev_mgr.zero_buffer,
				    bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
				    _spdk_bdev_write_zero_buffer_done, bdev_io);
	if (rc == 0) {
		bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
		bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
	} else if (rc == -ENOMEM) {
		_spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next);
	} else {
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

static void
_spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
		return;
	}

	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
		parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
		return;
	}

	_spdk_bdev_write_zero_buffer_next(parent_io);
}
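/*
 * Note on the chunking above: each pass writes at most ZERO_BUFFER_SIZE bytes
 * (1 MiB here) from the shared zero buffer.  With a 512-byte block size that
 * is 0x100000 / 512 = 2048 blocks per spdk_bdev_write_blocks() call; the done
 * callback keeps issuing passes until split_remaining_num_blocks reaches zero,
 * at which point the parent I/O is completed.
 */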
struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

static void
_spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
{
	pthread_mutex_lock(&ctx->bdev->internal.mutex);
	ctx->bdev->internal.qos_mod_in_progress = false;
	pthread_mutex_unlock(&ctx->bdev->internal.mutex);

	ctx->cb_fn(ctx->cb_arg, status);
	free(ctx);
}

static void
_spdk_bdev_disable_qos_done(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_qos *qos;

	pthread_mutex_lock(&bdev->internal.mutex);
	qos = bdev->internal.qos;
	bdev->internal.qos = NULL;
	pthread_mutex_unlock(&bdev->internal.mutex);

	while (!TAILQ_EMPTY(&qos->queued)) {
		/* Send queued I/O back to their original thread for resubmission. */
		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);

		if (bdev_io->internal.io_submit_ch) {
			/*
			 * Channel was changed when sending it to the QoS thread - change it back
			 * before sending it back to the original thread.
			 */
			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
			bdev_io->internal.io_submit_ch = NULL;
		}

		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
				     _spdk_bdev_io_submit, bdev_io);
	}

	if (qos->thread != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
		spdk_poller_unregister(&qos->poller);
	}

	free(qos);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_thread *thread;

	pthread_mutex_lock(&bdev->internal.mutex);
	thread = bdev->internal.qos->thread;
	pthread_mutex_unlock(&bdev->internal.mutex);

	if (thread != NULL) {
		spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
	} else {
		_spdk_bdev_disable_qos_done(ctx);
	}
}

static void
_spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;

	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
	pthread_mutex_unlock(&bdev->internal.mutex);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	pthread_mutex_lock(&bdev->internal.mutex);
	_spdk_bdev_enable_qos(bdev, bdev_ch);
	pthread_mutex_unlock(&bdev->internal.mutex);
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
{
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	_spdk_bdev_set_qos_limit_done(ctx, status);
}

static void
_spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
{
	int i;

	assert(bdev->internal.qos != NULL);

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			bdev->internal.qos->rate_limits[i].limit = limits[i];

			if (limits[i] == 0) {
				bdev->internal.qos->rate_limits[i].limit =
					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
			}
		}
	}
}

void
spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
{
	struct set_qos_limit_ctx *ctx;
	uint64_t limit_set_complement;
	uint64_t min_limit_per_sec;
	int i;
	bool disable_rate_limit = true;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			continue;
		}

		if (limits[i] > 0) {
			disable_rate_limit = false;
		}

		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
		} else {
			/* Change from megabyte to byte rate limit */
			limits[i] = limits[i] * 1024 * 1024;
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
		}

		limit_set_complement = limits[i] % min_limit_per_sec;
		if (limit_set_complement) {
			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
				    limits[i], min_limit_per_sec);
			limits[i] += min_limit_per_sec - limit_set_complement;
			SPDK_ERRLOG("Rounding the rate limit up to %" PRIu64 "\n", limits[i]);
		}
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->bdev = bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos_mod_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}
	bdev->internal.qos_mod_in_progress = true;

	if (disable_rate_limit == true && bdev->internal.qos) {
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
			     bdev->internal.qos->rate_limits[i].limit !=
			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
				disable_rate_limit = false;
				break;
			}
		}
	}

	if (disable_rate_limit == false) {
		if (bdev->internal.qos == NULL) {
			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
			if (!bdev->internal.qos) {
				pthread_mutex_unlock(&bdev->internal.mutex);
				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
				free(ctx);
				cb_fn(cb_arg, -ENOMEM);
				return;
			}
		}

		if (bdev->internal.qos->thread == NULL) {
			/* Enabling */
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_enable_qos_msg, ctx,
					      _spdk_bdev_enable_qos_done);
		} else {
			/* Updating */
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_thread_send_msg(bdev->internal.qos->thread,
					     _spdk_bdev_update_qos_rate_limit_msg, ctx);
		}
	} else {
		if (bdev->internal.qos != NULL) {
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			/* Disabling */
			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_disable_qos_msg, ctx,
					      _spdk_bdev_disable_qos_msg_done);
		} else {
			pthread_mutex_unlock(&bdev->internal.mutex);
			_spdk_bdev_set_qos_limit_done(ctx, 0);
			return;
		}
	}

	pthread_mutex_unlock(&bdev->internal.mutex);
}
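/*
 * Usage sketch (illustrative only): the limits array is indexed by the public
 * QoS rate limit type enum and must have SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES
 * entries.  Entries set to SPDK_BDEV_QOS_LIMIT_NOT_DEFINED (UINT64_MAX) are
 * left untouched, 0 disables that limit type, and byte limits are given in
 * MB/s (converted to bytes above).  The names set_limits_done_cb and
 * limit_bdev_iops below are hypothetical; index 0 corresponds to the
 * read/write IOPS limit in this file's qos_rpc_type table.
 *
 *	static void
 *	set_limits_done_cb(void *cb_arg, int status)
 *	{
 *		// status is 0 on success, or a negative errno such as -EAGAIN
 *		// if another QoS change is already in progress.
 *	}
 *
 *	static void
 *	limit_bdev_iops(struct spdk_bdev *bdev, uint64_t rw_ios_per_sec)
 *	{
 *		uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
 *		int i;
 *
 *		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
 *			limits[i] = UINT64_MAX;	// leave other limit types unchanged
 *		}
 *		limits[0] = rw_ios_per_sec;	// read/write IOPS limit slot
 *
 *		spdk_bdev_set_qos_rate_limits(bdev, limits, set_limits_done_cb, NULL);
 *	}
 */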
SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)

SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
{
	spdk_trace_register_owner(OWNER_BDEV, 'b');
	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
					OBJECT_BDEV_IO, 1, 0, "type: ");
	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
					OBJECT_BDEV_IO, 0, 0, "");
}