1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/util.h" 48 #include "spdk/trace.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 #include "spdk/string.h" 53 54 #ifdef SPDK_CONFIG_VTUNE 55 #include "ittnotify.h" 56 #include "ittnotify_types.h" 57 int __itt_init_ittlib(const char *, __itt_group_id); 58 #endif 59 60 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 61 #define SPDK_BDEV_IO_CACHE_SIZE 256 62 #define BUF_SMALL_POOL_SIZE 8192 63 #define BUF_LARGE_POOL_SIZE 1024 64 #define NOMEM_THRESHOLD_COUNT 8 65 #define ZERO_BUFFER_SIZE 0x100000 66 67 #define OWNER_BDEV 0x2 68 69 #define OBJECT_BDEV_IO 0x2 70 71 #define TRACE_GROUP_BDEV 0x3 72 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 73 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 74 75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 * 1024 * 1024) 80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 81 82 #define SPDK_BDEV_POOL_ALIGNMENT 512 83 84 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"}; 85 static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"}; 86 87 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 88 89 struct spdk_bdev_mgr { 90 struct spdk_mempool *bdev_io_pool; 91 92 struct spdk_mempool *buf_small_pool; 93 struct spdk_mempool *buf_large_pool; 94 95 void *zero_buffer; 96 97 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 98 99 struct spdk_bdev_list bdevs; 
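	/* Descriptive note: bdev_modules and bdevs above are the global registries
	 * walked by module iteration and by spdk_bdev_first()/spdk_bdev_next();
	 * the two flags below gate spdk_bdev_module_action_complete() and
	 * spdk_bdev_init_complete(). */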
	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb g_init_cb_fn = NULL;
static void *g_init_cb_arg = NULL;

static spdk_bdev_fini_cb g_fini_cb_fn = NULL;
static void *g_fini_cb_arg = NULL;
static struct spdk_thread *g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second. */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in the current timeslice (e.g., 1ms).
	 *  For bytes, this may run negative if an I/O is submitted while some
	 *  bytes remain but the I/O is bigger than that amount. The excess
	 *  will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;
};

struct spdk_bdev_qos {
	/** Rate limits, one entry per limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each timeslice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache. Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * queue their I/O awaiting retry here, which makes it possible to retry I/O to
 * one bdev after I/O from another bdev on the same io_device completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t	io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
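	 *
	 * Illustrative example (not taken from this file): if a backing device
	 * reports NOMEM because its internal queues are full, the bdev_io is
	 * parked here and retried once io_outstanding drops back down to
	 * nomem_threshold (see the fields that follow).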
206 */ 207 bdev_io_tailq_t nomem_io; 208 209 /* 210 * Threshold which io_outstanding must drop to before retrying nomem_io. 211 */ 212 uint64_t nomem_threshold; 213 214 /* I/O channel allocated by a bdev module */ 215 struct spdk_io_channel *shared_ch; 216 217 /* Refcount of bdev channels using this resource */ 218 uint32_t ref; 219 220 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 221 }; 222 223 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 224 #define BDEV_CH_QOS_ENABLED (1 << 1) 225 226 struct spdk_bdev_channel { 227 struct spdk_bdev *bdev; 228 229 /* The channel for the underlying device */ 230 struct spdk_io_channel *channel; 231 232 /* Per io_device per thread data */ 233 struct spdk_bdev_shared_resource *shared_resource; 234 235 struct spdk_bdev_io_stat stat; 236 237 /* 238 * Count of I/O submitted through this channel and waiting for completion. 239 * Incremented before submit_request() is called on an spdk_bdev_io. 240 */ 241 uint64_t io_outstanding; 242 243 bdev_io_tailq_t queued_resets; 244 245 uint32_t flags; 246 247 #ifdef SPDK_CONFIG_VTUNE 248 uint64_t start_tsc; 249 uint64_t interval_tsc; 250 __itt_string_handle *handle; 251 struct spdk_bdev_io_stat prev_stat; 252 #endif 253 254 }; 255 256 struct spdk_bdev_desc { 257 struct spdk_bdev *bdev; 258 struct spdk_thread *thread; 259 spdk_bdev_remove_cb_t remove_cb; 260 void *remove_ctx; 261 bool remove_scheduled; 262 bool closed; 263 bool write; 264 TAILQ_ENTRY(spdk_bdev_desc) link; 265 }; 266 267 struct spdk_bdev_iostat_ctx { 268 struct spdk_bdev_io_stat *stat; 269 spdk_bdev_get_device_stat_cb cb; 270 void *cb_arg; 271 }; 272 273 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 274 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 275 276 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 277 void *cb_arg); 278 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 279 280 void 281 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 282 { 283 *opts = g_bdev_opts; 284 } 285 286 int 287 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 288 { 289 uint32_t min_pool_size; 290 291 /* 292 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 293 * initialization. A second mgmt_ch will be created on the same thread when the application starts 294 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
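	 *
	 * Rough sizing sketch (hypothetical thread count): with the default
	 * bdev_io_cache_size of 256 and 4 threads, min_pool_size is
	 * 256 * (4 + 1) = 1280, which is well below the default
	 * bdev_io_pool_size of 64 * 1024.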
295 */ 296 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 297 if (opts->bdev_io_pool_size < min_pool_size) { 298 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 299 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 300 spdk_thread_get_count()); 301 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 302 return -1; 303 } 304 305 g_bdev_opts = *opts; 306 return 0; 307 } 308 309 struct spdk_bdev * 310 spdk_bdev_first(void) 311 { 312 struct spdk_bdev *bdev; 313 314 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 315 if (bdev) { 316 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 317 } 318 319 return bdev; 320 } 321 322 struct spdk_bdev * 323 spdk_bdev_next(struct spdk_bdev *prev) 324 { 325 struct spdk_bdev *bdev; 326 327 bdev = TAILQ_NEXT(prev, internal.link); 328 if (bdev) { 329 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 330 } 331 332 return bdev; 333 } 334 335 static struct spdk_bdev * 336 _bdev_next_leaf(struct spdk_bdev *bdev) 337 { 338 while (bdev != NULL) { 339 if (bdev->internal.claim_module == NULL) { 340 return bdev; 341 } else { 342 bdev = TAILQ_NEXT(bdev, internal.link); 343 } 344 } 345 346 return bdev; 347 } 348 349 struct spdk_bdev * 350 spdk_bdev_first_leaf(void) 351 { 352 struct spdk_bdev *bdev; 353 354 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 355 356 if (bdev) { 357 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 358 } 359 360 return bdev; 361 } 362 363 struct spdk_bdev * 364 spdk_bdev_next_leaf(struct spdk_bdev *prev) 365 { 366 struct spdk_bdev *bdev; 367 368 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 369 370 if (bdev) { 371 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 372 } 373 374 return bdev; 375 } 376 377 struct spdk_bdev * 378 spdk_bdev_get_by_name(const char *bdev_name) 379 { 380 struct spdk_bdev_alias *tmp; 381 struct spdk_bdev *bdev = spdk_bdev_first(); 382 383 while (bdev != NULL) { 384 if (strcmp(bdev_name, bdev->name) == 0) { 385 return bdev; 386 } 387 388 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 389 if (strcmp(bdev_name, tmp->alias) == 0) { 390 return bdev; 391 } 392 } 393 394 bdev = spdk_bdev_next(bdev); 395 } 396 397 return NULL; 398 } 399 400 void 401 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 402 { 403 struct iovec *iovs; 404 405 iovs = bdev_io->u.bdev.iovs; 406 407 assert(iovs != NULL); 408 assert(bdev_io->u.bdev.iovcnt >= 1); 409 410 iovs[0].iov_base = buf; 411 iovs[0].iov_len = len; 412 } 413 414 static bool 415 _is_buf_allocated(struct iovec *iovs) 416 { 417 return iovs[0].iov_base != NULL; 418 } 419 420 static bool 421 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 422 { 423 int i; 424 uintptr_t iov_base; 425 426 if (spdk_likely(alignment == 1)) { 427 return true; 428 } 429 430 for (i = 0; i < iovcnt; i++) { 431 iov_base = (uintptr_t)iovs[i].iov_base; 432 if ((iov_base & (alignment - 1)) != 0) { 433 return false; 434 } 435 } 436 437 return true; 438 } 439 440 static void 441 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 442 { 443 int i; 444 size_t len; 445 446 for (i = 0; i < iovcnt; i++) { 447 len = spdk_min(iovs[i].iov_len, buf_len); 448 memcpy(buf, iovs[i].iov_base, len); 449 buf += len; 450 buf_len -= len; 451 } 452 } 453 454 static void 455 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, 
		   void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is the write path, copy data from the original buffer to the bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf, *aligned_buf;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len;
	uint64_t alignment;
	bool buf_allocated;

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);

		alignment = spdk_bdev_get_buf_align(tmp->bdev);
		buf_allocated = _is_buf_allocated(tmp->u.bdev.iovs);

		aligned_buf = (void *)(((uintptr_t)buf +
					(alignment - 1)) & ~(alignment - 1));
		if (buf_allocated) {
			_bdev_io_set_bounce_buf(tmp, aligned_buf, tmp->internal.buf_len);
		} else {
			spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
		}

		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		tmp->internal.buf = buf;
		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
	}
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	/* if this is the read path, copy data from the bounce buffer to the original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base, bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bounce buffering for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;
	/* return bounce buffer to the pool */
	spdk_bdev_io_put_buf(bdev_io);
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	void *buf, *aligned_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment;
	bool buf_allocated;

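	/* Flow summary (descriptive): if the caller already supplied a
	 * sufficiently aligned buffer, the callback fires immediately.
	 * Otherwise a buffer is taken from the small or large mempool based on
	 * len + alignment and is either installed directly or used as a bounce
	 * buffer when the caller's own buffer is unaligned. If the pool is
	 * empty, the bdev_io is parked on need_buf_small/need_buf_large and is
	 * serviced later from spdk_bdev_io_put_buf(). */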
assert(cb != NULL); 565 assert(bdev_io->u.bdev.iovs != NULL); 566 567 alignment = spdk_bdev_get_buf_align(bdev_io->bdev); 568 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 569 570 if (buf_allocated && 571 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 572 /* Buffer already present and aligned */ 573 cb(bdev_io->internal.ch->channel, bdev_io); 574 return; 575 } 576 577 assert(len + alignment <= SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT); 578 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 579 580 bdev_io->internal.buf_len = len; 581 bdev_io->internal.get_buf_cb = cb; 582 583 if (len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) { 584 pool = g_bdev_mgr.buf_small_pool; 585 stailq = &mgmt_ch->need_buf_small; 586 } else { 587 pool = g_bdev_mgr.buf_large_pool; 588 stailq = &mgmt_ch->need_buf_large; 589 } 590 591 buf = spdk_mempool_get(pool); 592 593 if (!buf) { 594 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 595 } else { 596 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 597 598 if (buf_allocated) { 599 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 600 } else { 601 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 602 } 603 bdev_io->internal.buf = buf; 604 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 605 } 606 } 607 608 static int 609 spdk_bdev_module_get_max_ctx_size(void) 610 { 611 struct spdk_bdev_module *bdev_module; 612 int max_bdev_module_size = 0; 613 614 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 615 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 616 max_bdev_module_size = bdev_module->get_ctx_size(); 617 } 618 } 619 620 return max_bdev_module_size; 621 } 622 623 void 624 spdk_bdev_config_text(FILE *fp) 625 { 626 struct spdk_bdev_module *bdev_module; 627 628 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 629 if (bdev_module->config_text) { 630 bdev_module->config_text(fp); 631 } 632 } 633 } 634 635 static void 636 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 637 { 638 int i; 639 struct spdk_bdev_qos *qos = bdev->internal.qos; 640 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 641 642 if (!qos) { 643 return; 644 } 645 646 spdk_bdev_get_qos_rate_limits(bdev, limits); 647 648 spdk_json_write_object_begin(w); 649 spdk_json_write_named_string(w, "method", "set_bdev_qos_limit"); 650 spdk_json_write_name(w, "params"); 651 652 spdk_json_write_object_begin(w); 653 spdk_json_write_named_string(w, "name", bdev->name); 654 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 655 if (limits[i] > 0) { 656 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 657 } 658 } 659 spdk_json_write_object_end(w); 660 661 spdk_json_write_object_end(w); 662 } 663 664 void 665 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 666 { 667 struct spdk_bdev_module *bdev_module; 668 struct spdk_bdev *bdev; 669 670 assert(w != NULL); 671 672 spdk_json_write_array_begin(w); 673 674 spdk_json_write_object_begin(w); 675 spdk_json_write_named_string(w, "method", "set_bdev_options"); 676 spdk_json_write_name(w, "params"); 677 spdk_json_write_object_begin(w); 678 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 679 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 680 spdk_json_write_object_end(w); 681 spdk_json_write_object_end(w); 
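	/* With the default opts, the object written above looks roughly like
	 * (illustrative):
	 *   { "method": "set_bdev_options",
	 *     "params": { "bdev_io_pool_size": 65536, "bdev_io_cache_size": 256 } }
	 * The loops below then append each module's config and, per bdev, any
	 * QoS limits plus the bdev's own write_config_json() output.
	 */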
682 683 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 684 if (bdev_module->config_json) { 685 bdev_module->config_json(w); 686 } 687 } 688 689 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 690 spdk_bdev_qos_config_json(bdev, w); 691 692 if (bdev->fn_table->write_config_json) { 693 bdev->fn_table->write_config_json(bdev, w); 694 } 695 } 696 697 spdk_json_write_array_end(w); 698 } 699 700 static int 701 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 702 { 703 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 704 struct spdk_bdev_io *bdev_io; 705 uint32_t i; 706 707 STAILQ_INIT(&ch->need_buf_small); 708 STAILQ_INIT(&ch->need_buf_large); 709 710 STAILQ_INIT(&ch->per_thread_cache); 711 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 712 713 /* Pre-populate bdev_io cache to ensure this thread cannot be starved. */ 714 ch->per_thread_cache_count = 0; 715 for (i = 0; i < ch->bdev_io_cache_size; i++) { 716 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 717 assert(bdev_io != NULL); 718 ch->per_thread_cache_count++; 719 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 720 } 721 722 TAILQ_INIT(&ch->shared_resources); 723 TAILQ_INIT(&ch->io_wait_queue); 724 725 return 0; 726 } 727 728 static void 729 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf) 730 { 731 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 732 struct spdk_bdev_io *bdev_io; 733 734 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) { 735 SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n"); 736 } 737 738 if (!TAILQ_EMPTY(&ch->shared_resources)) { 739 SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n"); 740 } 741 742 while (!STAILQ_EMPTY(&ch->per_thread_cache)) { 743 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 744 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 745 ch->per_thread_cache_count--; 746 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 747 } 748 749 assert(ch->per_thread_cache_count == 0); 750 } 751 752 static void 753 spdk_bdev_init_complete(int rc) 754 { 755 spdk_bdev_init_cb cb_fn = g_init_cb_fn; 756 void *cb_arg = g_init_cb_arg; 757 struct spdk_bdev_module *m; 758 759 g_bdev_mgr.init_complete = true; 760 g_init_cb_fn = NULL; 761 g_init_cb_arg = NULL; 762 763 /* 764 * For modules that need to know when subsystem init is complete, 765 * inform them now. 766 */ 767 if (rc == 0) { 768 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 769 if (m->init_complete) { 770 m->init_complete(); 771 } 772 } 773 } 774 775 cb_fn(cb_arg, rc); 776 } 777 778 static void 779 spdk_bdev_module_action_complete(void) 780 { 781 struct spdk_bdev_module *m; 782 783 /* 784 * Don't finish bdev subsystem initialization if 785 * module pre-initialization is still in progress, or 786 * the subsystem been already initialized. 787 */ 788 if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) { 789 return; 790 } 791 792 /* 793 * Check all bdev modules for inits/examinations in progress. If any 794 * exist, return immediately since we cannot finish bdev subsystem 795 * initialization until all are completed. 796 */ 797 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 798 if (m->internal.action_in_progress > 0) { 799 return; 800 } 801 } 802 803 /* 804 * Modules already finished initialization - now that all 805 * the bdev modules have finished their asynchronous I/O 806 * processing, the entire bdev layer can be marked as complete. 
807 */ 808 spdk_bdev_init_complete(0); 809 } 810 811 static void 812 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 813 { 814 assert(module->internal.action_in_progress > 0); 815 module->internal.action_in_progress--; 816 spdk_bdev_module_action_complete(); 817 } 818 819 void 820 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 821 { 822 spdk_bdev_module_action_done(module); 823 } 824 825 void 826 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 827 { 828 spdk_bdev_module_action_done(module); 829 } 830 831 /** The last initialized bdev module */ 832 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 833 834 static int 835 spdk_bdev_modules_init(void) 836 { 837 struct spdk_bdev_module *module; 838 int rc = 0; 839 840 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 841 g_resume_bdev_module = module; 842 rc = module->module_init(); 843 if (rc != 0) { 844 return rc; 845 } 846 } 847 848 g_resume_bdev_module = NULL; 849 return 0; 850 } 851 852 853 static void 854 spdk_bdev_init_failed_complete(void *cb_arg) 855 { 856 spdk_bdev_init_complete(-1); 857 } 858 859 static void 860 spdk_bdev_init_failed(void *cb_arg) 861 { 862 spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL); 863 } 864 865 void 866 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 867 { 868 struct spdk_conf_section *sp; 869 struct spdk_bdev_opts bdev_opts; 870 int32_t bdev_io_pool_size, bdev_io_cache_size; 871 int cache_size; 872 int rc = 0; 873 char mempool_name[32]; 874 875 assert(cb_fn != NULL); 876 877 sp = spdk_conf_find_section(NULL, "Bdev"); 878 if (sp != NULL) { 879 spdk_bdev_get_opts(&bdev_opts); 880 881 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 882 if (bdev_io_pool_size >= 0) { 883 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 884 } 885 886 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 887 if (bdev_io_cache_size >= 0) { 888 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 889 } 890 891 if (spdk_bdev_set_opts(&bdev_opts)) { 892 spdk_bdev_init_complete(-1); 893 return; 894 } 895 896 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 897 } 898 899 g_init_cb_fn = cb_fn; 900 g_init_cb_arg = cb_arg; 901 902 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 903 904 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 905 g_bdev_opts.bdev_io_pool_size, 906 sizeof(struct spdk_bdev_io) + 907 spdk_bdev_module_get_max_ctx_size(), 908 0, 909 SPDK_ENV_SOCKET_ID_ANY); 910 911 if (g_bdev_mgr.bdev_io_pool == NULL) { 912 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 913 spdk_bdev_init_complete(-1); 914 return; 915 } 916 917 /** 918 * Ensure no more than half of the total buffers end up local caches, by 919 * using spdk_thread_get_count() to determine how many local caches we need 920 * to account for. 
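	 *
	 * Worked example (hypothetical thread count): with 4 threads,
	 * cache_size = BUF_SMALL_POOL_SIZE / (2 * 4) = 8192 / 8 = 1024 buffers
	 * per thread, so at most 4 * 1024 = 4096 of the 8192 small buffers can
	 * sit in per-thread caches at any one time.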
921 */ 922 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 923 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 924 925 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 926 BUF_SMALL_POOL_SIZE, 927 SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 928 cache_size, 929 SPDK_ENV_SOCKET_ID_ANY); 930 if (!g_bdev_mgr.buf_small_pool) { 931 SPDK_ERRLOG("create rbuf small pool failed\n"); 932 spdk_bdev_init_complete(-1); 933 return; 934 } 935 936 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 937 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 938 939 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 940 BUF_LARGE_POOL_SIZE, 941 SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 942 cache_size, 943 SPDK_ENV_SOCKET_ID_ANY); 944 if (!g_bdev_mgr.buf_large_pool) { 945 SPDK_ERRLOG("create rbuf large pool failed\n"); 946 spdk_bdev_init_complete(-1); 947 return; 948 } 949 950 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 951 NULL); 952 if (!g_bdev_mgr.zero_buffer) { 953 SPDK_ERRLOG("create bdev zero buffer failed\n"); 954 spdk_bdev_init_complete(-1); 955 return; 956 } 957 958 #ifdef SPDK_CONFIG_VTUNE 959 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 960 #endif 961 962 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 963 spdk_bdev_mgmt_channel_destroy, 964 sizeof(struct spdk_bdev_mgmt_channel), 965 "bdev_mgr"); 966 967 rc = spdk_bdev_modules_init(); 968 g_bdev_mgr.module_init_complete = true; 969 if (rc != 0) { 970 SPDK_ERRLOG("bdev modules init failed\n"); 971 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 972 return; 973 } 974 975 spdk_bdev_module_action_complete(); 976 } 977 978 static void 979 spdk_bdev_mgr_unregister_cb(void *io_device) 980 { 981 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 982 983 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 984 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 985 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 986 g_bdev_opts.bdev_io_pool_size); 987 } 988 989 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 990 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 991 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 992 BUF_SMALL_POOL_SIZE); 993 assert(false); 994 } 995 996 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 997 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 998 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 999 BUF_LARGE_POOL_SIZE); 1000 assert(false); 1001 } 1002 1003 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1004 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1005 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1006 spdk_dma_free(g_bdev_mgr.zero_buffer); 1007 1008 cb_fn(g_fini_cb_arg); 1009 g_fini_cb_fn = NULL; 1010 g_fini_cb_arg = NULL; 1011 g_bdev_mgr.init_complete = false; 1012 g_bdev_mgr.module_init_complete = false; 1013 } 1014 1015 static void 1016 spdk_bdev_module_finish_iter(void *arg) 1017 { 1018 struct spdk_bdev_module *bdev_module; 1019 1020 /* Start iterating from the last touched module */ 1021 if (!g_resume_bdev_module) { 1022 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 1023 } else { 1024 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 1025 internal.tailq); 1026 } 1027 1028 while (bdev_module) { 1029 if (bdev_module->async_fini) { 1030 /* Save our place so we can 
			 * resume later. We must save the variable here, before calling
			 * module_fini() below, because in some cases the module may
			 * immediately call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; manually remove it from the list and move on to the next bdev in the
		 * list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred, as we might be in the middle of
		 * some context (like bdev part free) that will use this bdev (or private
		 * bdev driver ctx data) after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to release a claimed underlying bdev properly, we may end up
	 * with a list consisting of claimed bdevs only (if claims are managed correctly,
	 * this would mean there is a loop in the claims graph, which is clearly
	 * impossible). Warn and unregister the last bdev on the list in that case.
1118 */ 1119 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1120 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1121 SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1122 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 1123 return; 1124 } 1125 } 1126 1127 void 1128 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1129 { 1130 struct spdk_bdev_module *m; 1131 1132 assert(cb_fn != NULL); 1133 1134 g_fini_thread = spdk_get_thread(); 1135 1136 g_fini_cb_fn = cb_fn; 1137 g_fini_cb_arg = cb_arg; 1138 1139 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1140 if (m->fini_start) { 1141 m->fini_start(); 1142 } 1143 } 1144 1145 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0); 1146 } 1147 1148 static struct spdk_bdev_io * 1149 spdk_bdev_get_io(struct spdk_bdev_channel *channel) 1150 { 1151 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1152 struct spdk_bdev_io *bdev_io; 1153 1154 if (ch->per_thread_cache_count > 0) { 1155 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1156 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1157 ch->per_thread_cache_count--; 1158 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1159 /* 1160 * Don't try to look for bdev_ios in the global pool if there are 1161 * waiters on bdev_ios - we don't want this caller to jump the line. 1162 */ 1163 bdev_io = NULL; 1164 } else { 1165 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1166 } 1167 1168 return bdev_io; 1169 } 1170 1171 void 1172 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1173 { 1174 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1175 1176 assert(bdev_io != NULL); 1177 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1178 1179 if (bdev_io->internal.buf != NULL) { 1180 spdk_bdev_io_put_buf(bdev_io); 1181 } 1182 1183 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1184 ch->per_thread_cache_count++; 1185 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1186 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1187 struct spdk_bdev_io_wait_entry *entry; 1188 1189 entry = TAILQ_FIRST(&ch->io_wait_queue); 1190 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1191 entry->cb_fn(entry->cb_arg); 1192 } 1193 } else { 1194 /* We should never have a full cache with entries on the io wait queue. 
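	 * A wait entry is only queued after spdk_bdev_get_io() came up empty,
	 * which requires the per-thread cache to have been empty at that point,
	 * so a full cache with waiters should be impossible.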
*/ 1195 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1196 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1197 } 1198 } 1199 1200 static bool 1201 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1202 { 1203 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1204 1205 switch (limit) { 1206 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1207 return true; 1208 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1209 return false; 1210 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1211 default: 1212 return false; 1213 } 1214 } 1215 1216 static bool 1217 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1218 { 1219 switch (bdev_io->type) { 1220 case SPDK_BDEV_IO_TYPE_NVME_IO: 1221 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1222 case SPDK_BDEV_IO_TYPE_READ: 1223 case SPDK_BDEV_IO_TYPE_WRITE: 1224 case SPDK_BDEV_IO_TYPE_UNMAP: 1225 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1226 return true; 1227 default: 1228 return false; 1229 } 1230 } 1231 1232 static uint64_t 1233 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1234 { 1235 struct spdk_bdev *bdev = bdev_io->bdev; 1236 1237 switch (bdev_io->type) { 1238 case SPDK_BDEV_IO_TYPE_NVME_IO: 1239 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1240 return bdev_io->u.nvme_passthru.nbytes; 1241 case SPDK_BDEV_IO_TYPE_READ: 1242 case SPDK_BDEV_IO_TYPE_WRITE: 1243 case SPDK_BDEV_IO_TYPE_UNMAP: 1244 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1245 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1246 default: 1247 return 0; 1248 } 1249 } 1250 1251 static void 1252 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte) 1253 { 1254 int i; 1255 1256 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1257 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1258 continue; 1259 } 1260 1261 switch (i) { 1262 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1263 qos->rate_limits[i].remaining_this_timeslice--; 1264 break; 1265 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1266 qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte; 1267 break; 1268 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1269 default: 1270 break; 1271 } 1272 } 1273 } 1274 1275 static int 1276 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1277 { 1278 struct spdk_bdev_io *bdev_io = NULL; 1279 struct spdk_bdev *bdev = ch->bdev; 1280 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1281 int i, submitted_ios = 0; 1282 bool to_limit_io; 1283 uint64_t io_size_in_byte; 1284 1285 while (!TAILQ_EMPTY(&qos->queued)) { 1286 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1287 if (qos->rate_limits[i].max_per_timeslice > 0 && 1288 (qos->rate_limits[i].remaining_this_timeslice <= 0)) { 1289 return submitted_ios; 1290 } 1291 } 1292 1293 bdev_io = TAILQ_FIRST(&qos->queued); 1294 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1295 ch->io_outstanding++; 1296 shared_resource->io_outstanding++; 1297 to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io); 1298 if (to_limit_io == true) { 1299 io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io); 1300 _spdk_bdev_qos_update_per_io(qos, io_size_in_byte); 1301 } 1302 bdev->fn_table->submit_request(ch->channel, bdev_io); 1303 submitted_ios++; 1304 } 1305 1306 return submitted_ios; 1307 } 1308 1309 static void 1310 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1311 { 1312 int rc; 1313 1314 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1315 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1316 
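	/* Descriptive note: the bdev_io itself is used as cb_arg so the retry
	 * callback (for example _spdk_bdev_io_split_with_payload below) can
	 * resume exactly where the failed submission left off. */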
bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1317 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1318 &bdev_io->internal.waitq_entry); 1319 if (rc != 0) { 1320 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1321 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1322 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1323 } 1324 } 1325 1326 static bool 1327 _spdk_bdev_io_type_can_split(uint8_t type) 1328 { 1329 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1330 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1331 1332 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1333 * UNMAP could be split, but these types of I/O are typically much larger 1334 * in size (sometimes the size of the entire block device), and the bdev 1335 * module can more efficiently split these types of I/O. Plus those types 1336 * of I/O do not have a payload, which makes the splitting process simpler. 1337 */ 1338 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1339 return true; 1340 } else { 1341 return false; 1342 } 1343 } 1344 1345 static bool 1346 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1347 { 1348 uint64_t start_stripe, end_stripe; 1349 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1350 1351 if (io_boundary == 0) { 1352 return false; 1353 } 1354 1355 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1356 return false; 1357 } 1358 1359 start_stripe = bdev_io->u.bdev.offset_blocks; 1360 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1361 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1362 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1363 start_stripe >>= spdk_u32log2(io_boundary); 1364 end_stripe >>= spdk_u32log2(io_boundary); 1365 } else { 1366 start_stripe /= io_boundary; 1367 end_stripe /= io_boundary; 1368 } 1369 return (start_stripe != end_stripe); 1370 } 1371 1372 static uint32_t 1373 _to_next_boundary(uint64_t offset, uint32_t boundary) 1374 { 1375 return (boundary - (offset % boundary)); 1376 } 1377 1378 static void 1379 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1380 1381 static void 1382 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1383 { 1384 struct spdk_bdev_io *bdev_io = _bdev_io; 1385 uint64_t current_offset, remaining; 1386 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1387 struct iovec *parent_iov, *iov; 1388 uint64_t parent_iov_offset, iov_len; 1389 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1390 int rc; 1391 1392 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1393 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1394 blocklen = bdev_io->bdev->blocklen; 1395 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1396 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1397 1398 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1399 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1400 if (parent_iov_offset < parent_iov->iov_len) { 1401 break; 1402 } 1403 parent_iov_offset -= parent_iov->iov_len; 1404 } 1405 1406 child_iovcnt = 0; 1407 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1408 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1409 to_next_boundary = spdk_min(remaining, to_next_boundary); 1410 to_next_boundary_bytes = to_next_boundary * blocklen; 1411 iov = 
&bdev_io->child_iov[child_iovcnt]; 1412 iovcnt = 0; 1413 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1414 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1415 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1416 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1417 to_next_boundary_bytes -= iov_len; 1418 1419 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1420 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1421 1422 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1423 parent_iov_offset += iov_len; 1424 } else { 1425 parent_iovpos++; 1426 parent_iov_offset = 0; 1427 } 1428 child_iovcnt++; 1429 iovcnt++; 1430 } 1431 1432 if (to_next_boundary_bytes > 0) { 1433 /* We had to stop this child I/O early because we ran out of 1434 * child_iov space. Make sure the iovs collected are valid and 1435 * then adjust to_next_boundary before starting the child I/O. 1436 */ 1437 if ((to_next_boundary_bytes % blocklen) != 0) { 1438 SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n", 1439 to_next_boundary_bytes, blocklen); 1440 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1441 if (bdev_io->u.bdev.split_outstanding == 0) { 1442 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1443 } 1444 return; 1445 } 1446 to_next_boundary -= to_next_boundary_bytes / blocklen; 1447 } 1448 1449 bdev_io->u.bdev.split_outstanding++; 1450 1451 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1452 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1453 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1454 iov, iovcnt, current_offset, to_next_boundary, 1455 _spdk_bdev_io_split_done, bdev_io); 1456 } else { 1457 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1458 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1459 iov, iovcnt, current_offset, to_next_boundary, 1460 _spdk_bdev_io_split_done, bdev_io); 1461 } 1462 1463 if (rc == 0) { 1464 current_offset += to_next_boundary; 1465 remaining -= to_next_boundary; 1466 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1467 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1468 } else { 1469 bdev_io->u.bdev.split_outstanding--; 1470 if (rc == -ENOMEM) { 1471 if (bdev_io->u.bdev.split_outstanding == 0) { 1472 /* No I/O is outstanding. Hence we should wait here. */ 1473 _spdk_bdev_queue_io_wait_with_cb(bdev_io, 1474 _spdk_bdev_io_split_with_payload); 1475 } 1476 } else { 1477 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1478 if (bdev_io->u.bdev.split_outstanding == 0) { 1479 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1480 } 1481 } 1482 1483 return; 1484 } 1485 } 1486 } 1487 1488 static void 1489 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1490 { 1491 struct spdk_bdev_io *parent_io = cb_arg; 1492 1493 spdk_bdev_free_io(bdev_io); 1494 1495 if (!success) { 1496 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1497 } 1498 parent_io->u.bdev.split_outstanding--; 1499 if (parent_io->u.bdev.split_outstanding != 0) { 1500 return; 1501 } 1502 1503 /* 1504 * Parent I/O finishes when all blocks are consumed or there is any failure of 1505 * child I/O and no outstanding child I/O. 
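	 * In either case the parent callback fires exactly once, after the last
	 * outstanding child has been accounted for.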
1506 */ 1507 if (parent_io->u.bdev.split_remaining_num_blocks == 0 || 1508 parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) { 1509 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1510 parent_io->internal.caller_ctx); 1511 return; 1512 } 1513 1514 /* 1515 * Continue with the splitting process. This function will complete the parent I/O if the 1516 * splitting is done. 1517 */ 1518 _spdk_bdev_io_split_with_payload(parent_io); 1519 } 1520 1521 static void 1522 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1523 { 1524 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1525 1526 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1527 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1528 bdev_io->u.bdev.split_outstanding = 0; 1529 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1530 1531 _spdk_bdev_io_split_with_payload(bdev_io); 1532 } 1533 1534 static void 1535 _spdk_bdev_io_submit(void *ctx) 1536 { 1537 struct spdk_bdev_io *bdev_io = ctx; 1538 struct spdk_bdev *bdev = bdev_io->bdev; 1539 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1540 struct spdk_io_channel *ch = bdev_ch->channel; 1541 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1542 uint64_t tsc; 1543 1544 tsc = spdk_get_ticks(); 1545 bdev_io->internal.submit_tsc = tsc; 1546 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1547 bdev_ch->io_outstanding++; 1548 shared_resource->io_outstanding++; 1549 bdev_io->internal.in_submit_request = true; 1550 if (spdk_likely(bdev_ch->flags == 0)) { 1551 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1552 bdev->fn_table->submit_request(ch, bdev_io); 1553 } else { 1554 bdev_ch->io_outstanding--; 1555 shared_resource->io_outstanding--; 1556 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1557 } 1558 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1559 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1560 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1561 bdev_ch->io_outstanding--; 1562 shared_resource->io_outstanding--; 1563 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1564 _spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 1565 } else { 1566 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1567 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1568 } 1569 bdev_io->internal.in_submit_request = false; 1570 } 1571 1572 static void 1573 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1574 { 1575 struct spdk_bdev *bdev = bdev_io->bdev; 1576 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1577 1578 assert(thread != NULL); 1579 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1580 1581 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1582 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1583 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split, 1584 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1585 } else { 1586 _spdk_bdev_io_split(NULL, bdev_io); 1587 } 1588 return; 1589 } 1590 1591 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1592 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1593 _spdk_bdev_io_submit(bdev_io); 1594 } else { 1595 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1596 bdev_io->internal.ch = 
bdev->internal.qos->ch; 1597 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1598 } 1599 } else { 1600 _spdk_bdev_io_submit(bdev_io); 1601 } 1602 } 1603 1604 static void 1605 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1606 { 1607 struct spdk_bdev *bdev = bdev_io->bdev; 1608 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1609 struct spdk_io_channel *ch = bdev_ch->channel; 1610 1611 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1612 1613 bdev_io->internal.in_submit_request = true; 1614 bdev->fn_table->submit_request(ch, bdev_io); 1615 bdev_io->internal.in_submit_request = false; 1616 } 1617 1618 static void 1619 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1620 struct spdk_bdev *bdev, void *cb_arg, 1621 spdk_bdev_io_completion_cb cb) 1622 { 1623 bdev_io->bdev = bdev; 1624 bdev_io->internal.caller_ctx = cb_arg; 1625 bdev_io->internal.cb = cb; 1626 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1627 bdev_io->internal.in_submit_request = false; 1628 bdev_io->internal.buf = NULL; 1629 bdev_io->internal.io_submit_ch = NULL; 1630 bdev_io->internal.orig_iovs = NULL; 1631 bdev_io->internal.orig_iovcnt = 0; 1632 } 1633 1634 static bool 1635 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1636 { 1637 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1638 } 1639 1640 bool 1641 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1642 { 1643 bool supported; 1644 1645 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1646 1647 if (!supported) { 1648 switch (io_type) { 1649 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1650 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1651 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1652 break; 1653 default: 1654 break; 1655 } 1656 } 1657 1658 return supported; 1659 } 1660 1661 int 1662 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1663 { 1664 if (bdev->fn_table->dump_info_json) { 1665 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1666 } 1667 1668 return 0; 1669 } 1670 1671 static void 1672 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1673 { 1674 uint32_t max_per_timeslice = 0; 1675 int i; 1676 1677 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1678 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1679 qos->rate_limits[i].max_per_timeslice = 0; 1680 continue; 1681 } 1682 1683 max_per_timeslice = qos->rate_limits[i].limit * 1684 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1685 1686 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1687 qos->rate_limits[i].min_per_timeslice); 1688 1689 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1690 } 1691 } 1692 1693 static int 1694 spdk_bdev_channel_poll_qos(void *arg) 1695 { 1696 struct spdk_bdev_qos *qos = arg; 1697 uint64_t now = spdk_get_ticks(); 1698 int i; 1699 1700 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1701 /* We received our callback earlier than expected - return 1702 * immediately and wait to do accounting until at least one 1703 * timeslice has actually expired. This should never happen 1704 * with a well-behaved timer implementation. 
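	 *
	 * Accounting sketch (hypothetical limit): with the 1000 usec timeslice
	 * and an IOPS limit of 10000, max_per_timeslice works out to
	 * 10000 * 1000 / 1000000 = 10 I/O per slice; any overrun left behind as
	 * a negative remaining_this_timeslice is repaid out of later slices.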
1705 */ 1706 return 0; 1707 } 1708 1709 /* Reset for next round of rate limiting */ 1710 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1711 /* We may have allowed the IOs or bytes to slightly overrun in the last 1712 * timeslice. remaining_this_timeslice is signed, so if it's negative 1713 * here, we'll account for the overrun so that the next timeslice will 1714 * be appropriately reduced. 1715 */ 1716 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1717 qos->rate_limits[i].remaining_this_timeslice = 0; 1718 } 1719 } 1720 1721 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1722 qos->last_timeslice += qos->timeslice_size; 1723 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1724 qos->rate_limits[i].remaining_this_timeslice += 1725 qos->rate_limits[i].max_per_timeslice; 1726 } 1727 } 1728 1729 return _spdk_bdev_qos_io_submit(qos->ch, qos); 1730 } 1731 1732 static void 1733 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1734 { 1735 struct spdk_bdev_shared_resource *shared_resource; 1736 1737 if (!ch) { 1738 return; 1739 } 1740 1741 if (ch->channel) { 1742 spdk_put_io_channel(ch->channel); 1743 } 1744 1745 assert(ch->io_outstanding == 0); 1746 1747 shared_resource = ch->shared_resource; 1748 if (shared_resource) { 1749 assert(ch->io_outstanding == 0); 1750 assert(shared_resource->ref > 0); 1751 shared_resource->ref--; 1752 if (shared_resource->ref == 0) { 1753 assert(shared_resource->io_outstanding == 0); 1754 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1755 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1756 free(shared_resource); 1757 } 1758 } 1759 } 1760 1761 /* Caller must hold bdev->internal.mutex. */ 1762 static void 1763 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1764 { 1765 struct spdk_bdev_qos *qos = bdev->internal.qos; 1766 int i; 1767 1768 /* Rate limiting on this bdev enabled */ 1769 if (qos) { 1770 if (qos->ch == NULL) { 1771 struct spdk_io_channel *io_ch; 1772 1773 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1774 bdev->name, spdk_get_thread()); 1775 1776 /* No qos channel has been selected, so set one up */ 1777 1778 /* Take another reference to ch */ 1779 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1780 qos->ch = ch; 1781 1782 qos->thread = spdk_io_channel_get_thread(io_ch); 1783 1784 TAILQ_INIT(&qos->queued); 1785 1786 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1787 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1788 qos->rate_limits[i].min_per_timeslice = 1789 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1790 } else { 1791 qos->rate_limits[i].min_per_timeslice = 1792 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1793 } 1794 1795 if (qos->rate_limits[i].limit == 0) { 1796 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1797 } 1798 } 1799 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1800 qos->timeslice_size = 1801 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1802 qos->last_timeslice = spdk_get_ticks(); 1803 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1804 qos, 1805 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1806 } 1807 1808 ch->flags |= BDEV_CH_QOS_ENABLED; 1809 } 1810 } 1811 1812 static int 1813 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1814 { 1815 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1816 struct spdk_bdev_channel *ch = ctx_buf; 1817 struct spdk_io_channel 
*mgmt_io_ch; 1818 struct spdk_bdev_mgmt_channel *mgmt_ch; 1819 struct spdk_bdev_shared_resource *shared_resource; 1820 1821 ch->bdev = bdev; 1822 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1823 if (!ch->channel) { 1824 return -1; 1825 } 1826 1827 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1828 if (!mgmt_io_ch) { 1829 return -1; 1830 } 1831 1832 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1833 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1834 if (shared_resource->shared_ch == ch->channel) { 1835 spdk_put_io_channel(mgmt_io_ch); 1836 shared_resource->ref++; 1837 break; 1838 } 1839 } 1840 1841 if (shared_resource == NULL) { 1842 shared_resource = calloc(1, sizeof(*shared_resource)); 1843 if (shared_resource == NULL) { 1844 spdk_put_io_channel(mgmt_io_ch); 1845 return -1; 1846 } 1847 1848 shared_resource->mgmt_ch = mgmt_ch; 1849 shared_resource->io_outstanding = 0; 1850 TAILQ_INIT(&shared_resource->nomem_io); 1851 shared_resource->nomem_threshold = 0; 1852 shared_resource->shared_ch = ch->channel; 1853 shared_resource->ref = 1; 1854 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1855 } 1856 1857 memset(&ch->stat, 0, sizeof(ch->stat)); 1858 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1859 ch->io_outstanding = 0; 1860 TAILQ_INIT(&ch->queued_resets); 1861 ch->flags = 0; 1862 ch->shared_resource = shared_resource; 1863 1864 #ifdef SPDK_CONFIG_VTUNE 1865 { 1866 char *name; 1867 __itt_init_ittlib(NULL, 0); 1868 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1869 if (!name) { 1870 _spdk_bdev_channel_destroy_resource(ch); 1871 return -1; 1872 } 1873 ch->handle = __itt_string_handle_create(name); 1874 free(name); 1875 ch->start_tsc = spdk_get_ticks(); 1876 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1877 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1878 } 1879 #endif 1880 1881 pthread_mutex_lock(&bdev->internal.mutex); 1882 _spdk_bdev_enable_qos(bdev, ch); 1883 pthread_mutex_unlock(&bdev->internal.mutex); 1884 1885 return 0; 1886 } 1887 1888 /* 1889 * Abort I/O that are waiting on a data buffer. These types of I/O are 1890 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1891 */ 1892 static void 1893 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1894 { 1895 bdev_io_stailq_t tmp; 1896 struct spdk_bdev_io *bdev_io; 1897 1898 STAILQ_INIT(&tmp); 1899 1900 while (!STAILQ_EMPTY(queue)) { 1901 bdev_io = STAILQ_FIRST(queue); 1902 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1903 if (bdev_io->internal.ch == ch) { 1904 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1905 } else { 1906 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1907 } 1908 } 1909 1910 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1911 } 1912 1913 /* 1914 * Abort I/O that are queued waiting for submission. These types of I/O are 1915 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1916 */ 1917 static void 1918 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1919 { 1920 struct spdk_bdev_io *bdev_io, *tmp; 1921 1922 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1923 if (bdev_io->internal.ch == ch) { 1924 TAILQ_REMOVE(queue, bdev_io, internal.link); 1925 /* 1926 * spdk_bdev_io_complete() assumes that the completed I/O had 1927 * been submitted to the bdev module. Since in this case it 1928 * hadn't, bump io_outstanding to account for the decrement 1929 * that spdk_bdev_io_complete() will do. 
1930 */ 1931 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1932 ch->io_outstanding++; 1933 ch->shared_resource->io_outstanding++; 1934 } 1935 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1936 } 1937 } 1938 } 1939 1940 static void 1941 spdk_bdev_qos_channel_destroy(void *cb_arg) 1942 { 1943 struct spdk_bdev_qos *qos = cb_arg; 1944 1945 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1946 spdk_poller_unregister(&qos->poller); 1947 1948 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1949 1950 free(qos); 1951 } 1952 1953 static int 1954 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1955 { 1956 int i; 1957 1958 /* 1959 * Cleanly shutting down the QoS poller is tricky, because 1960 * during the asynchronous operation the user could open 1961 * a new descriptor and create a new channel, spawning 1962 * a new QoS poller. 1963 * 1964 * The strategy is to create a new QoS structure here and swap it 1965 * in. The shutdown path then continues to refer to the old one 1966 * until it completes and then releases it. 1967 */ 1968 struct spdk_bdev_qos *new_qos, *old_qos; 1969 1970 old_qos = bdev->internal.qos; 1971 1972 new_qos = calloc(1, sizeof(*new_qos)); 1973 if (!new_qos) { 1974 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1975 return -ENOMEM; 1976 } 1977 1978 /* Copy the old QoS data into the newly allocated structure */ 1979 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1980 1981 /* Zero out the key parts of the QoS structure */ 1982 new_qos->ch = NULL; 1983 new_qos->thread = NULL; 1984 new_qos->poller = NULL; 1985 TAILQ_INIT(&new_qos->queued); 1986 /* 1987 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 1988 * It will be used later for the new QoS structure. 1989 */ 1990 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1991 new_qos->rate_limits[i].remaining_this_timeslice = 0; 1992 new_qos->rate_limits[i].min_per_timeslice = 0; 1993 new_qos->rate_limits[i].max_per_timeslice = 0; 1994 } 1995 1996 bdev->internal.qos = new_qos; 1997 1998 if (old_qos->thread == NULL) { 1999 free(old_qos); 2000 } else { 2001 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 2002 old_qos); 2003 } 2004 2005 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2006 * been destroyed yet. The destruction path will end up waiting for the final 2007 * channel to be put before it releases resources. */ 2008 2009 return 0; 2010 } 2011 2012 static void 2013 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2014 { 2015 total->bytes_read += add->bytes_read; 2016 total->num_read_ops += add->num_read_ops; 2017 total->bytes_written += add->bytes_written; 2018 total->num_write_ops += add->num_write_ops; 2019 total->read_latency_ticks += add->read_latency_ticks; 2020 total->write_latency_ticks += add->write_latency_ticks; 2021 } 2022 2023 static void 2024 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 2025 { 2026 struct spdk_bdev_channel *ch = ctx_buf; 2027 struct spdk_bdev_mgmt_channel *mgmt_ch; 2028 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2029 2030 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2031 spdk_get_thread()); 2032 2033 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
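 * spdk_bdev_get_device_stat() later adds this aggregate to the stats of the channels that still exist.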
*/ 2034 pthread_mutex_lock(&ch->bdev->internal.mutex); 2035 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2036 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2037 2038 mgmt_ch = shared_resource->mgmt_ch; 2039 2040 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2041 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2042 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2043 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2044 2045 _spdk_bdev_channel_destroy_resource(ch); 2046 } 2047 2048 int 2049 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2050 { 2051 struct spdk_bdev_alias *tmp; 2052 2053 if (alias == NULL) { 2054 SPDK_ERRLOG("NULL alias passed\n"); 2055 return -EINVAL; 2056 } 2057 2058 if (spdk_bdev_get_by_name(alias)) { 2059 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2060 return -EEXIST; 2061 } 2062 2063 tmp = calloc(1, sizeof(*tmp)); 2064 if (tmp == NULL) { 2065 SPDK_ERRLOG("Unable to allocate alias\n"); 2066 return -ENOMEM; 2067 } 2068 2069 tmp->alias = strdup(alias); 2070 if (tmp->alias == NULL) { 2071 free(tmp); 2072 SPDK_ERRLOG("Unable to allocate alias\n"); 2073 return -ENOMEM; 2074 } 2075 2076 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2077 2078 return 0; 2079 } 2080 2081 int 2082 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2083 { 2084 struct spdk_bdev_alias *tmp; 2085 2086 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2087 if (strcmp(alias, tmp->alias) == 0) { 2088 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2089 free(tmp->alias); 2090 free(tmp); 2091 return 0; 2092 } 2093 } 2094 2095 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias); 2096 2097 return -ENOENT; 2098 } 2099 2100 void 2101 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2102 { 2103 struct spdk_bdev_alias *p, *tmp; 2104 2105 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2106 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2107 free(p->alias); 2108 free(p); 2109 } 2110 } 2111 2112 struct spdk_io_channel * 2113 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2114 { 2115 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 2116 } 2117 2118 const char * 2119 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2120 { 2121 return bdev->name; 2122 } 2123 2124 const char * 2125 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2126 { 2127 return bdev->product_name; 2128 } 2129 2130 const struct spdk_bdev_aliases_list * 2131 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2132 { 2133 return &bdev->aliases; 2134 } 2135 2136 uint32_t 2137 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2138 { 2139 return bdev->blocklen; 2140 } 2141 2142 uint64_t 2143 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2144 { 2145 return bdev->blockcnt; 2146 } 2147 2148 const char * 2149 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2150 { 2151 return qos_rpc_type[type]; 2152 } 2153 2154 void 2155 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2156 { 2157 int i; 2158 2159 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2160 2161 pthread_mutex_lock(&bdev->internal.mutex); 2162 if (bdev->internal.qos) { 2163 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2164 if (bdev->internal.qos->rate_limits[i].limit != 2165 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2166 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2167 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2168 /* Convert the internal byte limit to the user-visible megabyte value.
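 * spdk_bdev_set_qos_rate_limits() performs the inverse megabyte-to-byte conversion when limits are set.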
*/ 2169 limits[i] = limits[i] / 1024 / 1024; 2170 } 2171 } 2172 } 2173 } 2174 pthread_mutex_unlock(&bdev->internal.mutex); 2175 } 2176 2177 size_t 2178 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2179 { 2180 return 1 << bdev->required_alignment; 2181 } 2182 2183 uint32_t 2184 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2185 { 2186 return bdev->optimal_io_boundary; 2187 } 2188 2189 bool 2190 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2191 { 2192 return bdev->write_cache; 2193 } 2194 2195 const struct spdk_uuid * 2196 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2197 { 2198 return &bdev->uuid; 2199 } 2200 2201 uint64_t 2202 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2203 { 2204 return bdev->internal.measured_queue_depth; 2205 } 2206 2207 uint64_t 2208 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2209 { 2210 return bdev->internal.period; 2211 } 2212 2213 uint64_t 2214 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2215 { 2216 return bdev->internal.weighted_io_time; 2217 } 2218 2219 uint64_t 2220 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2221 { 2222 return bdev->internal.io_time; 2223 } 2224 2225 static void 2226 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2227 { 2228 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2229 2230 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2231 2232 if (bdev->internal.measured_queue_depth) { 2233 bdev->internal.io_time += bdev->internal.period; 2234 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2235 } 2236 } 2237 2238 static void 2239 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2240 { 2241 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2242 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2243 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2244 2245 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2246 spdk_for_each_channel_continue(i, 0); 2247 } 2248 2249 static int 2250 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2251 { 2252 struct spdk_bdev *bdev = ctx; 2253 bdev->internal.temporary_queue_depth = 0; 2254 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2255 _calculate_measured_qd_cpl); 2256 return 0; 2257 } 2258 2259 void 2260 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2261 { 2262 bdev->internal.period = period; 2263 2264 if (bdev->internal.qd_poller != NULL) { 2265 spdk_poller_unregister(&bdev->internal.qd_poller); 2266 bdev->internal.measured_queue_depth = UINT64_MAX; 2267 } 2268 2269 if (period != 0) { 2270 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2271 period); 2272 } 2273 } 2274 2275 int 2276 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2277 { 2278 int ret; 2279 2280 pthread_mutex_lock(&bdev->internal.mutex); 2281 2282 /* bdev has open descriptors */ 2283 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2284 bdev->blockcnt > size) { 2285 ret = -EBUSY; 2286 } else { 2287 bdev->blockcnt = size; 2288 ret = 0; 2289 } 2290 2291 pthread_mutex_unlock(&bdev->internal.mutex); 2292 2293 return ret; 2294 } 2295 2296 /* 2297 * Convert I/O offset and length from bytes to blocks. 2298 * 2299 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
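 *
 * For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 1024 yield offset_blocks = 8 and num_blocks = 2, with a return value of 0.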
2300 */ 2301 static uint64_t 2302 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2303 uint64_t num_bytes, uint64_t *num_blocks) 2304 { 2305 uint32_t block_size = bdev->blocklen; 2306 2307 *offset_blocks = offset_bytes / block_size; 2308 *num_blocks = num_bytes / block_size; 2309 2310 return (offset_bytes % block_size) | (num_bytes % block_size); 2311 } 2312 2313 static bool 2314 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2315 { 2316 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2317 * has been an overflow and hence the offset has been wrapped around */ 2318 if (offset_blocks + num_blocks < offset_blocks) { 2319 return false; 2320 } 2321 2322 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2323 if (offset_blocks + num_blocks > bdev->blockcnt) { 2324 return false; 2325 } 2326 2327 return true; 2328 } 2329 2330 int 2331 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2332 void *buf, uint64_t offset, uint64_t nbytes, 2333 spdk_bdev_io_completion_cb cb, void *cb_arg) 2334 { 2335 uint64_t offset_blocks, num_blocks; 2336 2337 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2338 return -EINVAL; 2339 } 2340 2341 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2342 } 2343 2344 int 2345 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2346 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2347 spdk_bdev_io_completion_cb cb, void *cb_arg) 2348 { 2349 struct spdk_bdev *bdev = desc->bdev; 2350 struct spdk_bdev_io *bdev_io; 2351 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2352 2353 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2354 return -EINVAL; 2355 } 2356 2357 bdev_io = spdk_bdev_get_io(channel); 2358 if (!bdev_io) { 2359 return -ENOMEM; 2360 } 2361 2362 bdev_io->internal.ch = channel; 2363 bdev_io->internal.desc = desc; 2364 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2365 bdev_io->u.bdev.iovs = &bdev_io->iov; 2366 bdev_io->u.bdev.iovs[0].iov_base = buf; 2367 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2368 bdev_io->u.bdev.iovcnt = 1; 2369 bdev_io->u.bdev.num_blocks = num_blocks; 2370 bdev_io->u.bdev.offset_blocks = offset_blocks; 2371 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2372 2373 spdk_bdev_io_submit(bdev_io); 2374 return 0; 2375 } 2376 2377 int 2378 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2379 struct iovec *iov, int iovcnt, 2380 uint64_t offset, uint64_t nbytes, 2381 spdk_bdev_io_completion_cb cb, void *cb_arg) 2382 { 2383 uint64_t offset_blocks, num_blocks; 2384 2385 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2386 return -EINVAL; 2387 } 2388 2389 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2390 } 2391 2392 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2393 struct iovec *iov, int iovcnt, 2394 uint64_t offset_blocks, uint64_t num_blocks, 2395 spdk_bdev_io_completion_cb cb, void *cb_arg) 2396 { 2397 struct spdk_bdev *bdev = desc->bdev; 2398 struct spdk_bdev_io *bdev_io; 2399 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2400 2401 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2402 return -EINVAL; 2403 } 2404 
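/* bdev_io allocation can transiently fail under load; the resulting -ENOMEM can be retried, e.g. via spdk_bdev_queue_io_wait(). */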
2405 bdev_io = spdk_bdev_get_io(channel); 2406 if (!bdev_io) { 2407 return -ENOMEM; 2408 } 2409 2410 bdev_io->internal.ch = channel; 2411 bdev_io->internal.desc = desc; 2412 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2413 bdev_io->u.bdev.iovs = iov; 2414 bdev_io->u.bdev.iovcnt = iovcnt; 2415 bdev_io->u.bdev.num_blocks = num_blocks; 2416 bdev_io->u.bdev.offset_blocks = offset_blocks; 2417 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2418 2419 spdk_bdev_io_submit(bdev_io); 2420 return 0; 2421 } 2422 2423 int 2424 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2425 void *buf, uint64_t offset, uint64_t nbytes, 2426 spdk_bdev_io_completion_cb cb, void *cb_arg) 2427 { 2428 uint64_t offset_blocks, num_blocks; 2429 2430 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2431 return -EINVAL; 2432 } 2433 2434 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2435 } 2436 2437 int 2438 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2439 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2440 spdk_bdev_io_completion_cb cb, void *cb_arg) 2441 { 2442 struct spdk_bdev *bdev = desc->bdev; 2443 struct spdk_bdev_io *bdev_io; 2444 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2445 2446 if (!desc->write) { 2447 return -EBADF; 2448 } 2449 2450 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2451 return -EINVAL; 2452 } 2453 2454 bdev_io = spdk_bdev_get_io(channel); 2455 if (!bdev_io) { 2456 return -ENOMEM; 2457 } 2458 2459 bdev_io->internal.ch = channel; 2460 bdev_io->internal.desc = desc; 2461 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2462 bdev_io->u.bdev.iovs = &bdev_io->iov; 2463 bdev_io->u.bdev.iovs[0].iov_base = buf; 2464 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2465 bdev_io->u.bdev.iovcnt = 1; 2466 bdev_io->u.bdev.num_blocks = num_blocks; 2467 bdev_io->u.bdev.offset_blocks = offset_blocks; 2468 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2469 2470 spdk_bdev_io_submit(bdev_io); 2471 return 0; 2472 } 2473 2474 int 2475 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2476 struct iovec *iov, int iovcnt, 2477 uint64_t offset, uint64_t len, 2478 spdk_bdev_io_completion_cb cb, void *cb_arg) 2479 { 2480 uint64_t offset_blocks, num_blocks; 2481 2482 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2483 return -EINVAL; 2484 } 2485 2486 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2487 } 2488 2489 int 2490 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2491 struct iovec *iov, int iovcnt, 2492 uint64_t offset_blocks, uint64_t num_blocks, 2493 spdk_bdev_io_completion_cb cb, void *cb_arg) 2494 { 2495 struct spdk_bdev *bdev = desc->bdev; 2496 struct spdk_bdev_io *bdev_io; 2497 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2498 2499 if (!desc->write) { 2500 return -EBADF; 2501 } 2502 2503 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2504 return -EINVAL; 2505 } 2506 2507 bdev_io = spdk_bdev_get_io(channel); 2508 if (!bdev_io) { 2509 return -ENOMEM; 2510 } 2511 2512 bdev_io->internal.ch = channel; 2513 bdev_io->internal.desc = desc; 2514 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2515 bdev_io->u.bdev.iovs = iov; 2516 bdev_io->u.bdev.iovcnt = iovcnt; 2517 bdev_io->u.bdev.num_blocks = num_blocks; 2518 bdev_io->u.bdev.offset_blocks = 
offset_blocks; 2519 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2520 2521 spdk_bdev_io_submit(bdev_io); 2522 return 0; 2523 } 2524 2525 int 2526 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2527 uint64_t offset, uint64_t len, 2528 spdk_bdev_io_completion_cb cb, void *cb_arg) 2529 { 2530 uint64_t offset_blocks, num_blocks; 2531 2532 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2533 return -EINVAL; 2534 } 2535 2536 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2537 } 2538 2539 int 2540 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2541 uint64_t offset_blocks, uint64_t num_blocks, 2542 spdk_bdev_io_completion_cb cb, void *cb_arg) 2543 { 2544 struct spdk_bdev *bdev = desc->bdev; 2545 struct spdk_bdev_io *bdev_io; 2546 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2547 2548 if (!desc->write) { 2549 return -EBADF; 2550 } 2551 2552 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2553 return -EINVAL; 2554 } 2555 2556 bdev_io = spdk_bdev_get_io(channel); 2557 2558 if (!bdev_io) { 2559 return -ENOMEM; 2560 } 2561 2562 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2563 bdev_io->internal.ch = channel; 2564 bdev_io->internal.desc = desc; 2565 bdev_io->u.bdev.offset_blocks = offset_blocks; 2566 bdev_io->u.bdev.num_blocks = num_blocks; 2567 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2568 2569 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2570 spdk_bdev_io_submit(bdev_io); 2571 return 0; 2572 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2573 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2574 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2575 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2576 _spdk_bdev_write_zero_buffer_next(bdev_io); 2577 return 0; 2578 } else { 2579 spdk_bdev_free_io(bdev_io); 2580 return -ENOTSUP; 2581 } 2582 } 2583 2584 int 2585 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2586 uint64_t offset, uint64_t nbytes, 2587 spdk_bdev_io_completion_cb cb, void *cb_arg) 2588 { 2589 uint64_t offset_blocks, num_blocks; 2590 2591 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2592 return -EINVAL; 2593 } 2594 2595 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2596 } 2597 2598 int 2599 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2600 uint64_t offset_blocks, uint64_t num_blocks, 2601 spdk_bdev_io_completion_cb cb, void *cb_arg) 2602 { 2603 struct spdk_bdev *bdev = desc->bdev; 2604 struct spdk_bdev_io *bdev_io; 2605 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2606 2607 if (!desc->write) { 2608 return -EBADF; 2609 } 2610 2611 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2612 return -EINVAL; 2613 } 2614 2615 if (num_blocks == 0) { 2616 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2617 return -EINVAL; 2618 } 2619 2620 bdev_io = spdk_bdev_get_io(channel); 2621 if (!bdev_io) { 2622 return -ENOMEM; 2623 } 2624 2625 bdev_io->internal.ch = channel; 2626 bdev_io->internal.desc = desc; 2627 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2628 2629 bdev_io->u.bdev.iovs = &bdev_io->iov; 2630 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2631 bdev_io->u.bdev.iovs[0].iov_len = 0; 2632 bdev_io->u.bdev.iovcnt = 1; 2633 2634 
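/* The unmap payload has no data buffer; note the zero-length iovec set up above. */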
bdev_io->u.bdev.offset_blocks = offset_blocks; 2635 bdev_io->u.bdev.num_blocks = num_blocks; 2636 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2637 2638 spdk_bdev_io_submit(bdev_io); 2639 return 0; 2640 } 2641 2642 int 2643 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2644 uint64_t offset, uint64_t length, 2645 spdk_bdev_io_completion_cb cb, void *cb_arg) 2646 { 2647 uint64_t offset_blocks, num_blocks; 2648 2649 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2650 return -EINVAL; 2651 } 2652 2653 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2654 } 2655 2656 int 2657 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2658 uint64_t offset_blocks, uint64_t num_blocks, 2659 spdk_bdev_io_completion_cb cb, void *cb_arg) 2660 { 2661 struct spdk_bdev *bdev = desc->bdev; 2662 struct spdk_bdev_io *bdev_io; 2663 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2664 2665 if (!desc->write) { 2666 return -EBADF; 2667 } 2668 2669 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2670 return -EINVAL; 2671 } 2672 2673 bdev_io = spdk_bdev_get_io(channel); 2674 if (!bdev_io) { 2675 return -ENOMEM; 2676 } 2677 2678 bdev_io->internal.ch = channel; 2679 bdev_io->internal.desc = desc; 2680 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2681 bdev_io->u.bdev.iovs = NULL; 2682 bdev_io->u.bdev.iovcnt = 0; 2683 bdev_io->u.bdev.offset_blocks = offset_blocks; 2684 bdev_io->u.bdev.num_blocks = num_blocks; 2685 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2686 2687 spdk_bdev_io_submit(bdev_io); 2688 return 0; 2689 } 2690 2691 static void 2692 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2693 { 2694 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2695 struct spdk_bdev_io *bdev_io; 2696 2697 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2698 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2699 spdk_bdev_io_submit_reset(bdev_io); 2700 } 2701 2702 static void 2703 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2704 { 2705 struct spdk_io_channel *ch; 2706 struct spdk_bdev_channel *channel; 2707 struct spdk_bdev_mgmt_channel *mgmt_channel; 2708 struct spdk_bdev_shared_resource *shared_resource; 2709 bdev_io_tailq_t tmp_queued; 2710 2711 TAILQ_INIT(&tmp_queued); 2712 2713 ch = spdk_io_channel_iter_get_channel(i); 2714 channel = spdk_io_channel_get_ctx(ch); 2715 shared_resource = channel->shared_resource; 2716 mgmt_channel = shared_resource->mgmt_ch; 2717 2718 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2719 2720 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2721 /* The QoS object is always valid and readable while 2722 * the channel flag is set, so the lock here should not 2723 * be necessary. We're not in the fast path though, so 2724 * just take it anyway. 
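 * (Both BDEV_CH_QOS_ENABLED and qos->ch are set in _spdk_bdev_enable_qos() while bdev->internal.mutex is held.)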
*/ 2725 pthread_mutex_lock(&channel->bdev->internal.mutex); 2726 if (channel->bdev->internal.qos->ch == channel) { 2727 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2728 } 2729 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2730 } 2731 2732 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2733 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2734 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2735 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2736 2737 spdk_for_each_channel_continue(i, 0); 2738 } 2739 2740 static void 2741 _spdk_bdev_start_reset(void *ctx) 2742 { 2743 struct spdk_bdev_channel *ch = ctx; 2744 2745 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2746 ch, _spdk_bdev_reset_dev); 2747 } 2748 2749 static void 2750 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2751 { 2752 struct spdk_bdev *bdev = ch->bdev; 2753 2754 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2755 2756 pthread_mutex_lock(&bdev->internal.mutex); 2757 if (bdev->internal.reset_in_progress == NULL) { 2758 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2759 /* 2760 * Take a channel reference for the target bdev for the life of this 2761 * reset. This guards against the channel getting destroyed while 2762 * spdk_for_each_channel() calls related to this reset IO are in 2763 * progress. We will release the reference when this reset is 2764 * completed. 2765 */ 2766 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2767 _spdk_bdev_start_reset(ch); 2768 } 2769 pthread_mutex_unlock(&bdev->internal.mutex); 2770 } 2771 2772 int 2773 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2774 spdk_bdev_io_completion_cb cb, void *cb_arg) 2775 { 2776 struct spdk_bdev *bdev = desc->bdev; 2777 struct spdk_bdev_io *bdev_io; 2778 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2779 2780 bdev_io = spdk_bdev_get_io(channel); 2781 if (!bdev_io) { 2782 return -ENOMEM; 2783 } 2784 2785 bdev_io->internal.ch = channel; 2786 bdev_io->internal.desc = desc; 2787 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2788 bdev_io->u.reset.ch_ref = NULL; 2789 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2790 2791 pthread_mutex_lock(&bdev->internal.mutex); 2792 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2793 pthread_mutex_unlock(&bdev->internal.mutex); 2794 2795 _spdk_bdev_channel_start_reset(channel); 2796 2797 return 0; 2798 } 2799 2800 void 2801 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2802 struct spdk_bdev_io_stat *stat) 2803 { 2804 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2805 2806 *stat = channel->stat; 2807 } 2808 2809 static void 2810 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2811 { 2812 void *io_device = spdk_io_channel_iter_get_io_device(i); 2813 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2814 2815 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2816 bdev_iostat_ctx->cb_arg, 0); 2817 free(bdev_iostat_ctx); 2818 } 2819 2820 static void 2821 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2822 { 2823 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2824 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2825 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2826 2827 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2828 spdk_for_each_channel_continue(i, 0); 2829 } 2830 2831 void 2832 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2833 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2834 { 2835 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2836 2837 assert(bdev != NULL); 2838 assert(stat != NULL); 2839 assert(cb != NULL); 2840 2841 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2842 if (bdev_iostat_ctx == NULL) { 2843 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2844 cb(bdev, stat, cb_arg, -ENOMEM); 2845 return; 2846 } 2847 2848 bdev_iostat_ctx->stat = stat; 2849 bdev_iostat_ctx->cb = cb; 2850 bdev_iostat_ctx->cb_arg = cb_arg; 2851 2852 /* Start with the statistics from previously deleted channels. */ 2853 pthread_mutex_lock(&bdev->internal.mutex); 2854 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2855 pthread_mutex_unlock(&bdev->internal.mutex); 2856 2857 /* Then iterate and add the statistics from each existing channel. */ 2858 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2859 _spdk_bdev_get_each_channel_stat, 2860 bdev_iostat_ctx, 2861 _spdk_bdev_get_device_stat_done); 2862 } 2863 2864 int 2865 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2866 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2867 spdk_bdev_io_completion_cb cb, void *cb_arg) 2868 { 2869 struct spdk_bdev *bdev = desc->bdev; 2870 struct spdk_bdev_io *bdev_io; 2871 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2872 2873 if (!desc->write) { 2874 return -EBADF; 2875 } 2876 2877 bdev_io = spdk_bdev_get_io(channel); 2878 if (!bdev_io) { 2879 return -ENOMEM; 2880 } 2881 2882 bdev_io->internal.ch = channel; 2883 bdev_io->internal.desc = desc; 2884 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2885 bdev_io->u.nvme_passthru.cmd = *cmd; 2886 bdev_io->u.nvme_passthru.buf = buf; 2887 bdev_io->u.nvme_passthru.nbytes = nbytes; 2888 bdev_io->u.nvme_passthru.md_buf = NULL; 2889 bdev_io->u.nvme_passthru.md_len = 0; 2890 2891 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2892 2893 spdk_bdev_io_submit(bdev_io); 2894 return 0; 2895 } 2896 2897 int 2898 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2899 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2900 spdk_bdev_io_completion_cb cb, void *cb_arg) 2901 { 2902 struct spdk_bdev *bdev = desc->bdev; 2903 struct spdk_bdev_io *bdev_io; 2904 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2905 2906 if (!desc->write) { 2907 /* 2908 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2909 * to easily determine if the command is a read or write, but for now just 2910 * do not allow io_passthru with a read-only descriptor. 
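 * spdk_bdev_nvme_admin_passthru() and spdk_bdev_nvme_io_passthru_md() apply the same restriction.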
2911 */ 2912 return -EBADF; 2913 } 2914 2915 bdev_io = spdk_bdev_get_io(channel); 2916 if (!bdev_io) { 2917 return -ENOMEM; 2918 } 2919 2920 bdev_io->internal.ch = channel; 2921 bdev_io->internal.desc = desc; 2922 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2923 bdev_io->u.nvme_passthru.cmd = *cmd; 2924 bdev_io->u.nvme_passthru.buf = buf; 2925 bdev_io->u.nvme_passthru.nbytes = nbytes; 2926 bdev_io->u.nvme_passthru.md_buf = NULL; 2927 bdev_io->u.nvme_passthru.md_len = 0; 2928 2929 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2930 2931 spdk_bdev_io_submit(bdev_io); 2932 return 0; 2933 } 2934 2935 int 2936 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2937 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2938 spdk_bdev_io_completion_cb cb, void *cb_arg) 2939 { 2940 struct spdk_bdev *bdev = desc->bdev; 2941 struct spdk_bdev_io *bdev_io; 2942 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2943 2944 if (!desc->write) { 2945 /* 2946 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2947 * to easily determine if the command is a read or write, but for now just 2948 * do not allow io_passthru with a read-only descriptor. 2949 */ 2950 return -EBADF; 2951 } 2952 2953 bdev_io = spdk_bdev_get_io(channel); 2954 if (!bdev_io) { 2955 return -ENOMEM; 2956 } 2957 2958 bdev_io->internal.ch = channel; 2959 bdev_io->internal.desc = desc; 2960 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2961 bdev_io->u.nvme_passthru.cmd = *cmd; 2962 bdev_io->u.nvme_passthru.buf = buf; 2963 bdev_io->u.nvme_passthru.nbytes = nbytes; 2964 bdev_io->u.nvme_passthru.md_buf = md_buf; 2965 bdev_io->u.nvme_passthru.md_len = md_len; 2966 2967 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2968 2969 spdk_bdev_io_submit(bdev_io); 2970 return 0; 2971 } 2972 2973 int 2974 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2975 struct spdk_bdev_io_wait_entry *entry) 2976 { 2977 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2978 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2979 2980 if (bdev != entry->bdev) { 2981 SPDK_ERRLOG("bdevs do not match\n"); 2982 return -EINVAL; 2983 } 2984 2985 if (mgmt_ch->per_thread_cache_count > 0) { 2986 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2987 return -EINVAL; 2988 } 2989 2990 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2991 return 0; 2992 } 2993 2994 static void 2995 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2996 { 2997 struct spdk_bdev *bdev = bdev_ch->bdev; 2998 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2999 struct spdk_bdev_io *bdev_io; 3000 3001 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3002 /* 3003 * Allow some more I/O to complete before retrying the nomem_io queue. 3004 * Some drivers (such as nvme) cannot immediately take a new I/O in 3005 * the context of a completion, because the resources for the I/O are 3006 * not released until control returns to the bdev poller. Also, we 3007 * may require several small I/O to complete before a larger I/O 3008 * (that requires splitting) can be submitted. 
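 * The nomem_threshold consulted here is recomputed in spdk_bdev_io_complete() whenever a NOMEM completion arrives.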
3009 */ 3010 return; 3011 } 3012 3013 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3014 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3015 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3016 bdev_io->internal.ch->io_outstanding++; 3017 shared_resource->io_outstanding++; 3018 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3019 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 3020 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3021 break; 3022 } 3023 } 3024 } 3025 3026 static inline void 3027 _spdk_bdev_io_complete(void *ctx) 3028 { 3029 struct spdk_bdev_io *bdev_io = ctx; 3030 uint64_t tsc; 3031 3032 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3033 /* 3034 * Send the completion to the thread that originally submitted the I/O, 3035 * which may not be the current thread in the case of QoS. 3036 */ 3037 if (bdev_io->internal.io_submit_ch) { 3038 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3039 bdev_io->internal.io_submit_ch = NULL; 3040 } 3041 3042 /* 3043 * Defer completion to avoid potential infinite recursion if the 3044 * user's completion callback issues a new I/O. 3045 */ 3046 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3047 _spdk_bdev_io_complete, bdev_io); 3048 return; 3049 } 3050 3051 tsc = spdk_get_ticks(); 3052 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 3053 3054 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3055 switch (bdev_io->type) { 3056 case SPDK_BDEV_IO_TYPE_READ: 3057 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3058 bdev_io->internal.ch->stat.num_read_ops++; 3059 bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 3060 break; 3061 case SPDK_BDEV_IO_TYPE_WRITE: 3062 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3063 bdev_io->internal.ch->stat.num_write_ops++; 3064 bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 3065 break; 3066 default: 3067 break; 3068 } 3069 } 3070 3071 #ifdef SPDK_CONFIG_VTUNE 3072 uint64_t now_tsc = spdk_get_ticks(); 3073 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 3074 uint64_t data[5]; 3075 3076 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 3077 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 3078 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 3079 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 3080 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
3081 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 3082 3083 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3084 __itt_metadata_u64, 5, data); 3085 3086 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3087 bdev_io->internal.ch->start_tsc = now_tsc; 3088 } 3089 #endif 3090 3091 assert(bdev_io->internal.cb != NULL); 3092 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 3093 3094 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3095 bdev_io->internal.caller_ctx); 3096 } 3097 3098 static void 3099 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3100 { 3101 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3102 3103 if (bdev_io->u.reset.ch_ref != NULL) { 3104 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3105 bdev_io->u.reset.ch_ref = NULL; 3106 } 3107 3108 _spdk_bdev_io_complete(bdev_io); 3109 } 3110 3111 static void 3112 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3113 { 3114 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3115 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3116 3117 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3118 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3119 _spdk_bdev_channel_start_reset(ch); 3120 } 3121 3122 spdk_for_each_channel_continue(i, 0); 3123 } 3124 3125 void 3126 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3127 { 3128 struct spdk_bdev *bdev = bdev_io->bdev; 3129 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3130 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3131 3132 bdev_io->internal.status = status; 3133 3134 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3135 bool unlock_channels = false; 3136 3137 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3138 SPDK_ERRLOG("NOMEM returned for reset\n"); 3139 } 3140 pthread_mutex_lock(&bdev->internal.mutex); 3141 if (bdev_io == bdev->internal.reset_in_progress) { 3142 bdev->internal.reset_in_progress = NULL; 3143 unlock_channels = true; 3144 } 3145 pthread_mutex_unlock(&bdev->internal.mutex); 3146 3147 if (unlock_channels) { 3148 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3149 bdev_io, _spdk_bdev_reset_complete); 3150 return; 3151 } 3152 } else { 3153 if (spdk_unlikely(bdev_io->internal.orig_iovcnt > 0)) { 3154 _bdev_io_unset_bounce_buf(bdev_io); 3155 } 3156 3157 assert(bdev_ch->io_outstanding > 0); 3158 assert(shared_resource->io_outstanding > 0); 3159 bdev_ch->io_outstanding--; 3160 shared_resource->io_outstanding--; 3161 3162 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3163 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3164 /* 3165 * Wait for some of the outstanding I/O to complete before we 3166 * retry any of the nomem_io. Normally we will wait for 3167 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3168 * depth channels we will instead wait for half to complete. 
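 * For example, at 4 outstanding I/O the spdk_max() below selects half (2); at larger queue depths it selects outstanding minus NOMEM_THRESHOLD_COUNT.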
3169 */ 3170 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3171 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3172 return; 3173 } 3174 3175 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3176 _spdk_bdev_ch_retry_io(bdev_ch); 3177 } 3178 } 3179 3180 _spdk_bdev_io_complete(bdev_io); 3181 } 3182 3183 void 3184 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3185 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3186 { 3187 if (sc == SPDK_SCSI_STATUS_GOOD) { 3188 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3189 } else { 3190 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3191 bdev_io->internal.error.scsi.sc = sc; 3192 bdev_io->internal.error.scsi.sk = sk; 3193 bdev_io->internal.error.scsi.asc = asc; 3194 bdev_io->internal.error.scsi.ascq = ascq; 3195 } 3196 3197 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3198 } 3199 3200 void 3201 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3202 int *sc, int *sk, int *asc, int *ascq) 3203 { 3204 assert(sc != NULL); 3205 assert(sk != NULL); 3206 assert(asc != NULL); 3207 assert(ascq != NULL); 3208 3209 switch (bdev_io->internal.status) { 3210 case SPDK_BDEV_IO_STATUS_SUCCESS: 3211 *sc = SPDK_SCSI_STATUS_GOOD; 3212 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3213 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3214 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3215 break; 3216 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3217 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3218 break; 3219 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3220 *sc = bdev_io->internal.error.scsi.sc; 3221 *sk = bdev_io->internal.error.scsi.sk; 3222 *asc = bdev_io->internal.error.scsi.asc; 3223 *ascq = bdev_io->internal.error.scsi.ascq; 3224 break; 3225 default: 3226 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3227 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3228 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3229 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3230 break; 3231 } 3232 } 3233 3234 void 3235 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3236 { 3237 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3238 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3239 } else { 3240 bdev_io->internal.error.nvme.sct = sct; 3241 bdev_io->internal.error.nvme.sc = sc; 3242 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3243 } 3244 3245 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3246 } 3247 3248 void 3249 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3250 { 3251 assert(sct != NULL); 3252 assert(sc != NULL); 3253 3254 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3255 *sct = bdev_io->internal.error.nvme.sct; 3256 *sc = bdev_io->internal.error.nvme.sc; 3257 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3258 *sct = SPDK_NVME_SCT_GENERIC; 3259 *sc = SPDK_NVME_SC_SUCCESS; 3260 } else { 3261 *sct = SPDK_NVME_SCT_GENERIC; 3262 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3263 } 3264 } 3265 3266 struct spdk_thread * 3267 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3268 { 3269 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3270 } 3271 3272 static void 3273 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3274 { 3275 uint64_t min_qos_set; 3276 int i; 3277 3278 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3279 if (limits[i] != 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3280 break; 3281 } 3282 } 3283 3284 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3285 SPDK_ERRLOG("Invalid rate limits set.\n"); 3286 return; 3287 } 3288 3289 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3290 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3291 continue; 3292 } 3293 3294 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3295 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3296 } else { 3297 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3298 } 3299 3300 if (limits[i] == 0 || limits[i] % min_qos_set) { 3301 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n", 3302 limits[i], bdev->name, min_qos_set); 3303 SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name); 3304 return; 3305 } 3306 } 3307 3308 if (!bdev->internal.qos) { 3309 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3310 if (!bdev->internal.qos) { 3311 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3312 return; 3313 } 3314 } 3315 3316 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3317 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3318 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n", 3319 bdev->name, i, limits[i]); 3320 } 3321 3322 return; 3323 } 3324 3325 static void 3326 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3327 { 3328 struct spdk_conf_section *sp = NULL; 3329 const char *val = NULL; 3330 int i = 0, j = 0; 3331 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3332 bool config_qos = false; 3333 3334 sp = spdk_conf_find_section(NULL, "QoS"); 3335 if (!sp) { 3336 return; 3337 } 3338 3339 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3340 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3341 3342 i = 0; 3343 while (true) { 3344 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3345 if (!val) { 3346 break; 3347 } 3348 3349 if (strcmp(bdev->name, val) != 0) { 3350 i++; 3351 continue; 3352 } 3353 3354 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3355 if (val) { 3356 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3357 limits[j] = strtoull(val, NULL, 10); 3358 } else { 3359 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3360 } 3361 config_qos = true; 3362 } 3363 3364 break; 3365 } 3366 3367 j++; 3368 } 3369 3370 if (config_qos == true) { 3371 _spdk_bdev_qos_config_limit(bdev, limits); 3372 } 3373 3374 return; 3375 } 3376 3377 static int 3378 spdk_bdev_init(struct spdk_bdev *bdev) 3379 { 3380 char *bdev_name; 3381 3382 assert(bdev->module != NULL); 3383 3384 if (!bdev->name) { 3385 SPDK_ERRLOG("Bdev name is NULL\n"); 3386 return -EINVAL; 3387 } 3388 3389 if (spdk_bdev_get_by_name(bdev->name)) { 3390 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3391 return -EEXIST; 3392 } 3393 3394 /* Users often register their own I/O devices using the bdev name. In 3395 * order to avoid conflicts, prepend bdev_.
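 * For instance, a bdev registered under the name "Malloc0" would have its I/O device registered as "bdev_Malloc0".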
*/ 3396 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3397 if (!bdev_name) { 3398 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3399 return -ENOMEM; 3400 } 3401 3402 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3403 bdev->internal.measured_queue_depth = UINT64_MAX; 3404 bdev->internal.claim_module = NULL; 3405 bdev->internal.qd_poller = NULL; 3406 bdev->internal.qos = NULL; 3407 3408 if (spdk_bdev_get_buf_align(bdev) > 1) { 3409 if (bdev->split_on_optimal_io_boundary) { 3410 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 3411 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 3412 } else { 3413 bdev->split_on_optimal_io_boundary = true; 3414 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 3415 } 3416 } 3417 3418 TAILQ_INIT(&bdev->internal.open_descs); 3419 3420 TAILQ_INIT(&bdev->aliases); 3421 3422 bdev->internal.reset_in_progress = NULL; 3423 3424 _spdk_bdev_qos_config(bdev); 3425 3426 spdk_io_device_register(__bdev_to_io_dev(bdev), 3427 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3428 sizeof(struct spdk_bdev_channel), 3429 bdev_name); 3430 3431 free(bdev_name); 3432 3433 pthread_mutex_init(&bdev->internal.mutex, NULL); 3434 return 0; 3435 } 3436 3437 static void 3438 spdk_bdev_destroy_cb(void *io_device) 3439 { 3440 int rc; 3441 struct spdk_bdev *bdev; 3442 spdk_bdev_unregister_cb cb_fn; 3443 void *cb_arg; 3444 3445 bdev = __bdev_from_io_dev(io_device); 3446 cb_fn = bdev->internal.unregister_cb; 3447 cb_arg = bdev->internal.unregister_ctx; 3448 3449 rc = bdev->fn_table->destruct(bdev->ctxt); 3450 if (rc < 0) { 3451 SPDK_ERRLOG("destruct failed\n"); 3452 } 3453 if (rc <= 0 && cb_fn != NULL) { 3454 cb_fn(cb_arg, rc); 3455 } 3456 } 3457 3458 3459 static void 3460 spdk_bdev_fini(struct spdk_bdev *bdev) 3461 { 3462 pthread_mutex_destroy(&bdev->internal.mutex); 3463 3464 free(bdev->internal.qos); 3465 3466 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3467 } 3468 3469 static void 3470 spdk_bdev_start(struct spdk_bdev *bdev) 3471 { 3472 struct spdk_bdev_module *module; 3473 uint32_t action; 3474 3475 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3476 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3477 3478 /* Examine configuration before initializing I/O */ 3479 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3480 if (module->examine_config) { 3481 action = module->internal.action_in_progress; 3482 module->internal.action_in_progress++; 3483 module->examine_config(bdev); 3484 if (action != module->internal.action_in_progress) { 3485 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3486 module->name); 3487 } 3488 } 3489 } 3490 3491 if (bdev->internal.claim_module) { 3492 return; 3493 } 3494 3495 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3496 if (module->examine_disk) { 3497 module->internal.action_in_progress++; 3498 module->examine_disk(bdev); 3499 } 3500 } 3501 } 3502 3503 int 3504 spdk_bdev_register(struct spdk_bdev *bdev) 3505 { 3506 int rc = spdk_bdev_init(bdev); 3507 3508 if (rc == 0) { 3509 spdk_bdev_start(bdev); 3510 } 3511 3512 return rc; 3513 } 3514 3515 int 3516 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3517 { 3518 int rc; 3519 3520 rc = spdk_bdev_init(vbdev); 3521 if (rc) { 3522 return rc; 3523 } 3524 3525 spdk_bdev_start(vbdev); 3526 return 0; 3527 } 3528 3529 void 3530 
spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3531 { 3532 if (bdev->internal.unregister_cb != NULL) { 3533 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3534 } 3535 } 3536 3537 static void 3538 _remove_notify(void *arg) 3539 { 3540 struct spdk_bdev_desc *desc = arg; 3541 3542 desc->remove_scheduled = false; 3543 3544 if (desc->closed) { 3545 free(desc); 3546 } else { 3547 desc->remove_cb(desc->remove_ctx); 3548 } 3549 } 3550 3551 void 3552 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3553 { 3554 struct spdk_bdev_desc *desc, *tmp; 3555 bool do_destruct = true; 3556 struct spdk_thread *thread; 3557 3558 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3559 3560 thread = spdk_get_thread(); 3561 if (!thread) { 3562 /* The user called this from a non-SPDK thread. */ 3563 if (cb_fn != NULL) { 3564 cb_fn(cb_arg, -ENOTSUP); 3565 } 3566 return; 3567 } 3568 3569 pthread_mutex_lock(&bdev->internal.mutex); 3570 3571 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3572 bdev->internal.unregister_cb = cb_fn; 3573 bdev->internal.unregister_ctx = cb_arg; 3574 3575 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3576 if (desc->remove_cb) { 3577 do_destruct = false; 3578 /* 3579 * Defer invocation of the remove_cb to a separate message that will 3580 * run later on its thread. This ensures this context unwinds and 3581 * we don't recursively unregister this bdev again if the remove_cb 3582 * immediately closes its descriptor. 3583 */ 3584 if (!desc->remove_scheduled) { 3585 /* Avoid scheduling removal of the same descriptor multiple times. */ 3586 desc->remove_scheduled = true; 3587 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3588 } 3589 } 3590 } 3591 3592 if (!do_destruct) { 3593 pthread_mutex_unlock(&bdev->internal.mutex); 3594 return; 3595 } 3596 3597 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3598 pthread_mutex_unlock(&bdev->internal.mutex); 3599 3600 spdk_bdev_fini(bdev); 3601 } 3602 3603 int 3604 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3605 void *remove_ctx, struct spdk_bdev_desc **_desc) 3606 { 3607 struct spdk_bdev_desc *desc; 3608 struct spdk_thread *thread; 3609 3610 thread = spdk_get_thread(); 3611 if (!thread) { 3612 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3613 return -ENOTSUP; 3614 } 3615 3616 desc = calloc(1, sizeof(*desc)); 3617 if (desc == NULL) { 3618 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3619 return -ENOMEM; 3620 } 3621 3622 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3623 spdk_get_thread()); 3624 3625 pthread_mutex_lock(&bdev->internal.mutex); 3626 3627 if (write && bdev->internal.claim_module) { 3628 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3629 bdev->name, bdev->internal.claim_module->name); 3630 free(desc); 3631 pthread_mutex_unlock(&bdev->internal.mutex); 3632 return -EPERM; 3633 } 3634 3635 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3636 3637 desc->bdev = bdev; 3638 desc->thread = thread; 3639 desc->remove_cb = remove_cb; 3640 desc->remove_ctx = remove_ctx; 3641 desc->write = write; 3642 *_desc = desc; 3643 3644 pthread_mutex_unlock(&bdev->internal.mutex); 3645 3646 return 0; 3647 } 3648 3649 void 3650 spdk_bdev_close(struct spdk_bdev_desc *desc) 3651 { 3652 struct spdk_bdev *bdev = desc->bdev; 3653 bool do_unregister = false; 3654 3655 
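/* spdk_bdev_close() must run on the thread that opened the descriptor; see the assert below. */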
SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3656 spdk_get_thread()); 3657 3658 assert(desc->thread == spdk_get_thread()); 3659 3660 pthread_mutex_lock(&bdev->internal.mutex); 3661 3662 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3663 3664 desc->closed = true; 3665 3666 if (!desc->remove_scheduled) { 3667 free(desc); 3668 } 3669 3670 /* If no more descriptors, kill QoS channel */ 3671 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3672 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3673 bdev->name, spdk_get_thread()); 3674 3675 if (spdk_bdev_qos_destroy(bdev)) { 3676 /* There isn't anything we can do to recover here. Just let the 3677 * old QoS poller keep running. The QoS handling won't change 3678 * cores when the user allocates a new channel, but it won't break. */ 3679 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3680 } 3681 } 3682 3683 spdk_bdev_set_qd_sampling_period(bdev, 0); 3684 3685 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3686 do_unregister = true; 3687 } 3688 pthread_mutex_unlock(&bdev->internal.mutex); 3689 3690 if (do_unregister == true) { 3691 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3692 } 3693 } 3694 3695 int 3696 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3697 struct spdk_bdev_module *module) 3698 { 3699 if (bdev->internal.claim_module != NULL) { 3700 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3701 bdev->internal.claim_module->name); 3702 return -EPERM; 3703 } 3704 3705 if (desc && !desc->write) { 3706 desc->write = true; 3707 } 3708 3709 bdev->internal.claim_module = module; 3710 return 0; 3711 } 3712 3713 void 3714 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3715 { 3716 assert(bdev->internal.claim_module != NULL); 3717 bdev->internal.claim_module = NULL; 3718 } 3719 3720 struct spdk_bdev * 3721 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3722 { 3723 return desc->bdev; 3724 } 3725 3726 void 3727 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3728 { 3729 struct iovec *iovs; 3730 int iovcnt; 3731 3732 if (bdev_io == NULL) { 3733 return; 3734 } 3735 3736 switch (bdev_io->type) { 3737 case SPDK_BDEV_IO_TYPE_READ: 3738 iovs = bdev_io->u.bdev.iovs; 3739 iovcnt = bdev_io->u.bdev.iovcnt; 3740 break; 3741 case SPDK_BDEV_IO_TYPE_WRITE: 3742 iovs = bdev_io->u.bdev.iovs; 3743 iovcnt = bdev_io->u.bdev.iovcnt; 3744 break; 3745 default: 3746 iovs = NULL; 3747 iovcnt = 0; 3748 break; 3749 } 3750 3751 if (iovp) { 3752 *iovp = iovs; 3753 } 3754 if (iovcntp) { 3755 *iovcntp = iovcnt; 3756 } 3757 } 3758 3759 void 3760 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3761 { 3762 3763 if (spdk_bdev_module_list_find(bdev_module->name)) { 3764 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3765 assert(false); 3766 } 3767 3768 if (bdev_module->async_init) { 3769 bdev_module->internal.action_in_progress = 1; 3770 } 3771 3772 /* 3773 * Modules with examine callbacks must be initialized first, so they are 3774 * ready to handle examine callbacks from later modules that will 3775 * register physical bdevs. 
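 * That is why modules providing examine callbacks are inserted at the head of the list below.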
3776 */ 3777 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3778 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3779 } else { 3780 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3781 } 3782 } 3783 3784 struct spdk_bdev_module * 3785 spdk_bdev_module_list_find(const char *name) 3786 { 3787 struct spdk_bdev_module *bdev_module; 3788 3789 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3790 if (strcmp(name, bdev_module->name) == 0) { 3791 break; 3792 } 3793 } 3794 3795 return bdev_module; 3796 } 3797 3798 static void 3799 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3800 { 3801 struct spdk_bdev_io *bdev_io = _bdev_io; 3802 uint64_t num_bytes, num_blocks; 3803 int rc; 3804 3805 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3806 bdev_io->u.bdev.split_remaining_num_blocks, 3807 ZERO_BUFFER_SIZE); 3808 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3809 3810 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3811 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3812 g_bdev_mgr.zero_buffer, 3813 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3814 _spdk_bdev_write_zero_buffer_done, bdev_io); 3815 if (rc == 0) { 3816 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3817 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3818 } else if (rc == -ENOMEM) { 3819 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 3820 } else { 3821 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3822 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3823 } 3824 } 3825 3826 static void 3827 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3828 { 3829 struct spdk_bdev_io *parent_io = cb_arg; 3830 3831 spdk_bdev_free_io(bdev_io); 3832 3833 if (!success) { 3834 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3835 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3836 return; 3837 } 3838 3839 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3840 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3841 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3842 return; 3843 } 3844 3845 _spdk_bdev_write_zero_buffer_next(parent_io); 3846 } 3847 3848 struct set_qos_limit_ctx { 3849 void (*cb_fn)(void *cb_arg, int status); 3850 void *cb_arg; 3851 struct spdk_bdev *bdev; 3852 }; 3853 3854 static void 3855 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3856 { 3857 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3858 ctx->bdev->internal.qos_mod_in_progress = false; 3859 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3860 3861 ctx->cb_fn(ctx->cb_arg, status); 3862 free(ctx); 3863 } 3864 3865 static void 3866 _spdk_bdev_disable_qos_done(void *cb_arg) 3867 { 3868 struct set_qos_limit_ctx *ctx = cb_arg; 3869 struct spdk_bdev *bdev = ctx->bdev; 3870 struct spdk_bdev_io *bdev_io; 3871 struct spdk_bdev_qos *qos; 3872 3873 pthread_mutex_lock(&bdev->internal.mutex); 3874 qos = bdev->internal.qos; 3875 bdev->internal.qos = NULL; 3876 pthread_mutex_unlock(&bdev->internal.mutex); 3877 3878 while (!TAILQ_EMPTY(&qos->queued)) { 3879 /* Send queued I/O back to their original thread for resubmission. 
		 */
		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);

		if (bdev_io->internal.io_submit_ch) {
			/*
			 * Channel was changed when sending it to the QoS thread - change it back
			 * before sending it back to the original thread.
			 */
			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
			bdev_io->internal.io_submit_ch = NULL;
		}

		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
				     _spdk_bdev_io_submit, bdev_io);
	}

	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
	spdk_poller_unregister(&qos->poller);

	free(qos);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_thread *thread;

	pthread_mutex_lock(&bdev->internal.mutex);
	thread = bdev->internal.qos->thread;
	pthread_mutex_unlock(&bdev->internal.mutex);

	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
}

static void
_spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;

	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
	pthread_mutex_unlock(&bdev->internal.mutex);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	pthread_mutex_lock(&bdev->internal.mutex);
	_spdk_bdev_enable_qos(bdev, bdev_ch);
	pthread_mutex_unlock(&bdev->internal.mutex);
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
{
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	_spdk_bdev_set_qos_limit_done(ctx, status);
}

static void
_spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
{
	int i;

	assert(bdev->internal.qos != NULL);

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			bdev->internal.qos->rate_limits[i].limit = limits[i];

			if (limits[i] == 0) {
				bdev->internal.qos->rate_limits[i].limit =
					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
			}
		}
	}
}

void
spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
{
	struct set_qos_limit_ctx *ctx;
	uint32_t limit_set_complement;
	uint64_t min_limit_per_sec;
	int i;
	bool disable_rate_limit = true;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			continue;
		}

		if (limits[i] > 0) {
			disable_rate_limit = false;
		}

		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
		} else {
			/* Change from megabyte to byte rate limit */
			limits[i] = limits[i] * 1024 * 1024;
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
		}

		limit_set_complement = limits[i] % min_limit_per_sec;
		if (limit_set_complement) {
			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
				    limits[i], min_limit_per_sec);
			limits[i] += min_limit_per_sec - limit_set_complement;
			SPDK_ERRLOG("Rounding up the rate limit to %" PRIu64 "\n", limits[i]);
		}
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->bdev = bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos_mod_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}
	bdev->internal.qos_mod_in_progress = true;

	if (disable_rate_limit == true && bdev->internal.qos) {
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
			     bdev->internal.qos->rate_limits[i].limit !=
			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
				disable_rate_limit = false;
				break;
			}
		}
	}

	if (disable_rate_limit == false) {
		if (bdev->internal.qos == NULL) {
			/* Enabling */
			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
			if (!bdev->internal.qos) {
				pthread_mutex_unlock(&bdev->internal.mutex);
				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
				free(ctx);
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_enable_qos_msg, ctx,
					      _spdk_bdev_enable_qos_done);
		} else {
			/* Updating */
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_thread_send_msg(bdev->internal.qos->thread,
					     _spdk_bdev_update_qos_rate_limit_msg, ctx);
		}
	} else {
		if (bdev->internal.qos != NULL) {
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			/* Disabling */
			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_disable_qos_msg, ctx,
					      _spdk_bdev_disable_qos_msg_done);
		} else {
			pthread_mutex_unlock(&bdev->internal.mutex);
			_spdk_bdev_set_qos_limit_done(ctx, 0);
			return;
		}
	}

	pthread_mutex_unlock(&bdev->internal.mutex);
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)

SPDK_TRACE_REGISTER_FN(bdev_trace)
{
	spdk_trace_register_owner(OWNER_BDEV, 'b');
	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
					OBJECT_BDEV_IO, 1, 0, "type: ");
	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
					OBJECT_BDEV_IO, 0, 0, "");
}
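
/*
 * Illustrative sketch only (kept under #if 0 so it is never compiled): one way
 * a caller could remove every QoS rate limit from a bdev through
 * spdk_bdev_set_qos_rate_limits() above. The helper and callback names are
 * hypothetical and not part of this library. A limit of 0 requests removal of
 * that limit, while SPDK_BDEV_QOS_LIMIT_NOT_DEFINED leaves it untouched.
 */
#if 0
static void
example_qos_limits_cleared(void *cb_arg, int status)
{
	if (status != 0) {
		SPDK_ERRLOG("Clearing QoS limits failed: %d\n", status);
	}
}

static void
example_clear_all_qos_limits(struct spdk_bdev *bdev)
{
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		limits[i] = 0;	/* 0 == remove this rate limit */
	}

	spdk_bdev_set_qos_rate_limits(bdev, limits, example_qos_limits_cleared, NULL);
}
#endif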