1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/util.h" 48 #include "spdk/trace.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 #include "spdk/string.h" 53 54 #ifdef SPDK_CONFIG_VTUNE 55 #include "ittnotify.h" 56 #include "ittnotify_types.h" 57 int __itt_init_ittlib(const char *, __itt_group_id); 58 #endif 59 60 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 61 #define SPDK_BDEV_IO_CACHE_SIZE 256 62 #define BUF_SMALL_POOL_SIZE 8192 63 #define BUF_LARGE_POOL_SIZE 1024 64 #define NOMEM_THRESHOLD_COUNT 8 65 #define ZERO_BUFFER_SIZE 0x100000 66 67 #define OWNER_BDEV 0x2 68 69 #define OBJECT_BDEV_IO 0x2 70 71 #define TRACE_GROUP_BDEV 0x3 72 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 73 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 74 75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 * 1024 * 1024) 80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 81 82 #define SPDK_BDEV_POOL_ALIGNMENT 512 83 84 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"}; 85 static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"}; 86 87 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 88 89 struct spdk_bdev_mgr { 90 struct spdk_mempool *bdev_io_pool; 91 92 struct spdk_mempool *buf_small_pool; 93 struct spdk_mempool *buf_large_pool; 94 95 void *zero_buffer; 96 97 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 98 99 struct spdk_bdev_list bdevs; 
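	/*
	 * init_complete is set once overall bdev subsystem initialization has
	 * finished (see spdk_bdev_init_complete()). module_init_complete is set
	 * as soon as spdk_bdev_modules_init() has returned, even if modules
	 * still have asynchronous init or examine work outstanding.
	 */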
100 101 bool init_complete; 102 bool module_init_complete; 103 104 #ifdef SPDK_CONFIG_VTUNE 105 __itt_domain *domain; 106 #endif 107 }; 108 109 static struct spdk_bdev_mgr g_bdev_mgr = { 110 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 111 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 112 .init_complete = false, 113 .module_init_complete = false, 114 }; 115 116 static struct spdk_bdev_opts g_bdev_opts = { 117 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 118 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 119 }; 120 121 static spdk_bdev_init_cb g_init_cb_fn = NULL; 122 static void *g_init_cb_arg = NULL; 123 124 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 125 static void *g_fini_cb_arg = NULL; 126 static struct spdk_thread *g_fini_thread = NULL; 127 128 struct spdk_bdev_qos_limit { 129 /** IOs or bytes allowed per second (i.e., 1s). */ 130 uint64_t limit; 131 132 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 133 * For remaining bytes, allowed to run negative if an I/O is submitted when 134 * some bytes are remaining, but the I/O is bigger than that amount. The 135 * excess will be deducted from the next timeslice. 136 */ 137 int64_t remaining_this_timeslice; 138 139 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 140 uint32_t min_per_timeslice; 141 142 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 143 uint32_t max_per_timeslice; 144 }; 145 146 struct spdk_bdev_qos { 147 /** Types of structure of rate limits. */ 148 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 149 150 /** The channel that all I/O are funneled through. */ 151 struct spdk_bdev_channel *ch; 152 153 /** The thread on which the poller is running. */ 154 struct spdk_thread *thread; 155 156 /** Queue of I/O waiting to be issued. */ 157 bdev_io_tailq_t queued; 158 159 /** Size of a timeslice in tsc ticks. */ 160 uint64_t timeslice_size; 161 162 /** Timestamp of start of last timeslice. */ 163 uint64_t last_timeslice; 164 165 /** Poller that processes queued I/O commands each time slice. */ 166 struct spdk_poller *poller; 167 }; 168 169 struct spdk_bdev_mgmt_channel { 170 bdev_io_stailq_t need_buf_small; 171 bdev_io_stailq_t need_buf_large; 172 173 /* 174 * Each thread keeps a cache of bdev_io - this allows 175 * bdev threads which are *not* DPDK threads to still 176 * benefit from a per-thread bdev_io cache. Without 177 * this, non-DPDK threads fetching from the mempool 178 * incur a cmpxchg on get and put. 179 */ 180 bdev_io_stailq_t per_thread_cache; 181 uint32_t per_thread_cache_count; 182 uint32_t bdev_io_cache_size; 183 184 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 185 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 186 }; 187 188 /* 189 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 190 * will queue here their IO that awaits retry. It makes it possible to retry sending 191 * IO to one bdev after IO from other bdev completes. 192 */ 193 struct spdk_bdev_shared_resource { 194 /* The bdev management channel */ 195 struct spdk_bdev_mgmt_channel *mgmt_ch; 196 197 /* 198 * Count of I/O submitted to bdev module and waiting for completion. 199 * Incremented before submit_request() is called on an spdk_bdev_io. 200 */ 201 uint64_t io_outstanding; 202 203 /* 204 * Queue of IO awaiting retry because of a previous NOMEM status returned 205 * on this channel. 
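	 * I/O queued here are resubmitted once io_outstanding on this shared
	 * resource has dropped back down to nomem_threshold (see below).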
206 */ 207 bdev_io_tailq_t nomem_io; 208 209 /* 210 * Threshold which io_outstanding must drop to before retrying nomem_io. 211 */ 212 uint64_t nomem_threshold; 213 214 /* I/O channel allocated by a bdev module */ 215 struct spdk_io_channel *shared_ch; 216 217 /* Refcount of bdev channels using this resource */ 218 uint32_t ref; 219 220 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 221 }; 222 223 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 224 #define BDEV_CH_QOS_ENABLED (1 << 1) 225 226 struct spdk_bdev_channel { 227 struct spdk_bdev *bdev; 228 229 /* The channel for the underlying device */ 230 struct spdk_io_channel *channel; 231 232 /* Per io_device per thread data */ 233 struct spdk_bdev_shared_resource *shared_resource; 234 235 struct spdk_bdev_io_stat stat; 236 237 /* 238 * Count of I/O submitted through this channel and waiting for completion. 239 * Incremented before submit_request() is called on an spdk_bdev_io. 240 */ 241 uint64_t io_outstanding; 242 243 bdev_io_tailq_t queued_resets; 244 245 uint32_t flags; 246 247 #ifdef SPDK_CONFIG_VTUNE 248 uint64_t start_tsc; 249 uint64_t interval_tsc; 250 __itt_string_handle *handle; 251 struct spdk_bdev_io_stat prev_stat; 252 #endif 253 254 }; 255 256 struct spdk_bdev_desc { 257 struct spdk_bdev *bdev; 258 struct spdk_thread *thread; 259 spdk_bdev_remove_cb_t remove_cb; 260 void *remove_ctx; 261 bool remove_scheduled; 262 bool closed; 263 bool write; 264 TAILQ_ENTRY(spdk_bdev_desc) link; 265 }; 266 267 struct spdk_bdev_iostat_ctx { 268 struct spdk_bdev_io_stat *stat; 269 spdk_bdev_get_device_stat_cb cb; 270 void *cb_arg; 271 }; 272 273 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 274 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 275 276 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 277 void *cb_arg); 278 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 279 280 void 281 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 282 { 283 *opts = g_bdev_opts; 284 } 285 286 int 287 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 288 { 289 uint32_t min_pool_size; 290 291 /* 292 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 293 * initialization. A second mgmt_ch will be created on the same thread when the application starts 294 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
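	 * The pool therefore has to be able to back (thread count + 1) fully
	 * populated per-thread caches at once; e.g. with 4 threads and the default
	 * cache size of 256, bdev_io_pool_size must be at least 256 * (4 + 1) = 1280.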
295 */ 296 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 297 if (opts->bdev_io_pool_size < min_pool_size) { 298 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 299 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 300 spdk_thread_get_count()); 301 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 302 return -1; 303 } 304 305 g_bdev_opts = *opts; 306 return 0; 307 } 308 309 struct spdk_bdev * 310 spdk_bdev_first(void) 311 { 312 struct spdk_bdev *bdev; 313 314 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 315 if (bdev) { 316 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 317 } 318 319 return bdev; 320 } 321 322 struct spdk_bdev * 323 spdk_bdev_next(struct spdk_bdev *prev) 324 { 325 struct spdk_bdev *bdev; 326 327 bdev = TAILQ_NEXT(prev, internal.link); 328 if (bdev) { 329 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 330 } 331 332 return bdev; 333 } 334 335 static struct spdk_bdev * 336 _bdev_next_leaf(struct spdk_bdev *bdev) 337 { 338 while (bdev != NULL) { 339 if (bdev->internal.claim_module == NULL) { 340 return bdev; 341 } else { 342 bdev = TAILQ_NEXT(bdev, internal.link); 343 } 344 } 345 346 return bdev; 347 } 348 349 struct spdk_bdev * 350 spdk_bdev_first_leaf(void) 351 { 352 struct spdk_bdev *bdev; 353 354 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 355 356 if (bdev) { 357 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 358 } 359 360 return bdev; 361 } 362 363 struct spdk_bdev * 364 spdk_bdev_next_leaf(struct spdk_bdev *prev) 365 { 366 struct spdk_bdev *bdev; 367 368 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 369 370 if (bdev) { 371 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 372 } 373 374 return bdev; 375 } 376 377 struct spdk_bdev * 378 spdk_bdev_get_by_name(const char *bdev_name) 379 { 380 struct spdk_bdev_alias *tmp; 381 struct spdk_bdev *bdev = spdk_bdev_first(); 382 383 while (bdev != NULL) { 384 if (strcmp(bdev_name, bdev->name) == 0) { 385 return bdev; 386 } 387 388 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 389 if (strcmp(bdev_name, tmp->alias) == 0) { 390 return bdev; 391 } 392 } 393 394 bdev = spdk_bdev_next(bdev); 395 } 396 397 return NULL; 398 } 399 400 void 401 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 402 { 403 struct iovec *iovs; 404 405 iovs = bdev_io->u.bdev.iovs; 406 407 assert(iovs != NULL); 408 assert(bdev_io->u.bdev.iovcnt >= 1); 409 410 iovs[0].iov_base = buf; 411 iovs[0].iov_len = len; 412 } 413 414 static bool 415 _is_buf_allocated(struct iovec *iovs) 416 { 417 return iovs[0].iov_base != NULL; 418 } 419 420 static bool 421 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 422 { 423 int i; 424 uintptr_t iov_base; 425 426 if (spdk_likely(alignment == 1)) { 427 return true; 428 } 429 430 for (i = 0; i < iovcnt; i++) { 431 iov_base = (uintptr_t)iovs[i].iov_base; 432 if ((iov_base & (alignment - 1)) != 0) { 433 return false; 434 } 435 } 436 437 return true; 438 } 439 440 static void 441 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 442 { 443 int i; 444 size_t len; 445 446 for (i = 0; i < iovcnt; i++) { 447 len = spdk_min(iovs[i].iov_len, buf_len); 448 memcpy(buf, iovs[i].iov_base, len); 449 buf += len; 450 buf_len -= len; 451 } 452 } 453 454 static void 455 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, 
		  void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is the write path, copy data from the original buffer to the bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf, *aligned_buf;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len;
	uint64_t alignment;
	bool buf_allocated;

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);

		alignment = spdk_bdev_get_buf_align(tmp->bdev);
		buf_allocated = _is_buf_allocated(tmp->u.bdev.iovs);

		aligned_buf = (void *)(((uintptr_t)buf +
					(alignment - 1)) & ~(alignment - 1));
		if (buf_allocated) {
			_bdev_io_set_bounce_buf(tmp, aligned_buf, tmp->internal.buf_len);
		} else {
			spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
		}

		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		tmp->internal.buf = buf;
		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
	}
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	/* if this is the read path, copy data from the bounce buffer to the original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base, bdev_io->internal.bounce_iov.iov_len);
	}
	/* restore the original buffer for this I/O */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable the bounce buffer for this I/O */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;
	/* return the bounce buffer to the pool */
	spdk_bdev_io_put_buf(bdev_io);
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	void *buf, *aligned_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment;
	bool buf_allocated;

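	/*
	 * Either hand this bdev_io an aligned data buffer from the small/large
	 * buffer pools (setting up a bounce buffer if the caller already supplied
	 * an unaligned one), or queue it on the mgmt channel's need_buf list until
	 * a buffer is returned via spdk_bdev_io_put_buf().
	 *
	 * Typical (illustrative) use from a bdev module's read path:
	 *
	 *   spdk_bdev_io_get_buf(bdev_io, my_read_cb,
	 *                        bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
	 *
	 * where my_read_cb is a hypothetical module callback that issues the read
	 * once the iovs point at usable memory.
	 */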
assert(cb != NULL); 565 assert(bdev_io->u.bdev.iovs != NULL); 566 567 alignment = spdk_bdev_get_buf_align(bdev_io->bdev); 568 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 569 570 if (buf_allocated && 571 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 572 /* Buffer already present and aligned */ 573 cb(bdev_io->internal.ch->channel, bdev_io); 574 return; 575 } 576 577 assert(len + alignment <= SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT); 578 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 579 580 bdev_io->internal.buf_len = len; 581 bdev_io->internal.get_buf_cb = cb; 582 583 if (len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) { 584 pool = g_bdev_mgr.buf_small_pool; 585 stailq = &mgmt_ch->need_buf_small; 586 } else { 587 pool = g_bdev_mgr.buf_large_pool; 588 stailq = &mgmt_ch->need_buf_large; 589 } 590 591 buf = spdk_mempool_get(pool); 592 593 if (!buf) { 594 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 595 } else { 596 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 597 598 if (buf_allocated) { 599 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 600 } else { 601 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 602 } 603 bdev_io->internal.buf = buf; 604 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 605 } 606 } 607 608 static int 609 spdk_bdev_module_get_max_ctx_size(void) 610 { 611 struct spdk_bdev_module *bdev_module; 612 int max_bdev_module_size = 0; 613 614 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 615 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 616 max_bdev_module_size = bdev_module->get_ctx_size(); 617 } 618 } 619 620 return max_bdev_module_size; 621 } 622 623 void 624 spdk_bdev_config_text(FILE *fp) 625 { 626 struct spdk_bdev_module *bdev_module; 627 628 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 629 if (bdev_module->config_text) { 630 bdev_module->config_text(fp); 631 } 632 } 633 } 634 635 static void 636 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 637 { 638 int i; 639 struct spdk_bdev_qos *qos = bdev->internal.qos; 640 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 641 642 if (!qos) { 643 return; 644 } 645 646 spdk_bdev_get_qos_rate_limits(bdev, limits); 647 648 spdk_json_write_object_begin(w); 649 spdk_json_write_named_string(w, "method", "set_bdev_qos_limit"); 650 spdk_json_write_name(w, "params"); 651 652 spdk_json_write_object_begin(w); 653 spdk_json_write_named_string(w, "name", bdev->name); 654 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 655 if (limits[i] > 0) { 656 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 657 } 658 } 659 spdk_json_write_object_end(w); 660 661 spdk_json_write_object_end(w); 662 } 663 664 void 665 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 666 { 667 struct spdk_bdev_module *bdev_module; 668 struct spdk_bdev *bdev; 669 670 assert(w != NULL); 671 672 spdk_json_write_array_begin(w); 673 674 spdk_json_write_object_begin(w); 675 spdk_json_write_named_string(w, "method", "set_bdev_options"); 676 spdk_json_write_name(w, "params"); 677 spdk_json_write_object_begin(w); 678 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 679 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 680 spdk_json_write_object_end(w); 681 spdk_json_write_object_end(w); 
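	/*
	 * Emit each module's own config, then per-bdev QoS settings and any
	 * driver-specific config. Each QoS entry produced by
	 * spdk_bdev_qos_config_json() (defined above) looks roughly like:
	 *
	 *   { "method": "set_bdev_qos_limit",
	 *     "params": { "name": "<bdev>", "rw_ios_per_sec": ..., "rw_mbytes_per_sec": ... } }
	 */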
	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		spdk_bdev_qos_config_json(bdev, w);

		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}
	}

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
807 */ 808 spdk_bdev_init_complete(0); 809 } 810 811 static void 812 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 813 { 814 assert(module->internal.action_in_progress > 0); 815 module->internal.action_in_progress--; 816 spdk_bdev_module_action_complete(); 817 } 818 819 void 820 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 821 { 822 spdk_bdev_module_action_done(module); 823 } 824 825 void 826 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 827 { 828 spdk_bdev_module_action_done(module); 829 } 830 831 /** The last initialized bdev module */ 832 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 833 834 static int 835 spdk_bdev_modules_init(void) 836 { 837 struct spdk_bdev_module *module; 838 int rc = 0; 839 840 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 841 g_resume_bdev_module = module; 842 rc = module->module_init(); 843 if (rc != 0) { 844 return rc; 845 } 846 } 847 848 g_resume_bdev_module = NULL; 849 return 0; 850 } 851 852 853 static void 854 spdk_bdev_init_failed_complete(void *cb_arg) 855 { 856 spdk_bdev_init_complete(-1); 857 } 858 859 static void 860 spdk_bdev_init_failed(void *cb_arg) 861 { 862 spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL); 863 } 864 865 void 866 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 867 { 868 struct spdk_conf_section *sp; 869 struct spdk_bdev_opts bdev_opts; 870 int32_t bdev_io_pool_size, bdev_io_cache_size; 871 int cache_size; 872 int rc = 0; 873 char mempool_name[32]; 874 875 assert(cb_fn != NULL); 876 877 sp = spdk_conf_find_section(NULL, "Bdev"); 878 if (sp != NULL) { 879 spdk_bdev_get_opts(&bdev_opts); 880 881 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 882 if (bdev_io_pool_size >= 0) { 883 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 884 } 885 886 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 887 if (bdev_io_cache_size >= 0) { 888 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 889 } 890 891 if (spdk_bdev_set_opts(&bdev_opts)) { 892 spdk_bdev_init_complete(-1); 893 return; 894 } 895 896 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 897 } 898 899 g_init_cb_fn = cb_fn; 900 g_init_cb_arg = cb_arg; 901 902 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 903 904 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 905 g_bdev_opts.bdev_io_pool_size, 906 sizeof(struct spdk_bdev_io) + 907 spdk_bdev_module_get_max_ctx_size(), 908 0, 909 SPDK_ENV_SOCKET_ID_ANY); 910 911 if (g_bdev_mgr.bdev_io_pool == NULL) { 912 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 913 spdk_bdev_init_complete(-1); 914 return; 915 } 916 917 /** 918 * Ensure no more than half of the total buffers end up local caches, by 919 * using spdk_thread_get_count() to determine how many local caches we need 920 * to account for. 
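	 * With that many threads, the per-thread mempool caches can hold at most
	 * half of each pool in total, leaving the rest available globally.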
921 */ 922 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 923 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 924 925 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 926 BUF_SMALL_POOL_SIZE, 927 SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 928 cache_size, 929 SPDK_ENV_SOCKET_ID_ANY); 930 if (!g_bdev_mgr.buf_small_pool) { 931 SPDK_ERRLOG("create rbuf small pool failed\n"); 932 spdk_bdev_init_complete(-1); 933 return; 934 } 935 936 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 937 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 938 939 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 940 BUF_LARGE_POOL_SIZE, 941 SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 942 cache_size, 943 SPDK_ENV_SOCKET_ID_ANY); 944 if (!g_bdev_mgr.buf_large_pool) { 945 SPDK_ERRLOG("create rbuf large pool failed\n"); 946 spdk_bdev_init_complete(-1); 947 return; 948 } 949 950 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 951 NULL); 952 if (!g_bdev_mgr.zero_buffer) { 953 SPDK_ERRLOG("create bdev zero buffer failed\n"); 954 spdk_bdev_init_complete(-1); 955 return; 956 } 957 958 #ifdef SPDK_CONFIG_VTUNE 959 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 960 #endif 961 962 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 963 spdk_bdev_mgmt_channel_destroy, 964 sizeof(struct spdk_bdev_mgmt_channel), 965 "bdev_mgr"); 966 967 rc = spdk_bdev_modules_init(); 968 g_bdev_mgr.module_init_complete = true; 969 if (rc != 0) { 970 SPDK_ERRLOG("bdev modules init failed\n"); 971 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 972 return; 973 } 974 975 spdk_bdev_module_action_complete(); 976 } 977 978 static void 979 spdk_bdev_mgr_unregister_cb(void *io_device) 980 { 981 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 982 983 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 984 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 985 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 986 g_bdev_opts.bdev_io_pool_size); 987 } 988 989 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 990 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 991 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 992 BUF_SMALL_POOL_SIZE); 993 assert(false); 994 } 995 996 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 997 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 998 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 999 BUF_LARGE_POOL_SIZE); 1000 assert(false); 1001 } 1002 1003 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1004 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1005 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1006 spdk_dma_free(g_bdev_mgr.zero_buffer); 1007 1008 cb_fn(g_fini_cb_arg); 1009 g_fini_cb_fn = NULL; 1010 g_fini_cb_arg = NULL; 1011 g_bdev_mgr.init_complete = false; 1012 g_bdev_mgr.module_init_complete = false; 1013 } 1014 1015 static void 1016 spdk_bdev_module_finish_iter(void *arg) 1017 { 1018 struct spdk_bdev_module *bdev_module; 1019 1020 /* Start iterating from the last touched module */ 1021 if (!g_resume_bdev_module) { 1022 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 1023 } else { 1024 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 1025 internal.tailq); 1026 } 1027 1028 while (bdev_module) { 1029 if (bdev_module->async_fini) { 1030 /* Save our place so we can 
resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev. Try to recover by manually removing it from the list and continuing
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some
		 * context (like bdev part free) that will use this bdev (or private bdev driver
		 * ctx data) after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim its underlying bdev properly, we may end up
	 * with a list consisting only of claimed bdevs (if claims are managed
	 * correctly, this would mean there is a loop in the claims graph, which is
	 * clearly impossible). In that case, warn and unregister the last bdev on the list.
1118 */ 1119 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1120 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1121 SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1122 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 1123 return; 1124 } 1125 } 1126 1127 void 1128 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1129 { 1130 struct spdk_bdev_module *m; 1131 1132 assert(cb_fn != NULL); 1133 1134 g_fini_thread = spdk_get_thread(); 1135 1136 g_fini_cb_fn = cb_fn; 1137 g_fini_cb_arg = cb_arg; 1138 1139 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1140 if (m->fini_start) { 1141 m->fini_start(); 1142 } 1143 } 1144 1145 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0); 1146 } 1147 1148 static struct spdk_bdev_io * 1149 spdk_bdev_get_io(struct spdk_bdev_channel *channel) 1150 { 1151 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1152 struct spdk_bdev_io *bdev_io; 1153 1154 if (ch->per_thread_cache_count > 0) { 1155 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1156 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1157 ch->per_thread_cache_count--; 1158 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1159 /* 1160 * Don't try to look for bdev_ios in the global pool if there are 1161 * waiters on bdev_ios - we don't want this caller to jump the line. 1162 */ 1163 bdev_io = NULL; 1164 } else { 1165 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1166 } 1167 1168 return bdev_io; 1169 } 1170 1171 void 1172 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1173 { 1174 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1175 1176 assert(bdev_io != NULL); 1177 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1178 1179 if (bdev_io->internal.buf != NULL) { 1180 spdk_bdev_io_put_buf(bdev_io); 1181 } 1182 1183 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1184 ch->per_thread_cache_count++; 1185 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1186 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1187 struct spdk_bdev_io_wait_entry *entry; 1188 1189 entry = TAILQ_FIRST(&ch->io_wait_queue); 1190 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1191 entry->cb_fn(entry->cb_arg); 1192 } 1193 } else { 1194 /* We should never have a full cache with entries on the io wait queue. 
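		 * Waiters are serviced straight from the cache in the branch above as
		 * soon as an I/O is freed, so a full cache implies no one is waiting.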
*/ 1195 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1196 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1197 } 1198 } 1199 1200 static bool 1201 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1202 { 1203 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1204 1205 switch (limit) { 1206 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1207 return true; 1208 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1209 return false; 1210 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1211 default: 1212 return false; 1213 } 1214 } 1215 1216 static bool 1217 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1218 { 1219 switch (bdev_io->type) { 1220 case SPDK_BDEV_IO_TYPE_NVME_IO: 1221 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1222 case SPDK_BDEV_IO_TYPE_READ: 1223 case SPDK_BDEV_IO_TYPE_WRITE: 1224 case SPDK_BDEV_IO_TYPE_UNMAP: 1225 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1226 return true; 1227 default: 1228 return false; 1229 } 1230 } 1231 1232 static uint64_t 1233 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1234 { 1235 struct spdk_bdev *bdev = bdev_io->bdev; 1236 1237 switch (bdev_io->type) { 1238 case SPDK_BDEV_IO_TYPE_NVME_IO: 1239 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1240 return bdev_io->u.nvme_passthru.nbytes; 1241 case SPDK_BDEV_IO_TYPE_READ: 1242 case SPDK_BDEV_IO_TYPE_WRITE: 1243 case SPDK_BDEV_IO_TYPE_UNMAP: 1244 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1245 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1246 default: 1247 return 0; 1248 } 1249 } 1250 1251 static void 1252 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte) 1253 { 1254 int i; 1255 1256 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1257 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1258 continue; 1259 } 1260 1261 switch (i) { 1262 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1263 qos->rate_limits[i].remaining_this_timeslice--; 1264 break; 1265 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1266 qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte; 1267 break; 1268 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1269 default: 1270 break; 1271 } 1272 } 1273 } 1274 1275 static int 1276 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1277 { 1278 struct spdk_bdev_io *bdev_io = NULL; 1279 struct spdk_bdev *bdev = ch->bdev; 1280 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1281 int i, submitted_ios = 0; 1282 bool to_limit_io; 1283 uint64_t io_size_in_byte; 1284 1285 while (!TAILQ_EMPTY(&qos->queued)) { 1286 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1287 if (qos->rate_limits[i].max_per_timeslice > 0 && 1288 (qos->rate_limits[i].remaining_this_timeslice <= 0)) { 1289 return submitted_ios; 1290 } 1291 } 1292 1293 bdev_io = TAILQ_FIRST(&qos->queued); 1294 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1295 ch->io_outstanding++; 1296 shared_resource->io_outstanding++; 1297 to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io); 1298 if (to_limit_io == true) { 1299 io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io); 1300 _spdk_bdev_qos_update_per_io(qos, io_size_in_byte); 1301 } 1302 bdev->fn_table->submit_request(ch->channel, bdev_io); 1303 submitted_ios++; 1304 } 1305 1306 return submitted_ios; 1307 } 1308 1309 static void 1310 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1311 { 1312 int rc; 1313 1314 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1315 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1316 
bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1317 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1318 &bdev_io->internal.waitq_entry); 1319 if (rc != 0) { 1320 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1321 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1322 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1323 } 1324 } 1325 1326 static bool 1327 _spdk_bdev_io_type_can_split(uint8_t type) 1328 { 1329 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1330 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1331 1332 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1333 * UNMAP could be split, but these types of I/O are typically much larger 1334 * in size (sometimes the size of the entire block device), and the bdev 1335 * module can more efficiently split these types of I/O. Plus those types 1336 * of I/O do not have a payload, which makes the splitting process simpler. 1337 */ 1338 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1339 return true; 1340 } else { 1341 return false; 1342 } 1343 } 1344 1345 static bool 1346 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1347 { 1348 uint64_t start_stripe, end_stripe; 1349 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1350 1351 if (io_boundary == 0) { 1352 return false; 1353 } 1354 1355 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1356 return false; 1357 } 1358 1359 start_stripe = bdev_io->u.bdev.offset_blocks; 1360 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1361 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1362 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1363 start_stripe >>= spdk_u32log2(io_boundary); 1364 end_stripe >>= spdk_u32log2(io_boundary); 1365 } else { 1366 start_stripe /= io_boundary; 1367 end_stripe /= io_boundary; 1368 } 1369 return (start_stripe != end_stripe); 1370 } 1371 1372 static uint32_t 1373 _to_next_boundary(uint64_t offset, uint32_t boundary) 1374 { 1375 return (boundary - (offset % boundary)); 1376 } 1377 1378 static void 1379 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1380 1381 static void 1382 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1383 { 1384 struct spdk_bdev_io *bdev_io = _bdev_io; 1385 uint64_t current_offset, remaining; 1386 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1387 struct iovec *parent_iov, *iov; 1388 uint64_t parent_iov_offset, iov_len; 1389 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1390 int rc; 1391 1392 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1393 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1394 blocklen = bdev_io->bdev->blocklen; 1395 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1396 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1397 1398 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1399 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1400 if (parent_iov_offset < parent_iov->iov_len) { 1401 break; 1402 } 1403 parent_iov_offset -= parent_iov->iov_len; 1404 } 1405 1406 child_iovcnt = 0; 1407 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1408 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1409 to_next_boundary = spdk_min(remaining, to_next_boundary); 1410 to_next_boundary_bytes = to_next_boundary * blocklen; 1411 iov = 
&bdev_io->child_iov[child_iovcnt]; 1412 iovcnt = 0; 1413 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1414 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1415 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1416 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1417 to_next_boundary_bytes -= iov_len; 1418 1419 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1420 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1421 1422 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1423 parent_iov_offset += iov_len; 1424 } else { 1425 parent_iovpos++; 1426 parent_iov_offset = 0; 1427 } 1428 child_iovcnt++; 1429 iovcnt++; 1430 } 1431 1432 if (to_next_boundary_bytes > 0) { 1433 /* We had to stop this child I/O early because we ran out of 1434 * child_iov space. Make sure the iovs collected are valid and 1435 * then adjust to_next_boundary before starting the child I/O. 1436 */ 1437 if ((to_next_boundary_bytes % blocklen) != 0) { 1438 SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n", 1439 to_next_boundary_bytes, blocklen); 1440 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1441 if (bdev_io->u.bdev.split_outstanding == 0) { 1442 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1443 } 1444 return; 1445 } 1446 to_next_boundary -= to_next_boundary_bytes / blocklen; 1447 } 1448 1449 bdev_io->u.bdev.split_outstanding++; 1450 1451 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1452 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1453 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1454 iov, iovcnt, current_offset, to_next_boundary, 1455 _spdk_bdev_io_split_done, bdev_io); 1456 } else { 1457 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1458 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1459 iov, iovcnt, current_offset, to_next_boundary, 1460 _spdk_bdev_io_split_done, bdev_io); 1461 } 1462 1463 if (rc == 0) { 1464 current_offset += to_next_boundary; 1465 remaining -= to_next_boundary; 1466 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1467 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1468 } else { 1469 bdev_io->u.bdev.split_outstanding--; 1470 if (rc == -ENOMEM) { 1471 if (bdev_io->u.bdev.split_outstanding == 0) { 1472 /* No I/O is outstanding. Hence we should wait here. */ 1473 _spdk_bdev_queue_io_wait_with_cb(bdev_io, 1474 _spdk_bdev_io_split_with_payload); 1475 } 1476 } else { 1477 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1478 if (bdev_io->u.bdev.split_outstanding == 0) { 1479 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1480 } 1481 } 1482 1483 return; 1484 } 1485 } 1486 } 1487 1488 static void 1489 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1490 { 1491 struct spdk_bdev_io *parent_io = cb_arg; 1492 1493 spdk_bdev_free_io(bdev_io); 1494 1495 if (!success) { 1496 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1497 } 1498 parent_io->u.bdev.split_outstanding--; 1499 if (parent_io->u.bdev.split_outstanding != 0) { 1500 return; 1501 } 1502 1503 /* 1504 * Parent I/O finishes when all blocks are consumed or there is any failure of 1505 * child I/O and no outstanding child I/O. 
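	 * Note that a failed child does not abort children that are already in
	 * flight; the parent simply completes with failure once they all return.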
1506 */ 1507 if (parent_io->u.bdev.split_remaining_num_blocks == 0 || 1508 parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) { 1509 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1510 parent_io->internal.caller_ctx); 1511 return; 1512 } 1513 1514 /* 1515 * Continue with the splitting process. This function will complete the parent I/O if the 1516 * splitting is done. 1517 */ 1518 _spdk_bdev_io_split_with_payload(parent_io); 1519 } 1520 1521 static void 1522 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1523 { 1524 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1525 1526 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1527 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1528 bdev_io->u.bdev.split_outstanding = 0; 1529 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1530 1531 _spdk_bdev_io_split_with_payload(bdev_io); 1532 } 1533 1534 static void 1535 _spdk_bdev_io_submit(void *ctx) 1536 { 1537 struct spdk_bdev_io *bdev_io = ctx; 1538 struct spdk_bdev *bdev = bdev_io->bdev; 1539 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1540 struct spdk_io_channel *ch = bdev_ch->channel; 1541 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1542 uint64_t tsc; 1543 1544 tsc = spdk_get_ticks(); 1545 bdev_io->internal.submit_tsc = tsc; 1546 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1547 bdev_ch->io_outstanding++; 1548 shared_resource->io_outstanding++; 1549 bdev_io->internal.in_submit_request = true; 1550 if (spdk_likely(bdev_ch->flags == 0)) { 1551 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1552 bdev->fn_table->submit_request(ch, bdev_io); 1553 } else { 1554 bdev_ch->io_outstanding--; 1555 shared_resource->io_outstanding--; 1556 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1557 } 1558 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1559 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1560 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1561 bdev_ch->io_outstanding--; 1562 shared_resource->io_outstanding--; 1563 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1564 _spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 1565 } else { 1566 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1567 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1568 } 1569 bdev_io->internal.in_submit_request = false; 1570 } 1571 1572 static void 1573 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1574 { 1575 struct spdk_bdev *bdev = bdev_io->bdev; 1576 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1577 1578 assert(thread != NULL); 1579 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1580 1581 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1582 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1583 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split, 1584 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1585 } else { 1586 _spdk_bdev_io_split(NULL, bdev_io); 1587 } 1588 return; 1589 } 1590 1591 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1592 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1593 _spdk_bdev_io_submit(bdev_io); 1594 } else { 1595 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1596 bdev_io->internal.ch = 
bdev->internal.qos->ch; 1597 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1598 } 1599 } else { 1600 _spdk_bdev_io_submit(bdev_io); 1601 } 1602 } 1603 1604 static void 1605 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1606 { 1607 struct spdk_bdev *bdev = bdev_io->bdev; 1608 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1609 struct spdk_io_channel *ch = bdev_ch->channel; 1610 1611 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1612 1613 bdev_io->internal.in_submit_request = true; 1614 bdev->fn_table->submit_request(ch, bdev_io); 1615 bdev_io->internal.in_submit_request = false; 1616 } 1617 1618 static void 1619 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1620 struct spdk_bdev *bdev, void *cb_arg, 1621 spdk_bdev_io_completion_cb cb) 1622 { 1623 bdev_io->bdev = bdev; 1624 bdev_io->internal.caller_ctx = cb_arg; 1625 bdev_io->internal.cb = cb; 1626 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1627 bdev_io->internal.in_submit_request = false; 1628 bdev_io->internal.buf = NULL; 1629 bdev_io->internal.io_submit_ch = NULL; 1630 bdev_io->internal.orig_iovs = NULL; 1631 bdev_io->internal.orig_iovcnt = 0; 1632 } 1633 1634 static bool 1635 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1636 { 1637 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1638 } 1639 1640 bool 1641 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1642 { 1643 bool supported; 1644 1645 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1646 1647 if (!supported) { 1648 switch (io_type) { 1649 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1650 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1651 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1652 break; 1653 default: 1654 break; 1655 } 1656 } 1657 1658 return supported; 1659 } 1660 1661 int 1662 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1663 { 1664 if (bdev->fn_table->dump_info_json) { 1665 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1666 } 1667 1668 return 0; 1669 } 1670 1671 static void 1672 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1673 { 1674 uint32_t max_per_timeslice = 0; 1675 int i; 1676 1677 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1678 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1679 qos->rate_limits[i].max_per_timeslice = 0; 1680 continue; 1681 } 1682 1683 max_per_timeslice = qos->rate_limits[i].limit * 1684 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1685 1686 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1687 qos->rate_limits[i].min_per_timeslice); 1688 1689 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1690 } 1691 } 1692 1693 static int 1694 spdk_bdev_channel_poll_qos(void *arg) 1695 { 1696 struct spdk_bdev_qos *qos = arg; 1697 uint64_t now = spdk_get_ticks(); 1698 int i; 1699 1700 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1701 /* We received our callback earlier than expected - return 1702 * immediately and wait to do accounting until at least one 1703 * timeslice has actually expired. This should never happen 1704 * with a well-behaved timer implementation. 
1705 */ 1706 return 0; 1707 } 1708 1709 /* Reset for next round of rate limiting */ 1710 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1711 /* We may have allowed the IOs or bytes to slightly overrun in the last 1712 * timeslice. remaining_this_timeslice is signed, so if it's negative 1713 * here, we'll account for the overrun so that the next timeslice will 1714 * be appropriately reduced. 1715 */ 1716 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1717 qos->rate_limits[i].remaining_this_timeslice = 0; 1718 } 1719 } 1720 1721 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1722 qos->last_timeslice += qos->timeslice_size; 1723 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1724 qos->rate_limits[i].remaining_this_timeslice += 1725 qos->rate_limits[i].max_per_timeslice; 1726 } 1727 } 1728 1729 return _spdk_bdev_qos_io_submit(qos->ch, qos); 1730 } 1731 1732 static void 1733 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1734 { 1735 struct spdk_bdev_shared_resource *shared_resource; 1736 1737 if (!ch) { 1738 return; 1739 } 1740 1741 if (ch->channel) { 1742 spdk_put_io_channel(ch->channel); 1743 } 1744 1745 assert(ch->io_outstanding == 0); 1746 1747 shared_resource = ch->shared_resource; 1748 if (shared_resource) { 1749 assert(ch->io_outstanding == 0); 1750 assert(shared_resource->ref > 0); 1751 shared_resource->ref--; 1752 if (shared_resource->ref == 0) { 1753 assert(shared_resource->io_outstanding == 0); 1754 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1755 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1756 free(shared_resource); 1757 } 1758 } 1759 } 1760 1761 /* Caller must hold bdev->internal.mutex. */ 1762 static void 1763 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1764 { 1765 struct spdk_bdev_qos *qos = bdev->internal.qos; 1766 int i; 1767 1768 /* Rate limiting on this bdev enabled */ 1769 if (qos) { 1770 if (qos->ch == NULL) { 1771 struct spdk_io_channel *io_ch; 1772 1773 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1774 bdev->name, spdk_get_thread()); 1775 1776 /* No qos channel has been selected, so set one up */ 1777 1778 /* Take another reference to ch */ 1779 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1780 assert(io_ch != NULL); 1781 qos->ch = ch; 1782 1783 qos->thread = spdk_io_channel_get_thread(io_ch); 1784 1785 TAILQ_INIT(&qos->queued); 1786 1787 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1788 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1789 qos->rate_limits[i].min_per_timeslice = 1790 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1791 } else { 1792 qos->rate_limits[i].min_per_timeslice = 1793 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1794 } 1795 1796 if (qos->rate_limits[i].limit == 0) { 1797 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1798 } 1799 } 1800 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1801 qos->timeslice_size = 1802 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1803 qos->last_timeslice = spdk_get_ticks(); 1804 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1805 qos, 1806 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1807 } 1808 1809 ch->flags |= BDEV_CH_QOS_ENABLED; 1810 } 1811 } 1812 1813 static int 1814 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1815 { 1816 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1817 struct spdk_bdev_channel *ch = ctx_buf; 1818 
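	/*
	 * Look up (or create) the shared_resource keyed on the module's I/O
	 * channel below, so that all bdevs built on the same io_device share a
	 * single NOMEM retry queue and outstanding-I/O accounting.
	 */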
struct spdk_io_channel *mgmt_io_ch; 1819 struct spdk_bdev_mgmt_channel *mgmt_ch; 1820 struct spdk_bdev_shared_resource *shared_resource; 1821 1822 ch->bdev = bdev; 1823 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1824 if (!ch->channel) { 1825 return -1; 1826 } 1827 1828 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1829 if (!mgmt_io_ch) { 1830 return -1; 1831 } 1832 1833 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1834 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1835 if (shared_resource->shared_ch == ch->channel) { 1836 spdk_put_io_channel(mgmt_io_ch); 1837 shared_resource->ref++; 1838 break; 1839 } 1840 } 1841 1842 if (shared_resource == NULL) { 1843 shared_resource = calloc(1, sizeof(*shared_resource)); 1844 if (shared_resource == NULL) { 1845 spdk_put_io_channel(mgmt_io_ch); 1846 return -1; 1847 } 1848 1849 shared_resource->mgmt_ch = mgmt_ch; 1850 shared_resource->io_outstanding = 0; 1851 TAILQ_INIT(&shared_resource->nomem_io); 1852 shared_resource->nomem_threshold = 0; 1853 shared_resource->shared_ch = ch->channel; 1854 shared_resource->ref = 1; 1855 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1856 } 1857 1858 memset(&ch->stat, 0, sizeof(ch->stat)); 1859 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1860 ch->io_outstanding = 0; 1861 TAILQ_INIT(&ch->queued_resets); 1862 ch->flags = 0; 1863 ch->shared_resource = shared_resource; 1864 1865 #ifdef SPDK_CONFIG_VTUNE 1866 { 1867 char *name; 1868 __itt_init_ittlib(NULL, 0); 1869 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1870 if (!name) { 1871 _spdk_bdev_channel_destroy_resource(ch); 1872 return -1; 1873 } 1874 ch->handle = __itt_string_handle_create(name); 1875 free(name); 1876 ch->start_tsc = spdk_get_ticks(); 1877 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1878 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1879 } 1880 #endif 1881 1882 pthread_mutex_lock(&bdev->internal.mutex); 1883 _spdk_bdev_enable_qos(bdev, ch); 1884 pthread_mutex_unlock(&bdev->internal.mutex); 1885 1886 return 0; 1887 } 1888 1889 /* 1890 * Abort I/O that are waiting on a data buffer. These types of I/O are 1891 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1892 */ 1893 static void 1894 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1895 { 1896 bdev_io_stailq_t tmp; 1897 struct spdk_bdev_io *bdev_io; 1898 1899 STAILQ_INIT(&tmp); 1900 1901 while (!STAILQ_EMPTY(queue)) { 1902 bdev_io = STAILQ_FIRST(queue); 1903 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1904 if (bdev_io->internal.ch == ch) { 1905 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1906 } else { 1907 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1908 } 1909 } 1910 1911 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1912 } 1913 1914 /* 1915 * Abort I/O that are queued waiting for submission. These types of I/O are 1916 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1917 */ 1918 static void 1919 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1920 { 1921 struct spdk_bdev_io *bdev_io, *tmp; 1922 1923 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1924 if (bdev_io->internal.ch == ch) { 1925 TAILQ_REMOVE(queue, bdev_io, internal.link); 1926 /* 1927 * spdk_bdev_io_complete() assumes that the completed I/O had 1928 * been submitted to the bdev module. Since in this case it 1929 * hadn't, bump io_outstanding to account for the decrement 1930 * that spdk_bdev_io_complete() will do. 
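			 * Resets are not tracked in io_outstanding (see
			 * spdk_bdev_io_submit_reset() above), so no adjustment is
			 * needed for them.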
1931 */ 1932 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1933 ch->io_outstanding++; 1934 ch->shared_resource->io_outstanding++; 1935 } 1936 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1937 } 1938 } 1939 } 1940 1941 static void 1942 spdk_bdev_qos_channel_destroy(void *cb_arg) 1943 { 1944 struct spdk_bdev_qos *qos = cb_arg; 1945 1946 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1947 spdk_poller_unregister(&qos->poller); 1948 1949 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1950 1951 free(qos); 1952 } 1953 1954 static int 1955 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1956 { 1957 int i; 1958 1959 /* 1960 * Cleanly shutting down the QoS poller is tricky, because 1961 * during the asynchronous operation the user could open 1962 * a new descriptor and create a new channel, spawning 1963 * a new QoS poller. 1964 * 1965 * The strategy is to create a new QoS structure here and swap it 1966 * in. The shutdown path then continues to refer to the old one 1967 * until it completes and then releases it. 1968 */ 1969 struct spdk_bdev_qos *new_qos, *old_qos; 1970 1971 old_qos = bdev->internal.qos; 1972 1973 new_qos = calloc(1, sizeof(*new_qos)); 1974 if (!new_qos) { 1975 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1976 return -ENOMEM; 1977 } 1978 1979 /* Copy the old QoS data into the newly allocated structure */ 1980 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1981 1982 /* Zero out the key parts of the QoS structure */ 1983 new_qos->ch = NULL; 1984 new_qos->thread = NULL; 1985 new_qos->poller = NULL; 1986 TAILQ_INIT(&new_qos->queued); 1987 /* 1988 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 1989 * It will be used later for the new QoS structure. 1990 */ 1991 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1992 new_qos->rate_limits[i].remaining_this_timeslice = 0; 1993 new_qos->rate_limits[i].min_per_timeslice = 0; 1994 new_qos->rate_limits[i].max_per_timeslice = 0; 1995 } 1996 1997 bdev->internal.qos = new_qos; 1998 1999 if (old_qos->thread == NULL) { 2000 free(old_qos); 2001 } else { 2002 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 2003 old_qos); 2004 } 2005 2006 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2007 * been destroyed yet. The destruction path will end up waiting for the final 2008 * channel to be put before it releases resources. */ 2009 2010 return 0; 2011 } 2012 2013 static void 2014 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2015 { 2016 total->bytes_read += add->bytes_read; 2017 total->num_read_ops += add->num_read_ops; 2018 total->bytes_written += add->bytes_written; 2019 total->num_write_ops += add->num_write_ops; 2020 total->read_latency_ticks += add->read_latency_ticks; 2021 total->write_latency_ticks += add->write_latency_ticks; 2022 } 2023 2024 static void 2025 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 2026 { 2027 struct spdk_bdev_channel *ch = ctx_buf; 2028 struct spdk_bdev_mgmt_channel *mgmt_ch; 2029 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2030 2031 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2032 spdk_get_thread()); 2033 2034 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
*/ 2035 pthread_mutex_lock(&ch->bdev->internal.mutex); 2036 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2037 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2038 2039 mgmt_ch = shared_resource->mgmt_ch; 2040 2041 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2042 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2043 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2044 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2045 2046 _spdk_bdev_channel_destroy_resource(ch); 2047 } 2048 2049 int 2050 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2051 { 2052 struct spdk_bdev_alias *tmp; 2053 2054 if (alias == NULL) { 2055 SPDK_ERRLOG("Empty alias passed\n"); 2056 return -EINVAL; 2057 } 2058 2059 if (spdk_bdev_get_by_name(alias)) { 2060 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2061 return -EEXIST; 2062 } 2063 2064 tmp = calloc(1, sizeof(*tmp)); 2065 if (tmp == NULL) { 2066 SPDK_ERRLOG("Unable to allocate alias\n"); 2067 return -ENOMEM; 2068 } 2069 2070 tmp->alias = strdup(alias); 2071 if (tmp->alias == NULL) { 2072 free(tmp); 2073 SPDK_ERRLOG("Unable to allocate alias\n"); 2074 return -ENOMEM; 2075 } 2076 2077 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2078 2079 return 0; 2080 } 2081 2082 int 2083 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2084 { 2085 struct spdk_bdev_alias *tmp; 2086 2087 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2088 if (strcmp(alias, tmp->alias) == 0) { 2089 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2090 free(tmp->alias); 2091 free(tmp); 2092 return 0; 2093 } 2094 } 2095 2096 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2097 2098 return -ENOENT; 2099 } 2100 2101 void 2102 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2103 { 2104 struct spdk_bdev_alias *p, *tmp; 2105 2106 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2107 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2108 free(p->alias); 2109 free(p); 2110 } 2111 } 2112 2113 struct spdk_io_channel * 2114 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2115 { 2116 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 2117 } 2118 2119 const char * 2120 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2121 { 2122 return bdev->name; 2123 } 2124 2125 const char * 2126 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2127 { 2128 return bdev->product_name; 2129 } 2130 2131 const struct spdk_bdev_aliases_list * 2132 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2133 { 2134 return &bdev->aliases; 2135 } 2136 2137 uint32_t 2138 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2139 { 2140 return bdev->blocklen; 2141 } 2142 2143 uint64_t 2144 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2145 { 2146 return bdev->blockcnt; 2147 } 2148 2149 const char * 2150 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2151 { 2152 return qos_rpc_type[type]; 2153 } 2154 2155 void 2156 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2157 { 2158 int i; 2159 2160 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2161 2162 pthread_mutex_lock(&bdev->internal.mutex); 2163 if (bdev->internal.qos) { 2164 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2165 if (bdev->internal.qos->rate_limits[i].limit != 2166 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2167 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2168 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2169 /* Change from Byte to Megabyte which is user visible. 
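				 * That is, the internal bytes-per-second value is
				 * converted to the user-visible MB/s figure, mirroring
				 * the MB-to-byte conversion applied when the limit was
				 * set.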
*/ 2170 limits[i] = limits[i] / 1024 / 1024; 2171 } 2172 } 2173 } 2174 } 2175 pthread_mutex_unlock(&bdev->internal.mutex); 2176 } 2177 2178 size_t 2179 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2180 { 2181 return 1 << bdev->required_alignment; 2182 } 2183 2184 uint32_t 2185 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2186 { 2187 return bdev->optimal_io_boundary; 2188 } 2189 2190 bool 2191 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2192 { 2193 return bdev->write_cache; 2194 } 2195 2196 const struct spdk_uuid * 2197 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2198 { 2199 return &bdev->uuid; 2200 } 2201 2202 uint64_t 2203 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2204 { 2205 return bdev->internal.measured_queue_depth; 2206 } 2207 2208 uint64_t 2209 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2210 { 2211 return bdev->internal.period; 2212 } 2213 2214 uint64_t 2215 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2216 { 2217 return bdev->internal.weighted_io_time; 2218 } 2219 2220 uint64_t 2221 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2222 { 2223 return bdev->internal.io_time; 2224 } 2225 2226 static void 2227 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2228 { 2229 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2230 2231 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2232 2233 if (bdev->internal.measured_queue_depth) { 2234 bdev->internal.io_time += bdev->internal.period; 2235 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2236 } 2237 } 2238 2239 static void 2240 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2241 { 2242 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2243 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2244 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2245 2246 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2247 spdk_for_each_channel_continue(i, 0); 2248 } 2249 2250 static int 2251 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2252 { 2253 struct spdk_bdev *bdev = ctx; 2254 bdev->internal.temporary_queue_depth = 0; 2255 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2256 _calculate_measured_qd_cpl); 2257 return 0; 2258 } 2259 2260 void 2261 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2262 { 2263 bdev->internal.period = period; 2264 2265 if (bdev->internal.qd_poller != NULL) { 2266 spdk_poller_unregister(&bdev->internal.qd_poller); 2267 bdev->internal.measured_queue_depth = UINT64_MAX; 2268 } 2269 2270 if (period != 0) { 2271 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2272 period); 2273 } 2274 } 2275 2276 int 2277 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2278 { 2279 int ret; 2280 2281 pthread_mutex_lock(&bdev->internal.mutex); 2282 2283 /* bdev has open descriptors */ 2284 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2285 bdev->blockcnt > size) { 2286 ret = -EBUSY; 2287 } else { 2288 bdev->blockcnt = size; 2289 ret = 0; 2290 } 2291 2292 pthread_mutex_unlock(&bdev->internal.mutex); 2293 2294 return ret; 2295 } 2296 2297 /* 2298 * Convert I/O offset and length from bytes to blocks. 2299 * 2300 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
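 * For example (illustrative numbers): with a 512-byte block size,
 * offset_bytes = 4096 and num_bytes = 1536 yield offset_blocks = 8 and
 * num_blocks = 3 with a return value of 0, while num_bytes = 1000 leaves a
 * non-zero remainder and the caller treats the request as invalid.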
2301 */ 2302 static uint64_t 2303 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2304 uint64_t num_bytes, uint64_t *num_blocks) 2305 { 2306 uint32_t block_size = bdev->blocklen; 2307 2308 *offset_blocks = offset_bytes / block_size; 2309 *num_blocks = num_bytes / block_size; 2310 2311 return (offset_bytes % block_size) | (num_bytes % block_size); 2312 } 2313 2314 static bool 2315 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2316 { 2317 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2318 * has been an overflow and hence the offset has been wrapped around */ 2319 if (offset_blocks + num_blocks < offset_blocks) { 2320 return false; 2321 } 2322 2323 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2324 if (offset_blocks + num_blocks > bdev->blockcnt) { 2325 return false; 2326 } 2327 2328 return true; 2329 } 2330 2331 int 2332 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2333 void *buf, uint64_t offset, uint64_t nbytes, 2334 spdk_bdev_io_completion_cb cb, void *cb_arg) 2335 { 2336 uint64_t offset_blocks, num_blocks; 2337 2338 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2339 return -EINVAL; 2340 } 2341 2342 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2343 } 2344 2345 int 2346 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2347 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2348 spdk_bdev_io_completion_cb cb, void *cb_arg) 2349 { 2350 struct spdk_bdev *bdev = desc->bdev; 2351 struct spdk_bdev_io *bdev_io; 2352 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2353 2354 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2355 return -EINVAL; 2356 } 2357 2358 bdev_io = spdk_bdev_get_io(channel); 2359 if (!bdev_io) { 2360 return -ENOMEM; 2361 } 2362 2363 bdev_io->internal.ch = channel; 2364 bdev_io->internal.desc = desc; 2365 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2366 bdev_io->u.bdev.iovs = &bdev_io->iov; 2367 bdev_io->u.bdev.iovs[0].iov_base = buf; 2368 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2369 bdev_io->u.bdev.iovcnt = 1; 2370 bdev_io->u.bdev.num_blocks = num_blocks; 2371 bdev_io->u.bdev.offset_blocks = offset_blocks; 2372 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2373 2374 spdk_bdev_io_submit(bdev_io); 2375 return 0; 2376 } 2377 2378 int 2379 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2380 struct iovec *iov, int iovcnt, 2381 uint64_t offset, uint64_t nbytes, 2382 spdk_bdev_io_completion_cb cb, void *cb_arg) 2383 { 2384 uint64_t offset_blocks, num_blocks; 2385 2386 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2387 return -EINVAL; 2388 } 2389 2390 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2391 } 2392 2393 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2394 struct iovec *iov, int iovcnt, 2395 uint64_t offset_blocks, uint64_t num_blocks, 2396 spdk_bdev_io_completion_cb cb, void *cb_arg) 2397 { 2398 struct spdk_bdev *bdev = desc->bdev; 2399 struct spdk_bdev_io *bdev_io; 2400 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2401 2402 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2403 return -EINVAL; 2404 } 2405 
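	/*
	 * Hedged caller-side sketch (illustrative only, not compiled here): when
	 * spdk_bdev_get_io() below fails and this function returns -ENOMEM, a
	 * caller can park a wait entry and retry from its callback once an
	 * spdk_bdev_io is freed. The names retry_read and my_ctx are made up:
	 *
	 *   my_ctx->entry.bdev = bdev;
	 *   my_ctx->entry.cb_fn = retry_read;
	 *   my_ctx->entry.cb_arg = my_ctx;
	 *   spdk_bdev_queue_io_wait(bdev, ch, &my_ctx->entry);
	 */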
2406 bdev_io = spdk_bdev_get_io(channel); 2407 if (!bdev_io) { 2408 return -ENOMEM; 2409 } 2410 2411 bdev_io->internal.ch = channel; 2412 bdev_io->internal.desc = desc; 2413 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2414 bdev_io->u.bdev.iovs = iov; 2415 bdev_io->u.bdev.iovcnt = iovcnt; 2416 bdev_io->u.bdev.num_blocks = num_blocks; 2417 bdev_io->u.bdev.offset_blocks = offset_blocks; 2418 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2419 2420 spdk_bdev_io_submit(bdev_io); 2421 return 0; 2422 } 2423 2424 int 2425 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2426 void *buf, uint64_t offset, uint64_t nbytes, 2427 spdk_bdev_io_completion_cb cb, void *cb_arg) 2428 { 2429 uint64_t offset_blocks, num_blocks; 2430 2431 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2432 return -EINVAL; 2433 } 2434 2435 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2436 } 2437 2438 int 2439 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2440 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2441 spdk_bdev_io_completion_cb cb, void *cb_arg) 2442 { 2443 struct spdk_bdev *bdev = desc->bdev; 2444 struct spdk_bdev_io *bdev_io; 2445 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2446 2447 if (!desc->write) { 2448 return -EBADF; 2449 } 2450 2451 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2452 return -EINVAL; 2453 } 2454 2455 bdev_io = spdk_bdev_get_io(channel); 2456 if (!bdev_io) { 2457 return -ENOMEM; 2458 } 2459 2460 bdev_io->internal.ch = channel; 2461 bdev_io->internal.desc = desc; 2462 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2463 bdev_io->u.bdev.iovs = &bdev_io->iov; 2464 bdev_io->u.bdev.iovs[0].iov_base = buf; 2465 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2466 bdev_io->u.bdev.iovcnt = 1; 2467 bdev_io->u.bdev.num_blocks = num_blocks; 2468 bdev_io->u.bdev.offset_blocks = offset_blocks; 2469 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2470 2471 spdk_bdev_io_submit(bdev_io); 2472 return 0; 2473 } 2474 2475 int 2476 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2477 struct iovec *iov, int iovcnt, 2478 uint64_t offset, uint64_t len, 2479 spdk_bdev_io_completion_cb cb, void *cb_arg) 2480 { 2481 uint64_t offset_blocks, num_blocks; 2482 2483 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2484 return -EINVAL; 2485 } 2486 2487 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2488 } 2489 2490 int 2491 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2492 struct iovec *iov, int iovcnt, 2493 uint64_t offset_blocks, uint64_t num_blocks, 2494 spdk_bdev_io_completion_cb cb, void *cb_arg) 2495 { 2496 struct spdk_bdev *bdev = desc->bdev; 2497 struct spdk_bdev_io *bdev_io; 2498 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2499 2500 if (!desc->write) { 2501 return -EBADF; 2502 } 2503 2504 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2505 return -EINVAL; 2506 } 2507 2508 bdev_io = spdk_bdev_get_io(channel); 2509 if (!bdev_io) { 2510 return -ENOMEM; 2511 } 2512 2513 bdev_io->internal.ch = channel; 2514 bdev_io->internal.desc = desc; 2515 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2516 bdev_io->u.bdev.iovs = iov; 2517 bdev_io->u.bdev.iovcnt = iovcnt; 2518 bdev_io->u.bdev.num_blocks = num_blocks; 2519 bdev_io->u.bdev.offset_blocks = 
offset_blocks; 2520 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2521 2522 spdk_bdev_io_submit(bdev_io); 2523 return 0; 2524 } 2525 2526 int 2527 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2528 uint64_t offset, uint64_t len, 2529 spdk_bdev_io_completion_cb cb, void *cb_arg) 2530 { 2531 uint64_t offset_blocks, num_blocks; 2532 2533 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2534 return -EINVAL; 2535 } 2536 2537 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2538 } 2539 2540 int 2541 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2542 uint64_t offset_blocks, uint64_t num_blocks, 2543 spdk_bdev_io_completion_cb cb, void *cb_arg) 2544 { 2545 struct spdk_bdev *bdev = desc->bdev; 2546 struct spdk_bdev_io *bdev_io; 2547 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2548 2549 if (!desc->write) { 2550 return -EBADF; 2551 } 2552 2553 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2554 return -EINVAL; 2555 } 2556 2557 bdev_io = spdk_bdev_get_io(channel); 2558 2559 if (!bdev_io) { 2560 return -ENOMEM; 2561 } 2562 2563 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2564 bdev_io->internal.ch = channel; 2565 bdev_io->internal.desc = desc; 2566 bdev_io->u.bdev.offset_blocks = offset_blocks; 2567 bdev_io->u.bdev.num_blocks = num_blocks; 2568 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2569 2570 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2571 spdk_bdev_io_submit(bdev_io); 2572 return 0; 2573 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2574 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2575 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2576 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2577 _spdk_bdev_write_zero_buffer_next(bdev_io); 2578 return 0; 2579 } else { 2580 spdk_bdev_free_io(bdev_io); 2581 return -ENOTSUP; 2582 } 2583 } 2584 2585 int 2586 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2587 uint64_t offset, uint64_t nbytes, 2588 spdk_bdev_io_completion_cb cb, void *cb_arg) 2589 { 2590 uint64_t offset_blocks, num_blocks; 2591 2592 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2593 return -EINVAL; 2594 } 2595 2596 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2597 } 2598 2599 int 2600 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2601 uint64_t offset_blocks, uint64_t num_blocks, 2602 spdk_bdev_io_completion_cb cb, void *cb_arg) 2603 { 2604 struct spdk_bdev *bdev = desc->bdev; 2605 struct spdk_bdev_io *bdev_io; 2606 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2607 2608 if (!desc->write) { 2609 return -EBADF; 2610 } 2611 2612 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2613 return -EINVAL; 2614 } 2615 2616 if (num_blocks == 0) { 2617 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2618 return -EINVAL; 2619 } 2620 2621 bdev_io = spdk_bdev_get_io(channel); 2622 if (!bdev_io) { 2623 return -ENOMEM; 2624 } 2625 2626 bdev_io->internal.ch = channel; 2627 bdev_io->internal.desc = desc; 2628 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2629 2630 bdev_io->u.bdev.iovs = &bdev_io->iov; 2631 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2632 bdev_io->u.bdev.iovs[0].iov_len = 0; 2633 bdev_io->u.bdev.iovcnt = 1; 2634 2635 
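	/* The single zero-length iovec above is presumably kept so that bdev
	 * modules which unconditionally inspect u.bdev.iovs see a well-defined
	 * (empty) vector rather than uninitialized memory; unmap itself carries
	 * no data payload.
	 */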
bdev_io->u.bdev.offset_blocks = offset_blocks; 2636 bdev_io->u.bdev.num_blocks = num_blocks; 2637 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2638 2639 spdk_bdev_io_submit(bdev_io); 2640 return 0; 2641 } 2642 2643 int 2644 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2645 uint64_t offset, uint64_t length, 2646 spdk_bdev_io_completion_cb cb, void *cb_arg) 2647 { 2648 uint64_t offset_blocks, num_blocks; 2649 2650 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2651 return -EINVAL; 2652 } 2653 2654 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2655 } 2656 2657 int 2658 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2659 uint64_t offset_blocks, uint64_t num_blocks, 2660 spdk_bdev_io_completion_cb cb, void *cb_arg) 2661 { 2662 struct spdk_bdev *bdev = desc->bdev; 2663 struct spdk_bdev_io *bdev_io; 2664 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2665 2666 if (!desc->write) { 2667 return -EBADF; 2668 } 2669 2670 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2671 return -EINVAL; 2672 } 2673 2674 bdev_io = spdk_bdev_get_io(channel); 2675 if (!bdev_io) { 2676 return -ENOMEM; 2677 } 2678 2679 bdev_io->internal.ch = channel; 2680 bdev_io->internal.desc = desc; 2681 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2682 bdev_io->u.bdev.iovs = NULL; 2683 bdev_io->u.bdev.iovcnt = 0; 2684 bdev_io->u.bdev.offset_blocks = offset_blocks; 2685 bdev_io->u.bdev.num_blocks = num_blocks; 2686 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2687 2688 spdk_bdev_io_submit(bdev_io); 2689 return 0; 2690 } 2691 2692 static void 2693 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2694 { 2695 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2696 struct spdk_bdev_io *bdev_io; 2697 2698 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2699 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2700 spdk_bdev_io_submit_reset(bdev_io); 2701 } 2702 2703 static void 2704 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2705 { 2706 struct spdk_io_channel *ch; 2707 struct spdk_bdev_channel *channel; 2708 struct spdk_bdev_mgmt_channel *mgmt_channel; 2709 struct spdk_bdev_shared_resource *shared_resource; 2710 bdev_io_tailq_t tmp_queued; 2711 2712 TAILQ_INIT(&tmp_queued); 2713 2714 ch = spdk_io_channel_iter_get_channel(i); 2715 channel = spdk_io_channel_get_ctx(ch); 2716 shared_resource = channel->shared_resource; 2717 mgmt_channel = shared_resource->mgmt_ch; 2718 2719 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2720 2721 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2722 /* The QoS object is always valid and readable while 2723 * the channel flag is set, so the lock here should not 2724 * be necessary. We're not in the fast path though, so 2725 * just take it anyway. 
*/ 2726 pthread_mutex_lock(&channel->bdev->internal.mutex); 2727 if (channel->bdev->internal.qos->ch == channel) { 2728 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2729 } 2730 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2731 } 2732 2733 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2734 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2735 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2736 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2737 2738 spdk_for_each_channel_continue(i, 0); 2739 } 2740 2741 static void 2742 _spdk_bdev_start_reset(void *ctx) 2743 { 2744 struct spdk_bdev_channel *ch = ctx; 2745 2746 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2747 ch, _spdk_bdev_reset_dev); 2748 } 2749 2750 static void 2751 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2752 { 2753 struct spdk_bdev *bdev = ch->bdev; 2754 2755 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2756 2757 pthread_mutex_lock(&bdev->internal.mutex); 2758 if (bdev->internal.reset_in_progress == NULL) { 2759 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2760 /* 2761 * Take a channel reference for the target bdev for the life of this 2762 * reset. This guards against the channel getting destroyed while 2763 * spdk_for_each_channel() calls related to this reset IO are in 2764 * progress. We will release the reference when this reset is 2765 * completed. 2766 */ 2767 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2768 _spdk_bdev_start_reset(ch); 2769 } 2770 pthread_mutex_unlock(&bdev->internal.mutex); 2771 } 2772 2773 int 2774 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2775 spdk_bdev_io_completion_cb cb, void *cb_arg) 2776 { 2777 struct spdk_bdev *bdev = desc->bdev; 2778 struct spdk_bdev_io *bdev_io; 2779 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2780 2781 bdev_io = spdk_bdev_get_io(channel); 2782 if (!bdev_io) { 2783 return -ENOMEM; 2784 } 2785 2786 bdev_io->internal.ch = channel; 2787 bdev_io->internal.desc = desc; 2788 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2789 bdev_io->u.reset.ch_ref = NULL; 2790 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2791 2792 pthread_mutex_lock(&bdev->internal.mutex); 2793 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2794 pthread_mutex_unlock(&bdev->internal.mutex); 2795 2796 _spdk_bdev_channel_start_reset(channel); 2797 2798 return 0; 2799 } 2800 2801 void 2802 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2803 struct spdk_bdev_io_stat *stat) 2804 { 2805 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2806 2807 *stat = channel->stat; 2808 } 2809 2810 static void 2811 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2812 { 2813 void *io_device = spdk_io_channel_iter_get_io_device(i); 2814 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2815 2816 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2817 bdev_iostat_ctx->cb_arg, 0); 2818 free(bdev_iostat_ctx); 2819 } 2820 2821 static void 2822 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2823 { 2824 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2825 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2826 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2827 2828 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2829 spdk_for_each_channel_continue(i, 0); 2830 } 2831 2832 void 2833 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2834 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2835 { 2836 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2837 2838 assert(bdev != NULL); 2839 assert(stat != NULL); 2840 assert(cb != NULL); 2841 2842 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2843 if (bdev_iostat_ctx == NULL) { 2844 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2845 cb(bdev, stat, cb_arg, -ENOMEM); 2846 return; 2847 } 2848 2849 bdev_iostat_ctx->stat = stat; 2850 bdev_iostat_ctx->cb = cb; 2851 bdev_iostat_ctx->cb_arg = cb_arg; 2852 2853 /* Start with the statistics from previously deleted channels. */ 2854 pthread_mutex_lock(&bdev->internal.mutex); 2855 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2856 pthread_mutex_unlock(&bdev->internal.mutex); 2857 2858 /* Then iterate and add the statistics from each existing channel. */ 2859 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2860 _spdk_bdev_get_each_channel_stat, 2861 bdev_iostat_ctx, 2862 _spdk_bdev_get_device_stat_done); 2863 } 2864 2865 int 2866 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2867 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2868 spdk_bdev_io_completion_cb cb, void *cb_arg) 2869 { 2870 struct spdk_bdev *bdev = desc->bdev; 2871 struct spdk_bdev_io *bdev_io; 2872 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2873 2874 if (!desc->write) { 2875 return -EBADF; 2876 } 2877 2878 bdev_io = spdk_bdev_get_io(channel); 2879 if (!bdev_io) { 2880 return -ENOMEM; 2881 } 2882 2883 bdev_io->internal.ch = channel; 2884 bdev_io->internal.desc = desc; 2885 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2886 bdev_io->u.nvme_passthru.cmd = *cmd; 2887 bdev_io->u.nvme_passthru.buf = buf; 2888 bdev_io->u.nvme_passthru.nbytes = nbytes; 2889 bdev_io->u.nvme_passthru.md_buf = NULL; 2890 bdev_io->u.nvme_passthru.md_len = 0; 2891 2892 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2893 2894 spdk_bdev_io_submit(bdev_io); 2895 return 0; 2896 } 2897 2898 int 2899 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2900 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2901 spdk_bdev_io_completion_cb cb, void *cb_arg) 2902 { 2903 struct spdk_bdev *bdev = desc->bdev; 2904 struct spdk_bdev_io *bdev_io; 2905 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2906 2907 if (!desc->write) { 2908 /* 2909 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2910 * to easily determine if the command is a read or write, but for now just 2911 * do not allow io_passthru with a read-only descriptor. 
2912 */ 2913 return -EBADF; 2914 } 2915 2916 bdev_io = spdk_bdev_get_io(channel); 2917 if (!bdev_io) { 2918 return -ENOMEM; 2919 } 2920 2921 bdev_io->internal.ch = channel; 2922 bdev_io->internal.desc = desc; 2923 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2924 bdev_io->u.nvme_passthru.cmd = *cmd; 2925 bdev_io->u.nvme_passthru.buf = buf; 2926 bdev_io->u.nvme_passthru.nbytes = nbytes; 2927 bdev_io->u.nvme_passthru.md_buf = NULL; 2928 bdev_io->u.nvme_passthru.md_len = 0; 2929 2930 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2931 2932 spdk_bdev_io_submit(bdev_io); 2933 return 0; 2934 } 2935 2936 int 2937 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2938 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2939 spdk_bdev_io_completion_cb cb, void *cb_arg) 2940 { 2941 struct spdk_bdev *bdev = desc->bdev; 2942 struct spdk_bdev_io *bdev_io; 2943 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2944 2945 if (!desc->write) { 2946 /* 2947 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2948 * to easily determine if the command is a read or write, but for now just 2949 * do not allow io_passthru with a read-only descriptor. 2950 */ 2951 return -EBADF; 2952 } 2953 2954 bdev_io = spdk_bdev_get_io(channel); 2955 if (!bdev_io) { 2956 return -ENOMEM; 2957 } 2958 2959 bdev_io->internal.ch = channel; 2960 bdev_io->internal.desc = desc; 2961 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2962 bdev_io->u.nvme_passthru.cmd = *cmd; 2963 bdev_io->u.nvme_passthru.buf = buf; 2964 bdev_io->u.nvme_passthru.nbytes = nbytes; 2965 bdev_io->u.nvme_passthru.md_buf = md_buf; 2966 bdev_io->u.nvme_passthru.md_len = md_len; 2967 2968 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2969 2970 spdk_bdev_io_submit(bdev_io); 2971 return 0; 2972 } 2973 2974 int 2975 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2976 struct spdk_bdev_io_wait_entry *entry) 2977 { 2978 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2979 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2980 2981 if (bdev != entry->bdev) { 2982 SPDK_ERRLOG("bdevs do not match\n"); 2983 return -EINVAL; 2984 } 2985 2986 if (mgmt_ch->per_thread_cache_count > 0) { 2987 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2988 return -EINVAL; 2989 } 2990 2991 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2992 return 0; 2993 } 2994 2995 static void 2996 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2997 { 2998 struct spdk_bdev *bdev = bdev_ch->bdev; 2999 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3000 struct spdk_bdev_io *bdev_io; 3001 3002 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3003 /* 3004 * Allow some more I/O to complete before retrying the nomem_io queue. 3005 * Some drivers (such as nvme) cannot immediately take a new I/O in 3006 * the context of a completion, because the resources for the I/O are 3007 * not released until control returns to the bdev poller. Also, we 3008 * may require several small I/O to complete before a larger I/O 3009 * (that requires splitting) can be submitted. 
3010 */ 3011 return; 3012 } 3013 3014 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3015 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3016 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3017 bdev_io->internal.ch->io_outstanding++; 3018 shared_resource->io_outstanding++; 3019 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3020 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 3021 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3022 break; 3023 } 3024 } 3025 } 3026 3027 static inline void 3028 _spdk_bdev_io_complete(void *ctx) 3029 { 3030 struct spdk_bdev_io *bdev_io = ctx; 3031 uint64_t tsc, tsc_diff; 3032 3033 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3034 /* 3035 * Send the completion to the thread that originally submitted the I/O, 3036 * which may not be the current thread in the case of QoS. 3037 */ 3038 if (bdev_io->internal.io_submit_ch) { 3039 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3040 bdev_io->internal.io_submit_ch = NULL; 3041 } 3042 3043 /* 3044 * Defer completion to avoid potential infinite recursion if the 3045 * user's completion callback issues a new I/O. 3046 */ 3047 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3048 _spdk_bdev_io_complete, bdev_io); 3049 return; 3050 } 3051 3052 tsc = spdk_get_ticks(); 3053 tsc_diff = tsc - bdev_io->internal.submit_tsc; 3054 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 3055 3056 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3057 switch (bdev_io->type) { 3058 case SPDK_BDEV_IO_TYPE_READ: 3059 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3060 bdev_io->internal.ch->stat.num_read_ops++; 3061 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 3062 break; 3063 case SPDK_BDEV_IO_TYPE_WRITE: 3064 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3065 bdev_io->internal.ch->stat.num_write_ops++; 3066 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 3067 break; 3068 default: 3069 break; 3070 } 3071 } 3072 3073 #ifdef SPDK_CONFIG_VTUNE 3074 uint64_t now_tsc = spdk_get_ticks(); 3075 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 3076 uint64_t data[5]; 3077 3078 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 3079 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 3080 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 3081 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 3082 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
3083 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 3084 3085 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3086 __itt_metadata_u64, 5, data); 3087 3088 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3089 bdev_io->internal.ch->start_tsc = now_tsc; 3090 } 3091 #endif 3092 3093 assert(bdev_io->internal.cb != NULL); 3094 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 3095 3096 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3097 bdev_io->internal.caller_ctx); 3098 } 3099 3100 static void 3101 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3102 { 3103 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3104 3105 if (bdev_io->u.reset.ch_ref != NULL) { 3106 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3107 bdev_io->u.reset.ch_ref = NULL; 3108 } 3109 3110 _spdk_bdev_io_complete(bdev_io); 3111 } 3112 3113 static void 3114 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3115 { 3116 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3117 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3118 3119 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3120 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3121 _spdk_bdev_channel_start_reset(ch); 3122 } 3123 3124 spdk_for_each_channel_continue(i, 0); 3125 } 3126 3127 void 3128 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3129 { 3130 struct spdk_bdev *bdev = bdev_io->bdev; 3131 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3132 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3133 3134 bdev_io->internal.status = status; 3135 3136 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3137 bool unlock_channels = false; 3138 3139 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3140 SPDK_ERRLOG("NOMEM returned for reset\n"); 3141 } 3142 pthread_mutex_lock(&bdev->internal.mutex); 3143 if (bdev_io == bdev->internal.reset_in_progress) { 3144 bdev->internal.reset_in_progress = NULL; 3145 unlock_channels = true; 3146 } 3147 pthread_mutex_unlock(&bdev->internal.mutex); 3148 3149 if (unlock_channels) { 3150 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3151 bdev_io, _spdk_bdev_reset_complete); 3152 return; 3153 } 3154 } else { 3155 if (spdk_unlikely(bdev_io->internal.orig_iovcnt > 0)) { 3156 _bdev_io_unset_bounce_buf(bdev_io); 3157 } 3158 3159 assert(bdev_ch->io_outstanding > 0); 3160 assert(shared_resource->io_outstanding > 0); 3161 bdev_ch->io_outstanding--; 3162 shared_resource->io_outstanding--; 3163 3164 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3165 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3166 /* 3167 * Wait for some of the outstanding I/O to complete before we 3168 * retry any of the nomem_io. Normally we will wait for 3169 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3170 * depth channels we will instead wait for half to complete. 
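	 * As a worked example with NOMEM_THRESHOLD_COUNT = 8: if 64 I/O are
	 * still outstanding when ENOMEM is hit, the threshold below becomes
	 * max(64 / 2, 64 - 8) = 56, so retries start after 8 completions; with
	 * only 10 outstanding it becomes max(5, 2) = 5, i.e. after half of them
	 * complete.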
3171 */ 3172 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3173 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3174 return; 3175 } 3176 3177 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3178 _spdk_bdev_ch_retry_io(bdev_ch); 3179 } 3180 } 3181 3182 _spdk_bdev_io_complete(bdev_io); 3183 } 3184 3185 void 3186 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3187 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3188 { 3189 if (sc == SPDK_SCSI_STATUS_GOOD) { 3190 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3191 } else { 3192 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3193 bdev_io->internal.error.scsi.sc = sc; 3194 bdev_io->internal.error.scsi.sk = sk; 3195 bdev_io->internal.error.scsi.asc = asc; 3196 bdev_io->internal.error.scsi.ascq = ascq; 3197 } 3198 3199 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3200 } 3201 3202 void 3203 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3204 int *sc, int *sk, int *asc, int *ascq) 3205 { 3206 assert(sc != NULL); 3207 assert(sk != NULL); 3208 assert(asc != NULL); 3209 assert(ascq != NULL); 3210 3211 switch (bdev_io->internal.status) { 3212 case SPDK_BDEV_IO_STATUS_SUCCESS: 3213 *sc = SPDK_SCSI_STATUS_GOOD; 3214 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3215 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3216 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3217 break; 3218 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3219 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3220 break; 3221 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3222 *sc = bdev_io->internal.error.scsi.sc; 3223 *sk = bdev_io->internal.error.scsi.sk; 3224 *asc = bdev_io->internal.error.scsi.asc; 3225 *ascq = bdev_io->internal.error.scsi.ascq; 3226 break; 3227 default: 3228 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3229 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3230 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3231 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3232 break; 3233 } 3234 } 3235 3236 void 3237 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3238 { 3239 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3240 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3241 } else { 3242 bdev_io->internal.error.nvme.sct = sct; 3243 bdev_io->internal.error.nvme.sc = sc; 3244 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3245 } 3246 3247 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3248 } 3249 3250 void 3251 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3252 { 3253 assert(sct != NULL); 3254 assert(sc != NULL); 3255 3256 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3257 *sct = bdev_io->internal.error.nvme.sct; 3258 *sc = bdev_io->internal.error.nvme.sc; 3259 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3260 *sct = SPDK_NVME_SCT_GENERIC; 3261 *sc = SPDK_NVME_SC_SUCCESS; 3262 } else { 3263 *sct = SPDK_NVME_SCT_GENERIC; 3264 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3265 } 3266 } 3267 3268 struct spdk_thread * 3269 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3270 { 3271 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3272 } 3273 3274 static void 3275 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3276 { 3277 uint64_t min_qos_set; 3278 int i; 3279 3280 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3281 if (limits[i] != 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3282 break; 3283 } 3284 } 3285 3286 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3287 SPDK_ERRLOG("Invalid rate limits set.\n"); 3288 return; 3289 } 3290 3291 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3292 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3293 continue; 3294 } 3295 3296 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3297 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3298 } else { 3299 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3300 } 3301 3302 if (limits[i] == 0 || limits[i] % min_qos_set) { 3303 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3304 limits[i], bdev->name, min_qos_set); 3305 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3306 return; 3307 } 3308 } 3309 3310 if (!bdev->internal.qos) { 3311 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3312 if (!bdev->internal.qos) { 3313 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3314 return; 3315 } 3316 } 3317 3318 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3319 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3320 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3321 bdev->name, i, limits[i]); 3322 } 3323 3324 return; 3325 } 3326 3327 static void 3328 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3329 { 3330 struct spdk_conf_section *sp = NULL; 3331 const char *val = NULL; 3332 int i = 0, j = 0; 3333 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3334 bool config_qos = false; 3335 3336 sp = spdk_conf_find_section(NULL, "QoS"); 3337 if (!sp) { 3338 return; 3339 } 3340 3341 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3342 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3343 3344 i = 0; 3345 while (true) { 3346 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3347 if (!val) { 3348 break; 3349 } 3350 3351 if (strcmp(bdev->name, val) != 0) { 3352 i++; 3353 continue; 3354 } 3355 3356 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3357 if (val) { 3358 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3359 limits[j] = strtoull(val, NULL, 10); 3360 } else { 3361 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3362 } 3363 config_qos = true; 3364 } 3365 3366 break; 3367 } 3368 3369 j++; 3370 } 3371 3372 if (config_qos == true) { 3373 _spdk_bdev_qos_config_limit(bdev, limits); 3374 } 3375 3376 return; 3377 } 3378 3379 static int 3380 spdk_bdev_init(struct spdk_bdev *bdev) 3381 { 3382 char *bdev_name; 3383 3384 assert(bdev->module != NULL); 3385 3386 if (!bdev->name) { 3387 SPDK_ERRLOG("Bdev name is NULL\n"); 3388 return -EINVAL; 3389 } 3390 3391 if (spdk_bdev_get_by_name(bdev->name)) { 3392 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3393 return -EEXIST; 3394 } 3395 3396 /* Users often register their own I/O devices using the bdev name. In 3397 * order to avoid conflicts, prepend bdev_. 
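 * For example, a bdev named "Malloc0" ends up registered as the I/O device
 * "bdev_Malloc0" below, so it cannot collide with a module that registered
 * "Malloc0" itself as an I/O device.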
*/ 3398 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3399 if (!bdev_name) { 3400 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3401 return -ENOMEM; 3402 } 3403 3404 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3405 bdev->internal.measured_queue_depth = UINT64_MAX; 3406 bdev->internal.claim_module = NULL; 3407 bdev->internal.qd_poller = NULL; 3408 bdev->internal.qos = NULL; 3409 3410 if (spdk_bdev_get_buf_align(bdev) > 1) { 3411 if (bdev->split_on_optimal_io_boundary) { 3412 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 3413 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 3414 } else { 3415 bdev->split_on_optimal_io_boundary = true; 3416 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 3417 } 3418 } 3419 3420 TAILQ_INIT(&bdev->internal.open_descs); 3421 3422 TAILQ_INIT(&bdev->aliases); 3423 3424 bdev->internal.reset_in_progress = NULL; 3425 3426 _spdk_bdev_qos_config(bdev); 3427 3428 spdk_io_device_register(__bdev_to_io_dev(bdev), 3429 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3430 sizeof(struct spdk_bdev_channel), 3431 bdev_name); 3432 3433 free(bdev_name); 3434 3435 pthread_mutex_init(&bdev->internal.mutex, NULL); 3436 return 0; 3437 } 3438 3439 static void 3440 spdk_bdev_destroy_cb(void *io_device) 3441 { 3442 int rc; 3443 struct spdk_bdev *bdev; 3444 spdk_bdev_unregister_cb cb_fn; 3445 void *cb_arg; 3446 3447 bdev = __bdev_from_io_dev(io_device); 3448 cb_fn = bdev->internal.unregister_cb; 3449 cb_arg = bdev->internal.unregister_ctx; 3450 3451 rc = bdev->fn_table->destruct(bdev->ctxt); 3452 if (rc < 0) { 3453 SPDK_ERRLOG("destruct failed\n"); 3454 } 3455 if (rc <= 0 && cb_fn != NULL) { 3456 cb_fn(cb_arg, rc); 3457 } 3458 } 3459 3460 3461 static void 3462 spdk_bdev_fini(struct spdk_bdev *bdev) 3463 { 3464 pthread_mutex_destroy(&bdev->internal.mutex); 3465 3466 free(bdev->internal.qos); 3467 3468 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3469 } 3470 3471 static void 3472 spdk_bdev_start(struct spdk_bdev *bdev) 3473 { 3474 struct spdk_bdev_module *module; 3475 uint32_t action; 3476 3477 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3478 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3479 3480 /* Examine configuration before initializing I/O */ 3481 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3482 if (module->examine_config) { 3483 action = module->internal.action_in_progress; 3484 module->internal.action_in_progress++; 3485 module->examine_config(bdev); 3486 if (action != module->internal.action_in_progress) { 3487 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3488 module->name); 3489 } 3490 } 3491 } 3492 3493 if (bdev->internal.claim_module) { 3494 return; 3495 } 3496 3497 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3498 if (module->examine_disk) { 3499 module->internal.action_in_progress++; 3500 module->examine_disk(bdev); 3501 } 3502 } 3503 } 3504 3505 int 3506 spdk_bdev_register(struct spdk_bdev *bdev) 3507 { 3508 int rc = spdk_bdev_init(bdev); 3509 3510 if (rc == 0) { 3511 spdk_bdev_start(bdev); 3512 } 3513 3514 return rc; 3515 } 3516 3517 int 3518 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3519 { 3520 int rc; 3521 3522 rc = spdk_bdev_init(vbdev); 3523 if (rc) { 3524 return rc; 3525 } 3526 3527 spdk_bdev_start(vbdev); 3528 return 0; 3529 } 3530 3531 void 3532 
spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3533 { 3534 if (bdev->internal.unregister_cb != NULL) { 3535 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3536 } 3537 } 3538 3539 static void 3540 _remove_notify(void *arg) 3541 { 3542 struct spdk_bdev_desc *desc = arg; 3543 3544 desc->remove_scheduled = false; 3545 3546 if (desc->closed) { 3547 free(desc); 3548 } else { 3549 desc->remove_cb(desc->remove_ctx); 3550 } 3551 } 3552 3553 void 3554 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3555 { 3556 struct spdk_bdev_desc *desc, *tmp; 3557 bool do_destruct = true; 3558 struct spdk_thread *thread; 3559 3560 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3561 3562 thread = spdk_get_thread(); 3563 if (!thread) { 3564 /* The user called this from a non-SPDK thread. */ 3565 if (cb_fn != NULL) { 3566 cb_fn(cb_arg, -ENOTSUP); 3567 } 3568 return; 3569 } 3570 3571 pthread_mutex_lock(&bdev->internal.mutex); 3572 3573 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3574 bdev->internal.unregister_cb = cb_fn; 3575 bdev->internal.unregister_ctx = cb_arg; 3576 3577 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3578 if (desc->remove_cb) { 3579 do_destruct = false; 3580 /* 3581 * Defer invocation of the remove_cb to a separate message that will 3582 * run later on its thread. This ensures this context unwinds and 3583 * we don't recursively unregister this bdev again if the remove_cb 3584 * immediately closes its descriptor. 3585 */ 3586 if (!desc->remove_scheduled) { 3587 /* Avoid scheduling removal of the same descriptor multiple times. */ 3588 desc->remove_scheduled = true; 3589 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3590 } 3591 } 3592 } 3593 3594 if (!do_destruct) { 3595 pthread_mutex_unlock(&bdev->internal.mutex); 3596 return; 3597 } 3598 3599 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3600 pthread_mutex_unlock(&bdev->internal.mutex); 3601 3602 spdk_bdev_fini(bdev); 3603 } 3604 3605 int 3606 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3607 void *remove_ctx, struct spdk_bdev_desc **_desc) 3608 { 3609 struct spdk_bdev_desc *desc; 3610 struct spdk_thread *thread; 3611 3612 thread = spdk_get_thread(); 3613 if (!thread) { 3614 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3615 return -ENOTSUP; 3616 } 3617 3618 desc = calloc(1, sizeof(*desc)); 3619 if (desc == NULL) { 3620 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3621 return -ENOMEM; 3622 } 3623 3624 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3625 spdk_get_thread()); 3626 3627 pthread_mutex_lock(&bdev->internal.mutex); 3628 3629 if (write && bdev->internal.claim_module) { 3630 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3631 bdev->name, bdev->internal.claim_module->name); 3632 free(desc); 3633 pthread_mutex_unlock(&bdev->internal.mutex); 3634 return -EPERM; 3635 } 3636 3637 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3638 3639 desc->bdev = bdev; 3640 desc->thread = thread; 3641 desc->remove_cb = remove_cb; 3642 desc->remove_ctx = remove_ctx; 3643 desc->write = write; 3644 *_desc = desc; 3645 3646 pthread_mutex_unlock(&bdev->internal.mutex); 3647 3648 return 0; 3649 } 3650 3651 void 3652 spdk_bdev_close(struct spdk_bdev_desc *desc) 3653 { 3654 struct spdk_bdev *bdev = desc->bdev; 3655 bool do_unregister = false; 3656 3657 
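	/*
	 * Hedged usage sketch for the open/close pair implemented here
	 * (illustrative only, not compiled as part of this file; hot_remove_cb
	 * and my_ctx are made-up names):
	 *
	 *   struct spdk_bdev_desc *desc;
	 *   struct spdk_io_channel *io_ch;
	 *
	 *   if (spdk_bdev_open(bdev, true, hot_remove_cb, my_ctx, &desc) == 0) {
	 *           io_ch = spdk_bdev_get_io_channel(desc);
	 *           ... submit I/O on io_ch ...
	 *           spdk_put_io_channel(io_ch);
	 *           spdk_bdev_close(desc);
	 *   }
	 *
	 * The close must happen on the thread that opened the descriptor, as the
	 * assert below enforces.
	 */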
SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3658 spdk_get_thread()); 3659 3660 assert(desc->thread == spdk_get_thread()); 3661 3662 pthread_mutex_lock(&bdev->internal.mutex); 3663 3664 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3665 3666 desc->closed = true; 3667 3668 if (!desc->remove_scheduled) { 3669 free(desc); 3670 } 3671 3672 /* If no more descriptors, kill QoS channel */ 3673 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3674 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3675 bdev->name, spdk_get_thread()); 3676 3677 if (spdk_bdev_qos_destroy(bdev)) { 3678 /* There isn't anything we can do to recover here. Just let the 3679 * old QoS poller keep running. The QoS handling won't change 3680 * cores when the user allocates a new channel, but it won't break. */ 3681 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3682 } 3683 } 3684 3685 spdk_bdev_set_qd_sampling_period(bdev, 0); 3686 3687 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3688 do_unregister = true; 3689 } 3690 pthread_mutex_unlock(&bdev->internal.mutex); 3691 3692 if (do_unregister == true) { 3693 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3694 } 3695 } 3696 3697 int 3698 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3699 struct spdk_bdev_module *module) 3700 { 3701 if (bdev->internal.claim_module != NULL) { 3702 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3703 bdev->internal.claim_module->name); 3704 return -EPERM; 3705 } 3706 3707 if (desc && !desc->write) { 3708 desc->write = true; 3709 } 3710 3711 bdev->internal.claim_module = module; 3712 return 0; 3713 } 3714 3715 void 3716 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3717 { 3718 assert(bdev->internal.claim_module != NULL); 3719 bdev->internal.claim_module = NULL; 3720 } 3721 3722 struct spdk_bdev * 3723 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3724 { 3725 return desc->bdev; 3726 } 3727 3728 void 3729 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3730 { 3731 struct iovec *iovs; 3732 int iovcnt; 3733 3734 if (bdev_io == NULL) { 3735 return; 3736 } 3737 3738 switch (bdev_io->type) { 3739 case SPDK_BDEV_IO_TYPE_READ: 3740 iovs = bdev_io->u.bdev.iovs; 3741 iovcnt = bdev_io->u.bdev.iovcnt; 3742 break; 3743 case SPDK_BDEV_IO_TYPE_WRITE: 3744 iovs = bdev_io->u.bdev.iovs; 3745 iovcnt = bdev_io->u.bdev.iovcnt; 3746 break; 3747 default: 3748 iovs = NULL; 3749 iovcnt = 0; 3750 break; 3751 } 3752 3753 if (iovp) { 3754 *iovp = iovs; 3755 } 3756 if (iovcntp) { 3757 *iovcntp = iovcnt; 3758 } 3759 } 3760 3761 void 3762 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3763 { 3764 3765 if (spdk_bdev_module_list_find(bdev_module->name)) { 3766 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3767 assert(false); 3768 } 3769 3770 if (bdev_module->async_init) { 3771 bdev_module->internal.action_in_progress = 1; 3772 } 3773 3774 /* 3775 * Modules with examine callbacks must be initialized first, so they are 3776 * ready to handle examine callbacks from later modules that will 3777 * register physical bdevs. 
3778 */ 3779 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3780 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3781 } else { 3782 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3783 } 3784 } 3785 3786 struct spdk_bdev_module * 3787 spdk_bdev_module_list_find(const char *name) 3788 { 3789 struct spdk_bdev_module *bdev_module; 3790 3791 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3792 if (strcmp(name, bdev_module->name) == 0) { 3793 break; 3794 } 3795 } 3796 3797 return bdev_module; 3798 } 3799 3800 static void 3801 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3802 { 3803 struct spdk_bdev_io *bdev_io = _bdev_io; 3804 uint64_t num_bytes, num_blocks; 3805 int rc; 3806 3807 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3808 bdev_io->u.bdev.split_remaining_num_blocks, 3809 ZERO_BUFFER_SIZE); 3810 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3811 3812 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3813 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3814 g_bdev_mgr.zero_buffer, 3815 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3816 _spdk_bdev_write_zero_buffer_done, bdev_io); 3817 if (rc == 0) { 3818 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3819 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3820 } else if (rc == -ENOMEM) { 3821 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 3822 } else { 3823 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3824 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3825 } 3826 } 3827 3828 static void 3829 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3830 { 3831 struct spdk_bdev_io *parent_io = cb_arg; 3832 3833 spdk_bdev_free_io(bdev_io); 3834 3835 if (!success) { 3836 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3837 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3838 return; 3839 } 3840 3841 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3842 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3843 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3844 return; 3845 } 3846 3847 _spdk_bdev_write_zero_buffer_next(parent_io); 3848 } 3849 3850 struct set_qos_limit_ctx { 3851 void (*cb_fn)(void *cb_arg, int status); 3852 void *cb_arg; 3853 struct spdk_bdev *bdev; 3854 }; 3855 3856 static void 3857 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3858 { 3859 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3860 ctx->bdev->internal.qos_mod_in_progress = false; 3861 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3862 3863 ctx->cb_fn(ctx->cb_arg, status); 3864 free(ctx); 3865 } 3866 3867 static void 3868 _spdk_bdev_disable_qos_done(void *cb_arg) 3869 { 3870 struct set_qos_limit_ctx *ctx = cb_arg; 3871 struct spdk_bdev *bdev = ctx->bdev; 3872 struct spdk_bdev_io *bdev_io; 3873 struct spdk_bdev_qos *qos; 3874 3875 pthread_mutex_lock(&bdev->internal.mutex); 3876 qos = bdev->internal.qos; 3877 bdev->internal.qos = NULL; 3878 pthread_mutex_unlock(&bdev->internal.mutex); 3879 3880 while (!TAILQ_EMPTY(&qos->queued)) { 3881 /* Send queued I/O back to their original thread for resubmission. 
struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

static void
_spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
{
	pthread_mutex_lock(&ctx->bdev->internal.mutex);
	ctx->bdev->internal.qos_mod_in_progress = false;
	pthread_mutex_unlock(&ctx->bdev->internal.mutex);

	ctx->cb_fn(ctx->cb_arg, status);
	free(ctx);
}

static void
_spdk_bdev_disable_qos_done(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_qos *qos;

	pthread_mutex_lock(&bdev->internal.mutex);
	qos = bdev->internal.qos;
	bdev->internal.qos = NULL;
	pthread_mutex_unlock(&bdev->internal.mutex);

	while (!TAILQ_EMPTY(&qos->queued)) {
		/* Send queued I/O back to their original thread for resubmission. */
		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);

		if (bdev_io->internal.io_submit_ch) {
			/*
			 * Channel was changed when sending it to the QoS thread - change it back
			 * before sending it back to the original thread.
			 */
			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
			bdev_io->internal.io_submit_ch = NULL;
		}

		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
				     _spdk_bdev_io_submit, bdev_io);
	}

	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
	spdk_poller_unregister(&qos->poller);

	free(qos);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_thread *thread;

	pthread_mutex_lock(&bdev->internal.mutex);
	thread = bdev->internal.qos->thread;
	pthread_mutex_unlock(&bdev->internal.mutex);

	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
}

static void
_spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;

	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
	pthread_mutex_unlock(&bdev->internal.mutex);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	pthread_mutex_lock(&bdev->internal.mutex);
	_spdk_bdev_enable_qos(bdev, bdev_ch);
	pthread_mutex_unlock(&bdev->internal.mutex);
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
{
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	_spdk_bdev_set_qos_limit_done(ctx, status);
}

static void
_spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
{
	int i;

	assert(bdev->internal.qos != NULL);

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			bdev->internal.qos->rate_limits[i].limit = limits[i];

			if (limits[i] == 0) {
				bdev->internal.qos->rate_limits[i].limit =
					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
			}
		}
	}
}
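/*
 * Usage sketch for spdk_bdev_set_qos_rate_limits() below (illustrative only,
 * not compiled): the caller fills one slot per rate-limit type, leaves slots
 * it does not want to change at SPDK_BDEV_QOS_LIMIT_NOT_DEFINED, and passes 0
 * to disable a type. The callback name, the bdev pointer and the assumption
 * that slot 0 is the IOPS type (see _spdk_bdev_qos_is_iops_rate_limit()) are
 * hypothetical; byte-rate slots are given in MB/s and converted below.
 *
 *	static void
 *	example_qos_done(void *cb_arg, int status)
 *	{
 *		SPDK_NOTICELOG("QoS update finished: %d\n", status);
 *	}
 *
 *	...
 *	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
 *	int i;
 *
 *	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
 *		limits[i] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
 *	}
 *	limits[0] = 20000;	// cap read/write IOPS at 20k, leave other types untouched
 *	spdk_bdev_set_qos_rate_limits(bdev, limits, example_qos_done, NULL);
 */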
void
spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
{
	struct set_qos_limit_ctx *ctx;
	uint32_t limit_set_complement;
	uint64_t min_limit_per_sec;
	int i;
	bool disable_rate_limit = true;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			continue;
		}

		if (limits[i] > 0) {
			disable_rate_limit = false;
		}

		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
		} else {
			/* Change from megabyte to byte rate limit */
			limits[i] = limits[i] * 1024 * 1024;
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
		}

		limit_set_complement = limits[i] % min_limit_per_sec;
		if (limit_set_complement) {
			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
				    limits[i], min_limit_per_sec);
			limits[i] += min_limit_per_sec - limit_set_complement;
			SPDK_ERRLOG("Rounding up the rate limit to %" PRIu64 "\n", limits[i]);
		}
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->bdev = bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos_mod_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}
	bdev->internal.qos_mod_in_progress = true;

	if (disable_rate_limit == true && bdev->internal.qos) {
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
			     bdev->internal.qos->rate_limits[i].limit !=
			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
				disable_rate_limit = false;
				break;
			}
		}
	}

	if (disable_rate_limit == false) {
		if (bdev->internal.qos == NULL) {
			/* Enabling */
			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
			if (!bdev->internal.qos) {
				pthread_mutex_unlock(&bdev->internal.mutex);
				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
				free(ctx);
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_enable_qos_msg, ctx,
					      _spdk_bdev_enable_qos_done);
		} else {
			/* Updating */
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_thread_send_msg(bdev->internal.qos->thread,
					     _spdk_bdev_update_qos_rate_limit_msg, ctx);
		}
	} else {
		if (bdev->internal.qos != NULL) {
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			/* Disabling */
			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_disable_qos_msg, ctx,
					      _spdk_bdev_disable_qos_msg_done);
		} else {
			pthread_mutex_unlock(&bdev->internal.mutex);
			_spdk_bdev_set_qos_limit_done(ctx, 0);
			return;
		}
	}

	pthread_mutex_unlock(&bdev->internal.mutex);
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)

SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
{
	spdk_trace_register_owner(OWNER_BDEV, 'b');
	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
					OBJECT_BDEV_IO, 1, 0, "type: ");
	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
					OBJECT_BDEV_IO, 0, 0, "");
}