1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/util.h" 48 #include "spdk/trace.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 #include "spdk/string.h" 53 54 #ifdef SPDK_CONFIG_VTUNE 55 #include "ittnotify.h" 56 #include "ittnotify_types.h" 57 int __itt_init_ittlib(const char *, __itt_group_id); 58 #endif 59 60 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 61 #define SPDK_BDEV_IO_CACHE_SIZE 256 62 #define BUF_SMALL_POOL_SIZE 8192 63 #define BUF_LARGE_POOL_SIZE 1024 64 #define NOMEM_THRESHOLD_COUNT 8 65 #define ZERO_BUFFER_SIZE 0x100000 66 67 #define OWNER_BDEV 0x2 68 69 #define OBJECT_BDEV_IO 0x2 70 71 #define TRACE_GROUP_BDEV 0x3 72 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 73 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 74 75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 * 1024 * 1024) 80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 81 82 #define SPDK_BDEV_POOL_ALIGNMENT 512 83 84 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"}; 85 static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"}; 86 87 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 88 89 struct spdk_bdev_mgr { 90 struct spdk_mempool *bdev_io_pool; 91 92 struct spdk_mempool *buf_small_pool; 93 struct spdk_mempool *buf_large_pool; 94 95 void *zero_buffer; 96 97 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 98 99 struct spdk_bdev_list bdevs; 
100 101 bool init_complete; 102 bool module_init_complete; 103 104 #ifdef SPDK_CONFIG_VTUNE 105 __itt_domain *domain; 106 #endif 107 }; 108 109 static struct spdk_bdev_mgr g_bdev_mgr = { 110 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 111 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 112 .init_complete = false, 113 .module_init_complete = false, 114 }; 115 116 static struct spdk_bdev_opts g_bdev_opts = { 117 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 118 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 119 }; 120 121 static spdk_bdev_init_cb g_init_cb_fn = NULL; 122 static void *g_init_cb_arg = NULL; 123 124 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 125 static void *g_fini_cb_arg = NULL; 126 static struct spdk_thread *g_fini_thread = NULL; 127 128 struct spdk_bdev_qos_limit { 129 /** IOs or bytes allowed per second (i.e., 1s). */ 130 uint64_t limit; 131 132 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 133 * For remaining bytes, allowed to run negative if an I/O is submitted when 134 * some bytes are remaining, but the I/O is bigger than that amount. The 135 * excess will be deducted from the next timeslice. 136 */ 137 int64_t remaining_this_timeslice; 138 139 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 140 uint32_t min_per_timeslice; 141 142 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 143 uint32_t max_per_timeslice; 144 }; 145 146 struct spdk_bdev_qos { 147 /** Types of structure of rate limits. */ 148 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 149 150 /** The channel that all I/O are funneled through. */ 151 struct spdk_bdev_channel *ch; 152 153 /** The thread on which the poller is running. */ 154 struct spdk_thread *thread; 155 156 /** Queue of I/O waiting to be issued. */ 157 bdev_io_tailq_t queued; 158 159 /** Size of a timeslice in tsc ticks. */ 160 uint64_t timeslice_size; 161 162 /** Timestamp of start of last timeslice. */ 163 uint64_t last_timeslice; 164 165 /** Poller that processes queued I/O commands each time slice. */ 166 struct spdk_poller *poller; 167 }; 168 169 struct spdk_bdev_mgmt_channel { 170 bdev_io_stailq_t need_buf_small; 171 bdev_io_stailq_t need_buf_large; 172 173 /* 174 * Each thread keeps a cache of bdev_io - this allows 175 * bdev threads which are *not* DPDK threads to still 176 * benefit from a per-thread bdev_io cache. Without 177 * this, non-DPDK threads fetching from the mempool 178 * incur a cmpxchg on get and put. 179 */ 180 bdev_io_stailq_t per_thread_cache; 181 uint32_t per_thread_cache_count; 182 uint32_t bdev_io_cache_size; 183 184 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 185 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 186 }; 187 188 /* 189 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 190 * will queue here their IO that awaits retry. It makes it possible to retry sending 191 * IO to one bdev after IO from other bdev completes. 192 */ 193 struct spdk_bdev_shared_resource { 194 /* The bdev management channel */ 195 struct spdk_bdev_mgmt_channel *mgmt_ch; 196 197 /* 198 * Count of I/O submitted to bdev module and waiting for completion. 199 * Incremented before submit_request() is called on an spdk_bdev_io. 200 */ 201 uint64_t io_outstanding; 202 203 /* 204 * Queue of IO awaiting retry because of a previous NOMEM status returned 205 * on this channel. 
206 */ 207 bdev_io_tailq_t nomem_io; 208 209 /* 210 * Threshold which io_outstanding must drop to before retrying nomem_io. 211 */ 212 uint64_t nomem_threshold; 213 214 /* I/O channel allocated by a bdev module */ 215 struct spdk_io_channel *shared_ch; 216 217 /* Refcount of bdev channels using this resource */ 218 uint32_t ref; 219 220 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 221 }; 222 223 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 224 #define BDEV_CH_QOS_ENABLED (1 << 1) 225 226 struct spdk_bdev_channel { 227 struct spdk_bdev *bdev; 228 229 /* The channel for the underlying device */ 230 struct spdk_io_channel *channel; 231 232 /* Per io_device per thread data */ 233 struct spdk_bdev_shared_resource *shared_resource; 234 235 struct spdk_bdev_io_stat stat; 236 237 /* 238 * Count of I/O submitted through this channel and waiting for completion. 239 * Incremented before submit_request() is called on an spdk_bdev_io. 240 */ 241 uint64_t io_outstanding; 242 243 bdev_io_tailq_t queued_resets; 244 245 uint32_t flags; 246 247 #ifdef SPDK_CONFIG_VTUNE 248 uint64_t start_tsc; 249 uint64_t interval_tsc; 250 __itt_string_handle *handle; 251 struct spdk_bdev_io_stat prev_stat; 252 #endif 253 254 }; 255 256 struct spdk_bdev_desc { 257 struct spdk_bdev *bdev; 258 struct spdk_thread *thread; 259 spdk_bdev_remove_cb_t remove_cb; 260 void *remove_ctx; 261 bool remove_scheduled; 262 bool closed; 263 bool write; 264 TAILQ_ENTRY(spdk_bdev_desc) link; 265 }; 266 267 struct spdk_bdev_iostat_ctx { 268 struct spdk_bdev_io_stat *stat; 269 spdk_bdev_get_device_stat_cb cb; 270 void *cb_arg; 271 }; 272 273 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 274 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 275 276 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 277 void *cb_arg); 278 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 279 280 void 281 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 282 { 283 *opts = g_bdev_opts; 284 } 285 286 int 287 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 288 { 289 uint32_t min_pool_size; 290 291 /* 292 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 293 * initialization. A second mgmt_ch will be created on the same thread when the application starts 294 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
295 */ 296 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 297 if (opts->bdev_io_pool_size < min_pool_size) { 298 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 299 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 300 spdk_thread_get_count()); 301 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 302 return -1; 303 } 304 305 g_bdev_opts = *opts; 306 return 0; 307 } 308 309 struct spdk_bdev * 310 spdk_bdev_first(void) 311 { 312 struct spdk_bdev *bdev; 313 314 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 315 if (bdev) { 316 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 317 } 318 319 return bdev; 320 } 321 322 struct spdk_bdev * 323 spdk_bdev_next(struct spdk_bdev *prev) 324 { 325 struct spdk_bdev *bdev; 326 327 bdev = TAILQ_NEXT(prev, internal.link); 328 if (bdev) { 329 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 330 } 331 332 return bdev; 333 } 334 335 static struct spdk_bdev * 336 _bdev_next_leaf(struct spdk_bdev *bdev) 337 { 338 while (bdev != NULL) { 339 if (bdev->internal.claim_module == NULL) { 340 return bdev; 341 } else { 342 bdev = TAILQ_NEXT(bdev, internal.link); 343 } 344 } 345 346 return bdev; 347 } 348 349 struct spdk_bdev * 350 spdk_bdev_first_leaf(void) 351 { 352 struct spdk_bdev *bdev; 353 354 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 355 356 if (bdev) { 357 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 358 } 359 360 return bdev; 361 } 362 363 struct spdk_bdev * 364 spdk_bdev_next_leaf(struct spdk_bdev *prev) 365 { 366 struct spdk_bdev *bdev; 367 368 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 369 370 if (bdev) { 371 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 372 } 373 374 return bdev; 375 } 376 377 struct spdk_bdev * 378 spdk_bdev_get_by_name(const char *bdev_name) 379 { 380 struct spdk_bdev_alias *tmp; 381 struct spdk_bdev *bdev = spdk_bdev_first(); 382 383 while (bdev != NULL) { 384 if (strcmp(bdev_name, bdev->name) == 0) { 385 return bdev; 386 } 387 388 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 389 if (strcmp(bdev_name, tmp->alias) == 0) { 390 return bdev; 391 } 392 } 393 394 bdev = spdk_bdev_next(bdev); 395 } 396 397 return NULL; 398 } 399 400 void 401 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 402 { 403 struct iovec *iovs; 404 405 iovs = bdev_io->u.bdev.iovs; 406 407 assert(iovs != NULL); 408 assert(bdev_io->u.bdev.iovcnt >= 1); 409 410 iovs[0].iov_base = buf; 411 iovs[0].iov_len = len; 412 } 413 414 static bool 415 _is_buf_allocated(struct iovec *iovs) 416 { 417 return iovs[0].iov_base != NULL; 418 } 419 420 static bool 421 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 422 { 423 int i; 424 uintptr_t iov_base; 425 426 if (spdk_likely(alignment == 1)) { 427 return true; 428 } 429 430 for (i = 0; i < iovcnt; i++) { 431 iov_base = (uintptr_t)iovs[i].iov_base; 432 if ((iov_base & (alignment - 1)) != 0) { 433 return false; 434 } 435 } 436 437 return true; 438 } 439 440 static void 441 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 442 { 443 int i; 444 size_t len; 445 446 for (i = 0; i < iovcnt; i++) { 447 len = spdk_min(iovs[i].iov_len, buf_len); 448 memcpy(buf, iovs[i].iov_base, len); 449 buf += len; 450 buf_len -= len; 451 } 452 } 453 454 static void 455 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, 
void *buf, size_t buf_len) 456 { 457 int i; 458 size_t len; 459 460 for (i = 0; i < iovcnt; i++) { 461 len = spdk_min(iovs[i].iov_len, buf_len); 462 memcpy(iovs[i].iov_base, buf, len); 463 buf += len; 464 buf_len -= len; 465 } 466 } 467 468 static void 469 _bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 470 { 471 /* save original iovec */ 472 bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs; 473 bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt; 474 /* set bounce iov */ 475 bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov; 476 bdev_io->u.bdev.iovcnt = 1; 477 /* set bounce buffer for this operation */ 478 bdev_io->u.bdev.iovs[0].iov_base = buf; 479 bdev_io->u.bdev.iovs[0].iov_len = len; 480 /* if this is write path, copy data from original buffer to bounce buffer */ 481 if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 482 _copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt); 483 } 484 } 485 486 static void 487 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 488 { 489 struct spdk_mempool *pool; 490 struct spdk_bdev_io *tmp; 491 void *buf, *aligned_buf; 492 bdev_io_stailq_t *stailq; 493 struct spdk_bdev_mgmt_channel *ch; 494 uint64_t buf_len; 495 uint64_t alignment; 496 bool buf_allocated; 497 498 buf = bdev_io->internal.buf; 499 buf_len = bdev_io->internal.buf_len; 500 alignment = spdk_bdev_get_buf_align(bdev_io->bdev); 501 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 502 503 bdev_io->internal.buf = NULL; 504 505 if (buf_len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) { 506 pool = g_bdev_mgr.buf_small_pool; 507 stailq = &ch->need_buf_small; 508 } else { 509 pool = g_bdev_mgr.buf_large_pool; 510 stailq = &ch->need_buf_large; 511 } 512 513 if (STAILQ_EMPTY(stailq)) { 514 spdk_mempool_put(pool, buf); 515 } else { 516 tmp = STAILQ_FIRST(stailq); 517 518 alignment = spdk_bdev_get_buf_align(tmp->bdev); 519 buf_allocated = _is_buf_allocated(tmp->u.bdev.iovs); 520 521 aligned_buf = (void *)(((uintptr_t)buf + 522 (alignment - 1)) & ~(alignment - 1)); 523 if (buf_allocated) { 524 _bdev_io_set_bounce_buf(tmp, aligned_buf, tmp->internal.buf_len); 525 } else { 526 spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len); 527 } 528 529 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 530 tmp->internal.buf = buf; 531 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 532 } 533 } 534 535 static void 536 _bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io) 537 { 538 /* if this is read path, copy data from bounce buffer to original buffer */ 539 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && 540 bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 541 _copy_buf_to_iovs(bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt, 542 bdev_io->internal.bounce_iov.iov_base, bdev_io->internal.bounce_iov.iov_len); 543 } 544 /* set orignal buffer for this io */ 545 bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt; 546 bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs; 547 /* disable bouncing buffer for this io */ 548 bdev_io->internal.orig_iovcnt = 0; 549 bdev_io->internal.orig_iovs = NULL; 550 /* return bounce buffer to the pool */ 551 spdk_bdev_io_put_buf(bdev_io); 552 } 553 554 void 555 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 556 { 557 struct spdk_mempool *pool; 558 bdev_io_stailq_t *stailq; 559 void *buf, *aligned_buf; 560 struct spdk_bdev_mgmt_channel *mgmt_ch; 561 uint64_t alignment; 562 bool buf_allocated; 563 564 
assert(cb != NULL); 565 assert(bdev_io->u.bdev.iovs != NULL); 566 567 alignment = spdk_bdev_get_buf_align(bdev_io->bdev); 568 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 569 570 if (buf_allocated && 571 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 572 /* Buffer already present and aligned */ 573 cb(bdev_io->internal.ch->channel, bdev_io); 574 return; 575 } 576 577 assert(len + alignment <= SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT); 578 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 579 580 bdev_io->internal.buf_len = len; 581 bdev_io->internal.get_buf_cb = cb; 582 583 if (len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) { 584 pool = g_bdev_mgr.buf_small_pool; 585 stailq = &mgmt_ch->need_buf_small; 586 } else { 587 pool = g_bdev_mgr.buf_large_pool; 588 stailq = &mgmt_ch->need_buf_large; 589 } 590 591 buf = spdk_mempool_get(pool); 592 593 if (!buf) { 594 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 595 } else { 596 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 597 598 if (buf_allocated) { 599 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 600 } else { 601 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 602 } 603 bdev_io->internal.buf = buf; 604 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 605 } 606 } 607 608 static int 609 spdk_bdev_module_get_max_ctx_size(void) 610 { 611 struct spdk_bdev_module *bdev_module; 612 int max_bdev_module_size = 0; 613 614 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 615 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 616 max_bdev_module_size = bdev_module->get_ctx_size(); 617 } 618 } 619 620 return max_bdev_module_size; 621 } 622 623 void 624 spdk_bdev_config_text(FILE *fp) 625 { 626 struct spdk_bdev_module *bdev_module; 627 628 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 629 if (bdev_module->config_text) { 630 bdev_module->config_text(fp); 631 } 632 } 633 } 634 635 static void 636 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 637 { 638 int i; 639 struct spdk_bdev_qos *qos = bdev->internal.qos; 640 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 641 642 if (!qos) { 643 return; 644 } 645 646 spdk_bdev_get_qos_rate_limits(bdev, limits); 647 648 spdk_json_write_object_begin(w); 649 spdk_json_write_named_string(w, "method", "set_bdev_qos_limit"); 650 spdk_json_write_name(w, "params"); 651 652 spdk_json_write_object_begin(w); 653 spdk_json_write_named_string(w, "name", bdev->name); 654 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 655 if (limits[i] > 0) { 656 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 657 } 658 } 659 spdk_json_write_object_end(w); 660 661 spdk_json_write_object_end(w); 662 } 663 664 void 665 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 666 { 667 struct spdk_bdev_module *bdev_module; 668 struct spdk_bdev *bdev; 669 670 assert(w != NULL); 671 672 spdk_json_write_array_begin(w); 673 674 spdk_json_write_object_begin(w); 675 spdk_json_write_named_string(w, "method", "set_bdev_options"); 676 spdk_json_write_name(w, "params"); 677 spdk_json_write_object_begin(w); 678 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 679 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 680 spdk_json_write_object_end(w); 681 spdk_json_write_object_end(w); 
682 683 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 684 if (bdev_module->config_json) { 685 bdev_module->config_json(w); 686 } 687 } 688 689 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 690 spdk_bdev_qos_config_json(bdev, w); 691 692 if (bdev->fn_table->write_config_json) { 693 bdev->fn_table->write_config_json(bdev, w); 694 } 695 } 696 697 spdk_json_write_array_end(w); 698 } 699 700 static int 701 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 702 { 703 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 704 struct spdk_bdev_io *bdev_io; 705 uint32_t i; 706 707 STAILQ_INIT(&ch->need_buf_small); 708 STAILQ_INIT(&ch->need_buf_large); 709 710 STAILQ_INIT(&ch->per_thread_cache); 711 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 712 713 /* Pre-populate bdev_io cache to ensure this thread cannot be starved. */ 714 ch->per_thread_cache_count = 0; 715 for (i = 0; i < ch->bdev_io_cache_size; i++) { 716 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 717 assert(bdev_io != NULL); 718 ch->per_thread_cache_count++; 719 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 720 } 721 722 TAILQ_INIT(&ch->shared_resources); 723 TAILQ_INIT(&ch->io_wait_queue); 724 725 return 0; 726 } 727 728 static void 729 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf) 730 { 731 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 732 struct spdk_bdev_io *bdev_io; 733 734 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) { 735 SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n"); 736 } 737 738 if (!TAILQ_EMPTY(&ch->shared_resources)) { 739 SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n"); 740 } 741 742 while (!STAILQ_EMPTY(&ch->per_thread_cache)) { 743 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 744 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 745 ch->per_thread_cache_count--; 746 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 747 } 748 749 assert(ch->per_thread_cache_count == 0); 750 } 751 752 static void 753 spdk_bdev_init_complete(int rc) 754 { 755 spdk_bdev_init_cb cb_fn = g_init_cb_fn; 756 void *cb_arg = g_init_cb_arg; 757 struct spdk_bdev_module *m; 758 759 g_bdev_mgr.init_complete = true; 760 g_init_cb_fn = NULL; 761 g_init_cb_arg = NULL; 762 763 /* 764 * For modules that need to know when subsystem init is complete, 765 * inform them now. 766 */ 767 if (rc == 0) { 768 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 769 if (m->init_complete) { 770 m->init_complete(); 771 } 772 } 773 } 774 775 cb_fn(cb_arg, rc); 776 } 777 778 static void 779 spdk_bdev_module_action_complete(void) 780 { 781 struct spdk_bdev_module *m; 782 783 /* 784 * Don't finish bdev subsystem initialization if 785 * module pre-initialization is still in progress, or 786 * the subsystem been already initialized. 787 */ 788 if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) { 789 return; 790 } 791 792 /* 793 * Check all bdev modules for inits/examinations in progress. If any 794 * exist, return immediately since we cannot finish bdev subsystem 795 * initialization until all are completed. 796 */ 797 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 798 if (m->internal.action_in_progress > 0) { 799 return; 800 } 801 } 802 803 /* 804 * Modules already finished initialization - now that all 805 * the bdev modules have finished their asynchronous I/O 806 * processing, the entire bdev layer can be marked as complete. 
807 */ 808 spdk_bdev_init_complete(0); 809 } 810 811 static void 812 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 813 { 814 assert(module->internal.action_in_progress > 0); 815 module->internal.action_in_progress--; 816 spdk_bdev_module_action_complete(); 817 } 818 819 void 820 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 821 { 822 spdk_bdev_module_action_done(module); 823 } 824 825 void 826 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 827 { 828 spdk_bdev_module_action_done(module); 829 } 830 831 /** The last initialized bdev module */ 832 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 833 834 static int 835 spdk_bdev_modules_init(void) 836 { 837 struct spdk_bdev_module *module; 838 int rc = 0; 839 840 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 841 g_resume_bdev_module = module; 842 rc = module->module_init(); 843 if (rc != 0) { 844 return rc; 845 } 846 } 847 848 g_resume_bdev_module = NULL; 849 return 0; 850 } 851 852 853 static void 854 spdk_bdev_init_failed_complete(void *cb_arg) 855 { 856 spdk_bdev_init_complete(-1); 857 } 858 859 static void 860 spdk_bdev_init_failed(void *cb_arg) 861 { 862 spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL); 863 } 864 865 void 866 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 867 { 868 struct spdk_conf_section *sp; 869 struct spdk_bdev_opts bdev_opts; 870 int32_t bdev_io_pool_size, bdev_io_cache_size; 871 int cache_size; 872 int rc = 0; 873 char mempool_name[32]; 874 875 assert(cb_fn != NULL); 876 877 sp = spdk_conf_find_section(NULL, "Bdev"); 878 if (sp != NULL) { 879 spdk_bdev_get_opts(&bdev_opts); 880 881 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 882 if (bdev_io_pool_size >= 0) { 883 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 884 } 885 886 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 887 if (bdev_io_cache_size >= 0) { 888 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 889 } 890 891 if (spdk_bdev_set_opts(&bdev_opts)) { 892 spdk_bdev_init_complete(-1); 893 return; 894 } 895 896 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 897 } 898 899 g_init_cb_fn = cb_fn; 900 g_init_cb_arg = cb_arg; 901 902 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 903 904 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 905 g_bdev_opts.bdev_io_pool_size, 906 sizeof(struct spdk_bdev_io) + 907 spdk_bdev_module_get_max_ctx_size(), 908 0, 909 SPDK_ENV_SOCKET_ID_ANY); 910 911 if (g_bdev_mgr.bdev_io_pool == NULL) { 912 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 913 spdk_bdev_init_complete(-1); 914 return; 915 } 916 917 /** 918 * Ensure no more than half of the total buffers end up local caches, by 919 * using spdk_thread_get_count() to determine how many local caches we need 920 * to account for. 
921 */ 922 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 923 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 924 925 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 926 BUF_SMALL_POOL_SIZE, 927 SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 928 cache_size, 929 SPDK_ENV_SOCKET_ID_ANY); 930 if (!g_bdev_mgr.buf_small_pool) { 931 SPDK_ERRLOG("create rbuf small pool failed\n"); 932 spdk_bdev_init_complete(-1); 933 return; 934 } 935 936 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 937 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 938 939 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 940 BUF_LARGE_POOL_SIZE, 941 SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT, 942 cache_size, 943 SPDK_ENV_SOCKET_ID_ANY); 944 if (!g_bdev_mgr.buf_large_pool) { 945 SPDK_ERRLOG("create rbuf large pool failed\n"); 946 spdk_bdev_init_complete(-1); 947 return; 948 } 949 950 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 951 NULL); 952 if (!g_bdev_mgr.zero_buffer) { 953 SPDK_ERRLOG("create bdev zero buffer failed\n"); 954 spdk_bdev_init_complete(-1); 955 return; 956 } 957 958 #ifdef SPDK_CONFIG_VTUNE 959 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 960 #endif 961 962 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 963 spdk_bdev_mgmt_channel_destroy, 964 sizeof(struct spdk_bdev_mgmt_channel), 965 "bdev_mgr"); 966 967 rc = spdk_bdev_modules_init(); 968 g_bdev_mgr.module_init_complete = true; 969 if (rc != 0) { 970 SPDK_ERRLOG("bdev modules init failed\n"); 971 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 972 return; 973 } 974 975 spdk_bdev_module_action_complete(); 976 } 977 978 static void 979 spdk_bdev_mgr_unregister_cb(void *io_device) 980 { 981 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 982 983 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 984 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 985 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 986 g_bdev_opts.bdev_io_pool_size); 987 } 988 989 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 990 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 991 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 992 BUF_SMALL_POOL_SIZE); 993 assert(false); 994 } 995 996 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 997 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 998 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 999 BUF_LARGE_POOL_SIZE); 1000 assert(false); 1001 } 1002 1003 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1004 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1005 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1006 spdk_dma_free(g_bdev_mgr.zero_buffer); 1007 1008 cb_fn(g_fini_cb_arg); 1009 g_fini_cb_fn = NULL; 1010 g_fini_cb_arg = NULL; 1011 g_bdev_mgr.init_complete = false; 1012 g_bdev_mgr.module_init_complete = false; 1013 } 1014 1015 static void 1016 spdk_bdev_module_finish_iter(void *arg) 1017 { 1018 struct spdk_bdev_module *bdev_module; 1019 1020 /* Start iterating from the last touched module */ 1021 if (!g_resume_bdev_module) { 1022 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 1023 } else { 1024 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 1025 internal.tailq); 1026 } 1027 1028 while (bdev_module) { 1029 if (bdev_module->async_fini) { 1030 /* Save our place so we can 
resume later. We must 1031 * save the variable here, before calling module_fini() 1032 * below, because in some cases the module may immediately 1033 * call spdk_bdev_module_finish_done() and re-enter 1034 * this function to continue iterating. */ 1035 g_resume_bdev_module = bdev_module; 1036 } 1037 1038 if (bdev_module->module_fini) { 1039 bdev_module->module_fini(); 1040 } 1041 1042 if (bdev_module->async_fini) { 1043 return; 1044 } 1045 1046 bdev_module = TAILQ_PREV(bdev_module, bdev_module_list, 1047 internal.tailq); 1048 } 1049 1050 g_resume_bdev_module = NULL; 1051 spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb); 1052 } 1053 1054 void 1055 spdk_bdev_module_finish_done(void) 1056 { 1057 if (spdk_get_thread() != g_fini_thread) { 1058 spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL); 1059 } else { 1060 spdk_bdev_module_finish_iter(NULL); 1061 } 1062 } 1063 1064 static void 1065 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno) 1066 { 1067 struct spdk_bdev *bdev = cb_arg; 1068 1069 if (bdeverrno && bdev) { 1070 SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n", 1071 bdev->name); 1072 1073 /* 1074 * Since the call to spdk_bdev_unregister() failed, we have no way to free this 1075 * bdev; try to continue by manually removing this bdev from the list and continue 1076 * with the next bdev in the list. 1077 */ 1078 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 1079 } 1080 1081 if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) { 1082 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n"); 1083 /* 1084 * Bdev module finish need to be deferred as we might be in the middle of some context 1085 * (like bdev part free) that will use this bdev (or private bdev driver ctx data) 1086 * after returning. 1087 */ 1088 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL); 1089 return; 1090 } 1091 1092 /* 1093 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem 1094 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity 1095 * to detect clean shutdown as opposed to run-time hot removal of the underlying 1096 * base bdevs. 1097 * 1098 * Also, walk the list in the reverse order. 1099 */ 1100 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1101 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1102 if (bdev->internal.claim_module != NULL) { 1103 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n", 1104 bdev->name, bdev->internal.claim_module->name); 1105 continue; 1106 } 1107 1108 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name); 1109 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 1110 return; 1111 } 1112 1113 /* 1114 * If any bdev fails to unclaim underlying bdev properly, we may face the 1115 * case of bdev list consisting of claimed bdevs only (if claims are managed 1116 * correctly, this would mean there's a loop in the claims graph which is 1117 * clearly impossible). Warn and unregister last bdev on the list then. 
1118 */ 1119 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1120 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1121 SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1122 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 1123 return; 1124 } 1125 } 1126 1127 void 1128 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1129 { 1130 struct spdk_bdev_module *m; 1131 1132 assert(cb_fn != NULL); 1133 1134 g_fini_thread = spdk_get_thread(); 1135 1136 g_fini_cb_fn = cb_fn; 1137 g_fini_cb_arg = cb_arg; 1138 1139 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1140 if (m->fini_start) { 1141 m->fini_start(); 1142 } 1143 } 1144 1145 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0); 1146 } 1147 1148 static struct spdk_bdev_io * 1149 spdk_bdev_get_io(struct spdk_bdev_channel *channel) 1150 { 1151 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1152 struct spdk_bdev_io *bdev_io; 1153 1154 if (ch->per_thread_cache_count > 0) { 1155 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1156 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1157 ch->per_thread_cache_count--; 1158 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1159 /* 1160 * Don't try to look for bdev_ios in the global pool if there are 1161 * waiters on bdev_ios - we don't want this caller to jump the line. 1162 */ 1163 bdev_io = NULL; 1164 } else { 1165 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1166 } 1167 1168 return bdev_io; 1169 } 1170 1171 void 1172 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1173 { 1174 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1175 1176 assert(bdev_io != NULL); 1177 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1178 1179 if (bdev_io->internal.buf != NULL) { 1180 spdk_bdev_io_put_buf(bdev_io); 1181 } 1182 1183 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1184 ch->per_thread_cache_count++; 1185 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1186 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1187 struct spdk_bdev_io_wait_entry *entry; 1188 1189 entry = TAILQ_FIRST(&ch->io_wait_queue); 1190 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1191 entry->cb_fn(entry->cb_arg); 1192 } 1193 } else { 1194 /* We should never have a full cache with entries on the io wait queue. 
*/ 1195 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1196 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1197 } 1198 } 1199 1200 static bool 1201 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1202 { 1203 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1204 1205 switch (limit) { 1206 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1207 return true; 1208 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1209 return false; 1210 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1211 default: 1212 return false; 1213 } 1214 } 1215 1216 static bool 1217 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1218 { 1219 switch (bdev_io->type) { 1220 case SPDK_BDEV_IO_TYPE_NVME_IO: 1221 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1222 case SPDK_BDEV_IO_TYPE_READ: 1223 case SPDK_BDEV_IO_TYPE_WRITE: 1224 case SPDK_BDEV_IO_TYPE_UNMAP: 1225 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1226 return true; 1227 default: 1228 return false; 1229 } 1230 } 1231 1232 static uint64_t 1233 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1234 { 1235 struct spdk_bdev *bdev = bdev_io->bdev; 1236 1237 switch (bdev_io->type) { 1238 case SPDK_BDEV_IO_TYPE_NVME_IO: 1239 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1240 return bdev_io->u.nvme_passthru.nbytes; 1241 case SPDK_BDEV_IO_TYPE_READ: 1242 case SPDK_BDEV_IO_TYPE_WRITE: 1243 case SPDK_BDEV_IO_TYPE_UNMAP: 1244 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1245 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1246 default: 1247 return 0; 1248 } 1249 } 1250 1251 static void 1252 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte) 1253 { 1254 int i; 1255 1256 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1257 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1258 continue; 1259 } 1260 1261 switch (i) { 1262 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1263 qos->rate_limits[i].remaining_this_timeslice--; 1264 break; 1265 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1266 qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte; 1267 break; 1268 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1269 default: 1270 break; 1271 } 1272 } 1273 } 1274 1275 static int 1276 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1277 { 1278 struct spdk_bdev_io *bdev_io = NULL; 1279 struct spdk_bdev *bdev = ch->bdev; 1280 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1281 int i, submitted_ios = 0; 1282 bool to_limit_io; 1283 uint64_t io_size_in_byte; 1284 1285 while (!TAILQ_EMPTY(&qos->queued)) { 1286 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1287 if (qos->rate_limits[i].max_per_timeslice > 0 && 1288 (qos->rate_limits[i].remaining_this_timeslice <= 0)) { 1289 return submitted_ios; 1290 } 1291 } 1292 1293 bdev_io = TAILQ_FIRST(&qos->queued); 1294 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1295 ch->io_outstanding++; 1296 shared_resource->io_outstanding++; 1297 to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io); 1298 if (to_limit_io == true) { 1299 io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io); 1300 _spdk_bdev_qos_update_per_io(qos, io_size_in_byte); 1301 } 1302 bdev->fn_table->submit_request(ch->channel, bdev_io); 1303 submitted_ios++; 1304 } 1305 1306 return submitted_ios; 1307 } 1308 1309 static void 1310 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1311 { 1312 int rc; 1313 1314 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1315 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1316 
bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1317 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1318 &bdev_io->internal.waitq_entry); 1319 if (rc != 0) { 1320 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1321 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1322 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1323 } 1324 } 1325 1326 static bool 1327 _spdk_bdev_io_type_can_split(uint8_t type) 1328 { 1329 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1330 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1331 1332 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1333 * UNMAP could be split, but these types of I/O are typically much larger 1334 * in size (sometimes the size of the entire block device), and the bdev 1335 * module can more efficiently split these types of I/O. Plus those types 1336 * of I/O do not have a payload, which makes the splitting process simpler. 1337 */ 1338 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1339 return true; 1340 } else { 1341 return false; 1342 } 1343 } 1344 1345 static bool 1346 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1347 { 1348 uint64_t start_stripe, end_stripe; 1349 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1350 1351 if (io_boundary == 0) { 1352 return false; 1353 } 1354 1355 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1356 return false; 1357 } 1358 1359 start_stripe = bdev_io->u.bdev.offset_blocks; 1360 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1361 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1362 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1363 start_stripe >>= spdk_u32log2(io_boundary); 1364 end_stripe >>= spdk_u32log2(io_boundary); 1365 } else { 1366 start_stripe /= io_boundary; 1367 end_stripe /= io_boundary; 1368 } 1369 return (start_stripe != end_stripe); 1370 } 1371 1372 static uint32_t 1373 _to_next_boundary(uint64_t offset, uint32_t boundary) 1374 { 1375 return (boundary - (offset % boundary)); 1376 } 1377 1378 static void 1379 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1380 1381 static void 1382 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1383 { 1384 struct spdk_bdev_io *bdev_io = _bdev_io; 1385 uint64_t current_offset, remaining; 1386 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1387 struct iovec *parent_iov, *iov; 1388 uint64_t parent_iov_offset, iov_len; 1389 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1390 int rc; 1391 1392 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1393 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1394 blocklen = bdev_io->bdev->blocklen; 1395 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1396 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1397 1398 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1399 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1400 if (parent_iov_offset < parent_iov->iov_len) { 1401 break; 1402 } 1403 parent_iov_offset -= parent_iov->iov_len; 1404 } 1405 1406 child_iovcnt = 0; 1407 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1408 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1409 to_next_boundary = spdk_min(remaining, to_next_boundary); 1410 to_next_boundary_bytes = to_next_boundary * blocklen; 1411 iov = 
&bdev_io->child_iov[child_iovcnt]; 1412 iovcnt = 0; 1413 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1414 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1415 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1416 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1417 to_next_boundary_bytes -= iov_len; 1418 1419 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1420 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1421 1422 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1423 parent_iov_offset += iov_len; 1424 } else { 1425 parent_iovpos++; 1426 parent_iov_offset = 0; 1427 } 1428 child_iovcnt++; 1429 iovcnt++; 1430 } 1431 1432 if (to_next_boundary_bytes > 0) { 1433 /* We had to stop this child I/O early because we ran out of 1434 * child_iov space. Make sure the iovs collected are valid and 1435 * then adjust to_next_boundary before starting the child I/O. 1436 */ 1437 if ((to_next_boundary_bytes % blocklen) != 0) { 1438 SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n", 1439 to_next_boundary_bytes, blocklen); 1440 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1441 if (bdev_io->u.bdev.split_outstanding == 0) { 1442 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1443 } 1444 return; 1445 } 1446 to_next_boundary -= to_next_boundary_bytes / blocklen; 1447 } 1448 1449 bdev_io->u.bdev.split_outstanding++; 1450 1451 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1452 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1453 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1454 iov, iovcnt, current_offset, to_next_boundary, 1455 _spdk_bdev_io_split_done, bdev_io); 1456 } else { 1457 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1458 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1459 iov, iovcnt, current_offset, to_next_boundary, 1460 _spdk_bdev_io_split_done, bdev_io); 1461 } 1462 1463 if (rc == 0) { 1464 current_offset += to_next_boundary; 1465 remaining -= to_next_boundary; 1466 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1467 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1468 } else { 1469 bdev_io->u.bdev.split_outstanding--; 1470 if (rc == -ENOMEM) { 1471 if (bdev_io->u.bdev.split_outstanding == 0) { 1472 /* No I/O is outstanding. Hence we should wait here. */ 1473 _spdk_bdev_queue_io_wait_with_cb(bdev_io, 1474 _spdk_bdev_io_split_with_payload); 1475 } 1476 } else { 1477 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1478 if (bdev_io->u.bdev.split_outstanding == 0) { 1479 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1480 } 1481 } 1482 1483 return; 1484 } 1485 } 1486 } 1487 1488 static void 1489 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1490 { 1491 struct spdk_bdev_io *parent_io = cb_arg; 1492 1493 spdk_bdev_free_io(bdev_io); 1494 1495 if (!success) { 1496 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1497 } 1498 parent_io->u.bdev.split_outstanding--; 1499 if (parent_io->u.bdev.split_outstanding != 0) { 1500 return; 1501 } 1502 1503 /* 1504 * Parent I/O finishes when all blocks are consumed or there is any failure of 1505 * child I/O and no outstanding child I/O. 
1506 */ 1507 if (parent_io->u.bdev.split_remaining_num_blocks == 0 || 1508 parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) { 1509 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1510 parent_io->internal.caller_ctx); 1511 return; 1512 } 1513 1514 /* 1515 * Continue with the splitting process. This function will complete the parent I/O if the 1516 * splitting is done. 1517 */ 1518 _spdk_bdev_io_split_with_payload(parent_io); 1519 } 1520 1521 static void 1522 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1523 { 1524 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1525 1526 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1527 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1528 bdev_io->u.bdev.split_outstanding = 0; 1529 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1530 1531 _spdk_bdev_io_split_with_payload(bdev_io); 1532 } 1533 1534 static void 1535 _spdk_bdev_io_submit(void *ctx) 1536 { 1537 struct spdk_bdev_io *bdev_io = ctx; 1538 struct spdk_bdev *bdev = bdev_io->bdev; 1539 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1540 struct spdk_io_channel *ch = bdev_ch->channel; 1541 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1542 uint64_t tsc; 1543 1544 tsc = spdk_get_ticks(); 1545 bdev_io->internal.submit_tsc = tsc; 1546 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1547 bdev_ch->io_outstanding++; 1548 shared_resource->io_outstanding++; 1549 bdev_io->internal.in_submit_request = true; 1550 if (spdk_likely(bdev_ch->flags == 0)) { 1551 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1552 bdev->fn_table->submit_request(ch, bdev_io); 1553 } else { 1554 bdev_ch->io_outstanding--; 1555 shared_resource->io_outstanding--; 1556 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1557 } 1558 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1559 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1560 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1561 bdev_ch->io_outstanding--; 1562 shared_resource->io_outstanding--; 1563 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1564 _spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 1565 } else { 1566 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1567 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1568 } 1569 bdev_io->internal.in_submit_request = false; 1570 } 1571 1572 static void 1573 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1574 { 1575 struct spdk_bdev *bdev = bdev_io->bdev; 1576 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1577 1578 assert(thread != NULL); 1579 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1580 1581 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1582 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1583 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split, 1584 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1585 } else { 1586 _spdk_bdev_io_split(NULL, bdev_io); 1587 } 1588 return; 1589 } 1590 1591 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1592 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1593 _spdk_bdev_io_submit(bdev_io); 1594 } else { 1595 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1596 bdev_io->internal.ch = 
bdev->internal.qos->ch; 1597 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1598 } 1599 } else { 1600 _spdk_bdev_io_submit(bdev_io); 1601 } 1602 } 1603 1604 static void 1605 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1606 { 1607 struct spdk_bdev *bdev = bdev_io->bdev; 1608 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1609 struct spdk_io_channel *ch = bdev_ch->channel; 1610 1611 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1612 1613 bdev_io->internal.in_submit_request = true; 1614 bdev->fn_table->submit_request(ch, bdev_io); 1615 bdev_io->internal.in_submit_request = false; 1616 } 1617 1618 static void 1619 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1620 struct spdk_bdev *bdev, void *cb_arg, 1621 spdk_bdev_io_completion_cb cb) 1622 { 1623 bdev_io->bdev = bdev; 1624 bdev_io->internal.caller_ctx = cb_arg; 1625 bdev_io->internal.cb = cb; 1626 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1627 bdev_io->internal.in_submit_request = false; 1628 bdev_io->internal.buf = NULL; 1629 bdev_io->internal.io_submit_ch = NULL; 1630 bdev_io->internal.orig_iovs = NULL; 1631 bdev_io->internal.orig_iovcnt = 0; 1632 } 1633 1634 static bool 1635 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1636 { 1637 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1638 } 1639 1640 bool 1641 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1642 { 1643 bool supported; 1644 1645 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1646 1647 if (!supported) { 1648 switch (io_type) { 1649 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1650 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1651 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1652 break; 1653 default: 1654 break; 1655 } 1656 } 1657 1658 return supported; 1659 } 1660 1661 int 1662 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1663 { 1664 if (bdev->fn_table->dump_info_json) { 1665 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1666 } 1667 1668 return 0; 1669 } 1670 1671 static void 1672 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1673 { 1674 uint32_t max_per_timeslice = 0; 1675 int i; 1676 1677 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1678 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1679 qos->rate_limits[i].max_per_timeslice = 0; 1680 continue; 1681 } 1682 1683 max_per_timeslice = qos->rate_limits[i].limit * 1684 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1685 1686 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1687 qos->rate_limits[i].min_per_timeslice); 1688 1689 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1690 } 1691 } 1692 1693 static int 1694 spdk_bdev_channel_poll_qos(void *arg) 1695 { 1696 struct spdk_bdev_qos *qos = arg; 1697 uint64_t now = spdk_get_ticks(); 1698 int i; 1699 1700 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1701 /* We received our callback earlier than expected - return 1702 * immediately and wait to do accounting until at least one 1703 * timeslice has actually expired. This should never happen 1704 * with a well-behaved timer implementation. 
1705 */ 1706 return 0; 1707 } 1708 1709 /* Reset for next round of rate limiting */ 1710 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1711 /* We may have allowed the IOs or bytes to slightly overrun in the last 1712 * timeslice. remaining_this_timeslice is signed, so if it's negative 1713 * here, we'll account for the overrun so that the next timeslice will 1714 * be appropriately reduced. 1715 */ 1716 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1717 qos->rate_limits[i].remaining_this_timeslice = 0; 1718 } 1719 } 1720 1721 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1722 qos->last_timeslice += qos->timeslice_size; 1723 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1724 qos->rate_limits[i].remaining_this_timeslice += 1725 qos->rate_limits[i].max_per_timeslice; 1726 } 1727 } 1728 1729 return _spdk_bdev_qos_io_submit(qos->ch, qos); 1730 } 1731 1732 static void 1733 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1734 { 1735 struct spdk_bdev_shared_resource *shared_resource; 1736 1737 spdk_put_io_channel(ch->channel); 1738 1739 shared_resource = ch->shared_resource; 1740 1741 assert(ch->io_outstanding == 0); 1742 assert(shared_resource->ref > 0); 1743 shared_resource->ref--; 1744 if (shared_resource->ref == 0) { 1745 assert(shared_resource->io_outstanding == 0); 1746 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1747 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1748 free(shared_resource); 1749 } 1750 } 1751 1752 /* Caller must hold bdev->internal.mutex. */ 1753 static void 1754 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1755 { 1756 struct spdk_bdev_qos *qos = bdev->internal.qos; 1757 int i; 1758 1759 /* Rate limiting on this bdev enabled */ 1760 if (qos) { 1761 if (qos->ch == NULL) { 1762 struct spdk_io_channel *io_ch; 1763 1764 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1765 bdev->name, spdk_get_thread()); 1766 1767 /* No qos channel has been selected, so set one up */ 1768 1769 /* Take another reference to ch */ 1770 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1771 assert(io_ch != NULL); 1772 qos->ch = ch; 1773 1774 qos->thread = spdk_io_channel_get_thread(io_ch); 1775 1776 TAILQ_INIT(&qos->queued); 1777 1778 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1779 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1780 qos->rate_limits[i].min_per_timeslice = 1781 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1782 } else { 1783 qos->rate_limits[i].min_per_timeslice = 1784 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1785 } 1786 1787 if (qos->rate_limits[i].limit == 0) { 1788 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1789 } 1790 } 1791 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1792 qos->timeslice_size = 1793 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1794 qos->last_timeslice = spdk_get_ticks(); 1795 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1796 qos, 1797 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1798 } 1799 1800 ch->flags |= BDEV_CH_QOS_ENABLED; 1801 } 1802 } 1803 1804 static int 1805 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1806 { 1807 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1808 struct spdk_bdev_channel *ch = ctx_buf; 1809 struct spdk_io_channel *mgmt_io_ch; 1810 struct spdk_bdev_mgmt_channel *mgmt_ch; 1811 struct spdk_bdev_shared_resource *shared_resource; 1812 
1813 ch->bdev = bdev; 1814 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1815 if (!ch->channel) { 1816 return -1; 1817 } 1818 1819 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1820 if (!mgmt_io_ch) { 1821 spdk_put_io_channel(ch->channel); 1822 return -1; 1823 } 1824 1825 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1826 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1827 if (shared_resource->shared_ch == ch->channel) { 1828 spdk_put_io_channel(mgmt_io_ch); 1829 shared_resource->ref++; 1830 break; 1831 } 1832 } 1833 1834 if (shared_resource == NULL) { 1835 shared_resource = calloc(1, sizeof(*shared_resource)); 1836 if (shared_resource == NULL) { 1837 spdk_put_io_channel(ch->channel); 1838 spdk_put_io_channel(mgmt_io_ch); 1839 return -1; 1840 } 1841 1842 shared_resource->mgmt_ch = mgmt_ch; 1843 shared_resource->io_outstanding = 0; 1844 TAILQ_INIT(&shared_resource->nomem_io); 1845 shared_resource->nomem_threshold = 0; 1846 shared_resource->shared_ch = ch->channel; 1847 shared_resource->ref = 1; 1848 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1849 } 1850 1851 memset(&ch->stat, 0, sizeof(ch->stat)); 1852 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1853 ch->io_outstanding = 0; 1854 TAILQ_INIT(&ch->queued_resets); 1855 ch->flags = 0; 1856 ch->shared_resource = shared_resource; 1857 1858 #ifdef SPDK_CONFIG_VTUNE 1859 { 1860 char *name; 1861 __itt_init_ittlib(NULL, 0); 1862 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1863 if (!name) { 1864 _spdk_bdev_channel_destroy_resource(ch); 1865 return -1; 1866 } 1867 ch->handle = __itt_string_handle_create(name); 1868 free(name); 1869 ch->start_tsc = spdk_get_ticks(); 1870 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1871 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1872 } 1873 #endif 1874 1875 pthread_mutex_lock(&bdev->internal.mutex); 1876 _spdk_bdev_enable_qos(bdev, ch); 1877 pthread_mutex_unlock(&bdev->internal.mutex); 1878 1879 return 0; 1880 } 1881 1882 /* 1883 * Abort I/O that are waiting on a data buffer. These types of I/O are 1884 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1885 */ 1886 static void 1887 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1888 { 1889 bdev_io_stailq_t tmp; 1890 struct spdk_bdev_io *bdev_io; 1891 1892 STAILQ_INIT(&tmp); 1893 1894 while (!STAILQ_EMPTY(queue)) { 1895 bdev_io = STAILQ_FIRST(queue); 1896 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1897 if (bdev_io->internal.ch == ch) { 1898 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1899 } else { 1900 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1901 } 1902 } 1903 1904 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1905 } 1906 1907 /* 1908 * Abort I/O that are queued waiting for submission. These types of I/O are 1909 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1910 */ 1911 static void 1912 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1913 { 1914 struct spdk_bdev_io *bdev_io, *tmp; 1915 1916 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1917 if (bdev_io->internal.ch == ch) { 1918 TAILQ_REMOVE(queue, bdev_io, internal.link); 1919 /* 1920 * spdk_bdev_io_complete() assumes that the completed I/O had 1921 * been submitted to the bdev module. Since in this case it 1922 * hadn't, bump io_outstanding to account for the decrement 1923 * that spdk_bdev_io_complete() will do. 
1924 */ 1925 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1926 ch->io_outstanding++; 1927 ch->shared_resource->io_outstanding++; 1928 } 1929 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1930 } 1931 } 1932 } 1933 1934 static void 1935 spdk_bdev_qos_channel_destroy(void *cb_arg) 1936 { 1937 struct spdk_bdev_qos *qos = cb_arg; 1938 1939 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1940 spdk_poller_unregister(&qos->poller); 1941 1942 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1943 1944 free(qos); 1945 } 1946 1947 static int 1948 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1949 { 1950 int i; 1951 1952 /* 1953 * Cleanly shutting down the QoS poller is tricky, because 1954 * during the asynchronous operation the user could open 1955 * a new descriptor and create a new channel, spawning 1956 * a new QoS poller. 1957 * 1958 * The strategy is to create a new QoS structure here and swap it 1959 * in. The shutdown path then continues to refer to the old one 1960 * until it completes and then releases it. 1961 */ 1962 struct spdk_bdev_qos *new_qos, *old_qos; 1963 1964 old_qos = bdev->internal.qos; 1965 1966 new_qos = calloc(1, sizeof(*new_qos)); 1967 if (!new_qos) { 1968 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1969 return -ENOMEM; 1970 } 1971 1972 /* Copy the old QoS data into the newly allocated structure */ 1973 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1974 1975 /* Zero out the key parts of the QoS structure */ 1976 new_qos->ch = NULL; 1977 new_qos->thread = NULL; 1978 new_qos->poller = NULL; 1979 TAILQ_INIT(&new_qos->queued); 1980 /* 1981 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 1982 * It will be used later for the new QoS structure. 1983 */ 1984 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1985 new_qos->rate_limits[i].remaining_this_timeslice = 0; 1986 new_qos->rate_limits[i].min_per_timeslice = 0; 1987 new_qos->rate_limits[i].max_per_timeslice = 0; 1988 } 1989 1990 bdev->internal.qos = new_qos; 1991 1992 if (old_qos->thread == NULL) { 1993 free(old_qos); 1994 } else { 1995 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1996 old_qos); 1997 } 1998 1999 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2000 * been destroyed yet. The destruction path will end up waiting for the final 2001 * channel to be put before it releases resources. 
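 * The message sent above still holds the I/O channel reference that QoS took,
 * so spdk_io_device_unregister() cannot finish until
 * spdk_bdev_qos_channel_destroy() has run and put that channel.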
*/ 2002 2003 return 0; 2004 } 2005 2006 static void 2007 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2008 { 2009 total->bytes_read += add->bytes_read; 2010 total->num_read_ops += add->num_read_ops; 2011 total->bytes_written += add->bytes_written; 2012 total->num_write_ops += add->num_write_ops; 2013 total->bytes_unmapped += add->bytes_unmapped; 2014 total->num_unmap_ops += add->num_unmap_ops; 2015 total->read_latency_ticks += add->read_latency_ticks; 2016 total->write_latency_ticks += add->write_latency_ticks; 2017 total->unmap_latency_ticks += add->unmap_latency_ticks; 2018 } 2019 2020 static void 2021 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 2022 { 2023 struct spdk_bdev_channel *ch = ctx_buf; 2024 struct spdk_bdev_mgmt_channel *mgmt_ch; 2025 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2026 2027 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2028 spdk_get_thread()); 2029 2030 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */ 2031 pthread_mutex_lock(&ch->bdev->internal.mutex); 2032 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2033 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2034 2035 mgmt_ch = shared_resource->mgmt_ch; 2036 2037 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2038 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2039 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2040 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2041 2042 _spdk_bdev_channel_destroy_resource(ch); 2043 } 2044 2045 int 2046 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2047 { 2048 struct spdk_bdev_alias *tmp; 2049 2050 if (alias == NULL) { 2051 SPDK_ERRLOG("Empty alias passed\n"); 2052 return -EINVAL; 2053 } 2054 2055 if (spdk_bdev_get_by_name(alias)) { 2056 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2057 return -EEXIST; 2058 } 2059 2060 tmp = calloc(1, sizeof(*tmp)); 2061 if (tmp == NULL) { 2062 SPDK_ERRLOG("Unable to allocate alias\n"); 2063 return -ENOMEM; 2064 } 2065 2066 tmp->alias = strdup(alias); 2067 if (tmp->alias == NULL) { 2068 free(tmp); 2069 SPDK_ERRLOG("Unable to allocate alias\n"); 2070 return -ENOMEM; 2071 } 2072 2073 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2074 2075 return 0; 2076 } 2077 2078 int 2079 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2080 { 2081 struct spdk_bdev_alias *tmp; 2082 2083 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2084 if (strcmp(alias, tmp->alias) == 0) { 2085 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2086 free(tmp->alias); 2087 free(tmp); 2088 return 0; 2089 } 2090 } 2091 2092 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2093 2094 return -ENOENT; 2095 } 2096 2097 void 2098 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2099 { 2100 struct spdk_bdev_alias *p, *tmp; 2101 2102 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2103 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2104 free(p->alias); 2105 free(p); 2106 } 2107 } 2108 2109 struct spdk_io_channel * 2110 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2111 { 2112 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 2113 } 2114 2115 const char * 2116 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2117 { 2118 return bdev->name; 2119 } 2120 2121 const char * 2122 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2123 { 2124 return bdev->product_name; 2125 } 2126 2127 const 
struct spdk_bdev_aliases_list * 2128 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2129 { 2130 return &bdev->aliases; 2131 } 2132 2133 uint32_t 2134 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2135 { 2136 return bdev->blocklen; 2137 } 2138 2139 uint64_t 2140 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2141 { 2142 return bdev->blockcnt; 2143 } 2144 2145 const char * 2146 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2147 { 2148 return qos_rpc_type[type]; 2149 } 2150 2151 void 2152 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2153 { 2154 int i; 2155 2156 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2157 2158 pthread_mutex_lock(&bdev->internal.mutex); 2159 if (bdev->internal.qos) { 2160 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2161 if (bdev->internal.qos->rate_limits[i].limit != 2162 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2163 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2164 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2165 /* Change from Byte to Megabyte which is user visible. */ 2166 limits[i] = limits[i] / 1024 / 1024; 2167 } 2168 } 2169 } 2170 } 2171 pthread_mutex_unlock(&bdev->internal.mutex); 2172 } 2173 2174 size_t 2175 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2176 { 2177 return 1 << bdev->required_alignment; 2178 } 2179 2180 uint32_t 2181 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2182 { 2183 return bdev->optimal_io_boundary; 2184 } 2185 2186 bool 2187 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2188 { 2189 return bdev->write_cache; 2190 } 2191 2192 const struct spdk_uuid * 2193 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2194 { 2195 return &bdev->uuid; 2196 } 2197 2198 uint64_t 2199 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2200 { 2201 return bdev->internal.measured_queue_depth; 2202 } 2203 2204 uint64_t 2205 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2206 { 2207 return bdev->internal.period; 2208 } 2209 2210 uint64_t 2211 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2212 { 2213 return bdev->internal.weighted_io_time; 2214 } 2215 2216 uint64_t 2217 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2218 { 2219 return bdev->internal.io_time; 2220 } 2221 2222 static void 2223 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2224 { 2225 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2226 2227 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2228 2229 if (bdev->internal.measured_queue_depth) { 2230 bdev->internal.io_time += bdev->internal.period; 2231 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2232 } 2233 } 2234 2235 static void 2236 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2237 { 2238 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2239 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2240 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2241 2242 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2243 spdk_for_each_channel_continue(i, 0); 2244 } 2245 2246 static int 2247 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2248 { 2249 struct spdk_bdev *bdev = ctx; 2250 bdev->internal.temporary_queue_depth = 0; 2251 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2252 _calculate_measured_qd_cpl); 2253 return 0; 2254 } 2255 2256 void 2257 
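/*
 * Queue depth sampling: the poller registered below runs every `period`
 * microseconds, sums io_outstanding across all channels via
 * spdk_for_each_channel(), and exposes the result through spdk_bdev_get_qd().
 * Illustrative call (the period value is only an example):
 *
 *     spdk_bdev_set_qd_sampling_period(bdev, 1000);   // sample once per ms
 *
 * Passing 0 unregisters the poller and stops sampling.
 */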
spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2258 { 2259 bdev->internal.period = period; 2260 2261 if (bdev->internal.qd_poller != NULL) { 2262 spdk_poller_unregister(&bdev->internal.qd_poller); 2263 bdev->internal.measured_queue_depth = UINT64_MAX; 2264 } 2265 2266 if (period != 0) { 2267 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2268 period); 2269 } 2270 } 2271 2272 int 2273 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2274 { 2275 int ret; 2276 2277 pthread_mutex_lock(&bdev->internal.mutex); 2278 2279 /* bdev has open descriptors */ 2280 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2281 bdev->blockcnt > size) { 2282 ret = -EBUSY; 2283 } else { 2284 bdev->blockcnt = size; 2285 ret = 0; 2286 } 2287 2288 pthread_mutex_unlock(&bdev->internal.mutex); 2289 2290 return ret; 2291 } 2292 2293 /* 2294 * Convert I/O offset and length from bytes to blocks. 2295 * 2296 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 2297 */ 2298 static uint64_t 2299 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2300 uint64_t num_bytes, uint64_t *num_blocks) 2301 { 2302 uint32_t block_size = bdev->blocklen; 2303 uint8_t shift_cnt; 2304 2305 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 2306 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 2307 shift_cnt = spdk_u32log2(block_size); 2308 *offset_blocks = offset_bytes >> shift_cnt; 2309 *num_blocks = num_bytes >> shift_cnt; 2310 return (offset_bytes - (*offset_blocks << shift_cnt)) | 2311 (num_bytes - (*num_blocks << shift_cnt)); 2312 } else { 2313 *offset_blocks = offset_bytes / block_size; 2314 *num_blocks = num_bytes / block_size; 2315 return (offset_bytes % block_size) | (num_bytes % block_size); 2316 } 2317 } 2318 2319 static bool 2320 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2321 { 2322 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2323 * has been an overflow and hence the offset has been wrapped around */ 2324 if (offset_blocks + num_blocks < offset_blocks) { 2325 return false; 2326 } 2327 2328 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2329 if (offset_blocks + num_blocks > bdev->blockcnt) { 2330 return false; 2331 } 2332 2333 return true; 2334 } 2335 2336 int 2337 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2338 void *buf, uint64_t offset, uint64_t nbytes, 2339 spdk_bdev_io_completion_cb cb, void *cb_arg) 2340 { 2341 uint64_t offset_blocks, num_blocks; 2342 2343 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2344 return -EINVAL; 2345 } 2346 2347 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2348 } 2349 2350 int 2351 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2352 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2353 spdk_bdev_io_completion_cb cb, void *cb_arg) 2354 { 2355 struct spdk_bdev *bdev = desc->bdev; 2356 struct spdk_bdev_io *bdev_io; 2357 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2358 2359 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2360 return -EINVAL; 2361 } 2362 2363 bdev_io = spdk_bdev_get_io(channel); 2364 if (!bdev_io) { 2365 return -ENOMEM; 2366 } 2367 2368 
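/*
 * -ENOMEM above means both the per-thread spdk_bdev_io cache and the global
 * pool are exhausted. A caller may wait for an outstanding I/O to complete
 * and then retry; sketch below, where retry_read and ctx are hypothetical
 * caller-side names and entry is a struct spdk_bdev_io_wait_entry:
 *
 *     entry.bdev = bdev;
 *     entry.cb_fn = retry_read;
 *     entry.cb_arg = ctx;
 *     spdk_bdev_queue_io_wait(bdev, ch, &entry);
 */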
bdev_io->internal.ch = channel; 2369 bdev_io->internal.desc = desc; 2370 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2371 bdev_io->u.bdev.iovs = &bdev_io->iov; 2372 bdev_io->u.bdev.iovs[0].iov_base = buf; 2373 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2374 bdev_io->u.bdev.iovcnt = 1; 2375 bdev_io->u.bdev.num_blocks = num_blocks; 2376 bdev_io->u.bdev.offset_blocks = offset_blocks; 2377 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2378 2379 spdk_bdev_io_submit(bdev_io); 2380 return 0; 2381 } 2382 2383 int 2384 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2385 struct iovec *iov, int iovcnt, 2386 uint64_t offset, uint64_t nbytes, 2387 spdk_bdev_io_completion_cb cb, void *cb_arg) 2388 { 2389 uint64_t offset_blocks, num_blocks; 2390 2391 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2392 return -EINVAL; 2393 } 2394 2395 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2396 } 2397 2398 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2399 struct iovec *iov, int iovcnt, 2400 uint64_t offset_blocks, uint64_t num_blocks, 2401 spdk_bdev_io_completion_cb cb, void *cb_arg) 2402 { 2403 struct spdk_bdev *bdev = desc->bdev; 2404 struct spdk_bdev_io *bdev_io; 2405 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2406 2407 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2408 return -EINVAL; 2409 } 2410 2411 bdev_io = spdk_bdev_get_io(channel); 2412 if (!bdev_io) { 2413 return -ENOMEM; 2414 } 2415 2416 bdev_io->internal.ch = channel; 2417 bdev_io->internal.desc = desc; 2418 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2419 bdev_io->u.bdev.iovs = iov; 2420 bdev_io->u.bdev.iovcnt = iovcnt; 2421 bdev_io->u.bdev.num_blocks = num_blocks; 2422 bdev_io->u.bdev.offset_blocks = offset_blocks; 2423 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2424 2425 spdk_bdev_io_submit(bdev_io); 2426 return 0; 2427 } 2428 2429 int 2430 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2431 void *buf, uint64_t offset, uint64_t nbytes, 2432 spdk_bdev_io_completion_cb cb, void *cb_arg) 2433 { 2434 uint64_t offset_blocks, num_blocks; 2435 2436 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2437 return -EINVAL; 2438 } 2439 2440 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2441 } 2442 2443 int 2444 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2445 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2446 spdk_bdev_io_completion_cb cb, void *cb_arg) 2447 { 2448 struct spdk_bdev *bdev = desc->bdev; 2449 struct spdk_bdev_io *bdev_io; 2450 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2451 2452 if (!desc->write) { 2453 return -EBADF; 2454 } 2455 2456 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2457 return -EINVAL; 2458 } 2459 2460 bdev_io = spdk_bdev_get_io(channel); 2461 if (!bdev_io) { 2462 return -ENOMEM; 2463 } 2464 2465 bdev_io->internal.ch = channel; 2466 bdev_io->internal.desc = desc; 2467 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2468 bdev_io->u.bdev.iovs = &bdev_io->iov; 2469 bdev_io->u.bdev.iovs[0].iov_base = buf; 2470 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2471 bdev_io->u.bdev.iovcnt = 1; 2472 bdev_io->u.bdev.num_blocks = num_blocks; 2473 bdev_io->u.bdev.offset_blocks = offset_blocks; 2474 spdk_bdev_io_init(bdev_io, 
bdev, cb_arg, cb); 2475 2476 spdk_bdev_io_submit(bdev_io); 2477 return 0; 2478 } 2479 2480 int 2481 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2482 struct iovec *iov, int iovcnt, 2483 uint64_t offset, uint64_t len, 2484 spdk_bdev_io_completion_cb cb, void *cb_arg) 2485 { 2486 uint64_t offset_blocks, num_blocks; 2487 2488 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2489 return -EINVAL; 2490 } 2491 2492 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2493 } 2494 2495 int 2496 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2497 struct iovec *iov, int iovcnt, 2498 uint64_t offset_blocks, uint64_t num_blocks, 2499 spdk_bdev_io_completion_cb cb, void *cb_arg) 2500 { 2501 struct spdk_bdev *bdev = desc->bdev; 2502 struct spdk_bdev_io *bdev_io; 2503 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2504 2505 if (!desc->write) { 2506 return -EBADF; 2507 } 2508 2509 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2510 return -EINVAL; 2511 } 2512 2513 bdev_io = spdk_bdev_get_io(channel); 2514 if (!bdev_io) { 2515 return -ENOMEM; 2516 } 2517 2518 bdev_io->internal.ch = channel; 2519 bdev_io->internal.desc = desc; 2520 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2521 bdev_io->u.bdev.iovs = iov; 2522 bdev_io->u.bdev.iovcnt = iovcnt; 2523 bdev_io->u.bdev.num_blocks = num_blocks; 2524 bdev_io->u.bdev.offset_blocks = offset_blocks; 2525 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2526 2527 spdk_bdev_io_submit(bdev_io); 2528 return 0; 2529 } 2530 2531 int 2532 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2533 uint64_t offset, uint64_t len, 2534 spdk_bdev_io_completion_cb cb, void *cb_arg) 2535 { 2536 uint64_t offset_blocks, num_blocks; 2537 2538 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2539 return -EINVAL; 2540 } 2541 2542 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2543 } 2544 2545 int 2546 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2547 uint64_t offset_blocks, uint64_t num_blocks, 2548 spdk_bdev_io_completion_cb cb, void *cb_arg) 2549 { 2550 struct spdk_bdev *bdev = desc->bdev; 2551 struct spdk_bdev_io *bdev_io; 2552 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2553 2554 if (!desc->write) { 2555 return -EBADF; 2556 } 2557 2558 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2559 return -EINVAL; 2560 } 2561 2562 bdev_io = spdk_bdev_get_io(channel); 2563 2564 if (!bdev_io) { 2565 return -ENOMEM; 2566 } 2567 2568 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2569 bdev_io->internal.ch = channel; 2570 bdev_io->internal.desc = desc; 2571 bdev_io->u.bdev.offset_blocks = offset_blocks; 2572 bdev_io->u.bdev.num_blocks = num_blocks; 2573 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2574 2575 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2576 spdk_bdev_io_submit(bdev_io); 2577 return 0; 2578 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2579 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2580 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2581 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2582 _spdk_bdev_write_zero_buffer_next(bdev_io); 2583 return 0; 2584 } else { 2585 spdk_bdev_free_io(bdev_io); 2586 return -ENOTSUP; 2587 } 
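/*
 * Note: when WRITE_ZEROES is not natively supported, the request is emulated
 * above by issuing WRITE I/O from the shared zero buffer in ZERO_BUFFER_SIZE
 * chunks via _spdk_bdev_write_zero_buffer_next().
 */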
2588 } 2589 2590 int 2591 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2592 uint64_t offset, uint64_t nbytes, 2593 spdk_bdev_io_completion_cb cb, void *cb_arg) 2594 { 2595 uint64_t offset_blocks, num_blocks; 2596 2597 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2598 return -EINVAL; 2599 } 2600 2601 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2602 } 2603 2604 int 2605 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2606 uint64_t offset_blocks, uint64_t num_blocks, 2607 spdk_bdev_io_completion_cb cb, void *cb_arg) 2608 { 2609 struct spdk_bdev *bdev = desc->bdev; 2610 struct spdk_bdev_io *bdev_io; 2611 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2612 2613 if (!desc->write) { 2614 return -EBADF; 2615 } 2616 2617 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2618 return -EINVAL; 2619 } 2620 2621 if (num_blocks == 0) { 2622 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2623 return -EINVAL; 2624 } 2625 2626 bdev_io = spdk_bdev_get_io(channel); 2627 if (!bdev_io) { 2628 return -ENOMEM; 2629 } 2630 2631 bdev_io->internal.ch = channel; 2632 bdev_io->internal.desc = desc; 2633 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2634 2635 bdev_io->u.bdev.iovs = &bdev_io->iov; 2636 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2637 bdev_io->u.bdev.iovs[0].iov_len = 0; 2638 bdev_io->u.bdev.iovcnt = 1; 2639 2640 bdev_io->u.bdev.offset_blocks = offset_blocks; 2641 bdev_io->u.bdev.num_blocks = num_blocks; 2642 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2643 2644 spdk_bdev_io_submit(bdev_io); 2645 return 0; 2646 } 2647 2648 int 2649 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2650 uint64_t offset, uint64_t length, 2651 spdk_bdev_io_completion_cb cb, void *cb_arg) 2652 { 2653 uint64_t offset_blocks, num_blocks; 2654 2655 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2656 return -EINVAL; 2657 } 2658 2659 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2660 } 2661 2662 int 2663 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2664 uint64_t offset_blocks, uint64_t num_blocks, 2665 spdk_bdev_io_completion_cb cb, void *cb_arg) 2666 { 2667 struct spdk_bdev *bdev = desc->bdev; 2668 struct spdk_bdev_io *bdev_io; 2669 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2670 2671 if (!desc->write) { 2672 return -EBADF; 2673 } 2674 2675 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2676 return -EINVAL; 2677 } 2678 2679 bdev_io = spdk_bdev_get_io(channel); 2680 if (!bdev_io) { 2681 return -ENOMEM; 2682 } 2683 2684 bdev_io->internal.ch = channel; 2685 bdev_io->internal.desc = desc; 2686 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2687 bdev_io->u.bdev.iovs = NULL; 2688 bdev_io->u.bdev.iovcnt = 0; 2689 bdev_io->u.bdev.offset_blocks = offset_blocks; 2690 bdev_io->u.bdev.num_blocks = num_blocks; 2691 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2692 2693 spdk_bdev_io_submit(bdev_io); 2694 return 0; 2695 } 2696 2697 static void 2698 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2699 { 2700 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2701 struct spdk_bdev_io *bdev_io; 2702 2703 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2704 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2705 spdk_bdev_io_submit_reset(bdev_io); 2706 } 2707 2708 static void 
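/*
 * Per-channel reset freeze step: mark the channel as having a reset in
 * progress, reclaim anything this channel had queued on the QoS thread, and
 * fail this channel's I/O waiting in nomem_io, the buffer-wait queues and
 * that reclaimed QoS queue, so the reset proceeds against a quiesced channel.
 */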
2709 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2710 { 2711 struct spdk_io_channel *ch; 2712 struct spdk_bdev_channel *channel; 2713 struct spdk_bdev_mgmt_channel *mgmt_channel; 2714 struct spdk_bdev_shared_resource *shared_resource; 2715 bdev_io_tailq_t tmp_queued; 2716 2717 TAILQ_INIT(&tmp_queued); 2718 2719 ch = spdk_io_channel_iter_get_channel(i); 2720 channel = spdk_io_channel_get_ctx(ch); 2721 shared_resource = channel->shared_resource; 2722 mgmt_channel = shared_resource->mgmt_ch; 2723 2724 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2725 2726 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2727 /* The QoS object is always valid and readable while 2728 * the channel flag is set, so the lock here should not 2729 * be necessary. We're not in the fast path though, so 2730 * just take it anyway. */ 2731 pthread_mutex_lock(&channel->bdev->internal.mutex); 2732 if (channel->bdev->internal.qos->ch == channel) { 2733 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2734 } 2735 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2736 } 2737 2738 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2739 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2740 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2741 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2742 2743 spdk_for_each_channel_continue(i, 0); 2744 } 2745 2746 static void 2747 _spdk_bdev_start_reset(void *ctx) 2748 { 2749 struct spdk_bdev_channel *ch = ctx; 2750 2751 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2752 ch, _spdk_bdev_reset_dev); 2753 } 2754 2755 static void 2756 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2757 { 2758 struct spdk_bdev *bdev = ch->bdev; 2759 2760 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2761 2762 pthread_mutex_lock(&bdev->internal.mutex); 2763 if (bdev->internal.reset_in_progress == NULL) { 2764 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2765 /* 2766 * Take a channel reference for the target bdev for the life of this 2767 * reset. This guards against the channel getting destroyed while 2768 * spdk_for_each_channel() calls related to this reset IO are in 2769 * progress. We will release the reference when this reset is 2770 * completed. 
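 * The reference is dropped in _spdk_bdev_reset_complete() once every channel
 * has been unfrozen.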
2771 */ 2772 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2773 _spdk_bdev_start_reset(ch); 2774 } 2775 pthread_mutex_unlock(&bdev->internal.mutex); 2776 } 2777 2778 int 2779 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2780 spdk_bdev_io_completion_cb cb, void *cb_arg) 2781 { 2782 struct spdk_bdev *bdev = desc->bdev; 2783 struct spdk_bdev_io *bdev_io; 2784 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2785 2786 bdev_io = spdk_bdev_get_io(channel); 2787 if (!bdev_io) { 2788 return -ENOMEM; 2789 } 2790 2791 bdev_io->internal.ch = channel; 2792 bdev_io->internal.desc = desc; 2793 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2794 bdev_io->u.reset.ch_ref = NULL; 2795 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2796 2797 pthread_mutex_lock(&bdev->internal.mutex); 2798 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2799 pthread_mutex_unlock(&bdev->internal.mutex); 2800 2801 _spdk_bdev_channel_start_reset(channel); 2802 2803 return 0; 2804 } 2805 2806 void 2807 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2808 struct spdk_bdev_io_stat *stat) 2809 { 2810 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2811 2812 *stat = channel->stat; 2813 } 2814 2815 static void 2816 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2817 { 2818 void *io_device = spdk_io_channel_iter_get_io_device(i); 2819 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2820 2821 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2822 bdev_iostat_ctx->cb_arg, 0); 2823 free(bdev_iostat_ctx); 2824 } 2825 2826 static void 2827 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2828 { 2829 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2830 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2831 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2832 2833 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2834 spdk_for_each_channel_continue(i, 0); 2835 } 2836 2837 void 2838 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2839 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2840 { 2841 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2842 2843 assert(bdev != NULL); 2844 assert(stat != NULL); 2845 assert(cb != NULL); 2846 2847 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2848 if (bdev_iostat_ctx == NULL) { 2849 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2850 cb(bdev, stat, cb_arg, -ENOMEM); 2851 return; 2852 } 2853 2854 bdev_iostat_ctx->stat = stat; 2855 bdev_iostat_ctx->cb = cb; 2856 bdev_iostat_ctx->cb_arg = cb_arg; 2857 2858 /* Start with the statistics from previously deleted channels. */ 2859 pthread_mutex_lock(&bdev->internal.mutex); 2860 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2861 pthread_mutex_unlock(&bdev->internal.mutex); 2862 2863 /* Then iterate and add the statistics from each existing channel. 
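 * The user callback is invoked from _spdk_bdev_get_device_stat_done() on the
 * calling thread once the iteration has visited every channel.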
*/ 2864 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2865 _spdk_bdev_get_each_channel_stat, 2866 bdev_iostat_ctx, 2867 _spdk_bdev_get_device_stat_done); 2868 } 2869 2870 int 2871 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2872 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2873 spdk_bdev_io_completion_cb cb, void *cb_arg) 2874 { 2875 struct spdk_bdev *bdev = desc->bdev; 2876 struct spdk_bdev_io *bdev_io; 2877 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2878 2879 if (!desc->write) { 2880 return -EBADF; 2881 } 2882 2883 bdev_io = spdk_bdev_get_io(channel); 2884 if (!bdev_io) { 2885 return -ENOMEM; 2886 } 2887 2888 bdev_io->internal.ch = channel; 2889 bdev_io->internal.desc = desc; 2890 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2891 bdev_io->u.nvme_passthru.cmd = *cmd; 2892 bdev_io->u.nvme_passthru.buf = buf; 2893 bdev_io->u.nvme_passthru.nbytes = nbytes; 2894 bdev_io->u.nvme_passthru.md_buf = NULL; 2895 bdev_io->u.nvme_passthru.md_len = 0; 2896 2897 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2898 2899 spdk_bdev_io_submit(bdev_io); 2900 return 0; 2901 } 2902 2903 int 2904 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2905 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2906 spdk_bdev_io_completion_cb cb, void *cb_arg) 2907 { 2908 struct spdk_bdev *bdev = desc->bdev; 2909 struct spdk_bdev_io *bdev_io; 2910 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2911 2912 if (!desc->write) { 2913 /* 2914 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2915 * to easily determine if the command is a read or write, but for now just 2916 * do not allow io_passthru with a read-only descriptor. 2917 */ 2918 return -EBADF; 2919 } 2920 2921 bdev_io = spdk_bdev_get_io(channel); 2922 if (!bdev_io) { 2923 return -ENOMEM; 2924 } 2925 2926 bdev_io->internal.ch = channel; 2927 bdev_io->internal.desc = desc; 2928 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2929 bdev_io->u.nvme_passthru.cmd = *cmd; 2930 bdev_io->u.nvme_passthru.buf = buf; 2931 bdev_io->u.nvme_passthru.nbytes = nbytes; 2932 bdev_io->u.nvme_passthru.md_buf = NULL; 2933 bdev_io->u.nvme_passthru.md_len = 0; 2934 2935 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2936 2937 spdk_bdev_io_submit(bdev_io); 2938 return 0; 2939 } 2940 2941 int 2942 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2943 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2944 spdk_bdev_io_completion_cb cb, void *cb_arg) 2945 { 2946 struct spdk_bdev *bdev = desc->bdev; 2947 struct spdk_bdev_io *bdev_io; 2948 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2949 2950 if (!desc->write) { 2951 /* 2952 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2953 * to easily determine if the command is a read or write, but for now just 2954 * do not allow io_passthru with a read-only descriptor. 
2955 */ 2956 return -EBADF; 2957 } 2958 2959 bdev_io = spdk_bdev_get_io(channel); 2960 if (!bdev_io) { 2961 return -ENOMEM; 2962 } 2963 2964 bdev_io->internal.ch = channel; 2965 bdev_io->internal.desc = desc; 2966 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2967 bdev_io->u.nvme_passthru.cmd = *cmd; 2968 bdev_io->u.nvme_passthru.buf = buf; 2969 bdev_io->u.nvme_passthru.nbytes = nbytes; 2970 bdev_io->u.nvme_passthru.md_buf = md_buf; 2971 bdev_io->u.nvme_passthru.md_len = md_len; 2972 2973 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2974 2975 spdk_bdev_io_submit(bdev_io); 2976 return 0; 2977 } 2978 2979 int 2980 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2981 struct spdk_bdev_io_wait_entry *entry) 2982 { 2983 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2984 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2985 2986 if (bdev != entry->bdev) { 2987 SPDK_ERRLOG("bdevs do not match\n"); 2988 return -EINVAL; 2989 } 2990 2991 if (mgmt_ch->per_thread_cache_count > 0) { 2992 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2993 return -EINVAL; 2994 } 2995 2996 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2997 return 0; 2998 } 2999 3000 static void 3001 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 3002 { 3003 struct spdk_bdev *bdev = bdev_ch->bdev; 3004 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3005 struct spdk_bdev_io *bdev_io; 3006 3007 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3008 /* 3009 * Allow some more I/O to complete before retrying the nomem_io queue. 3010 * Some drivers (such as nvme) cannot immediately take a new I/O in 3011 * the context of a completion, because the resources for the I/O are 3012 * not released until control returns to the bdev poller. Also, we 3013 * may require several small I/O to complete before a larger I/O 3014 * (that requires splitting) can be submitted. 3015 */ 3016 return; 3017 } 3018 3019 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3020 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3021 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3022 bdev_io->internal.ch->io_outstanding++; 3023 shared_resource->io_outstanding++; 3024 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3025 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 3026 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3027 break; 3028 } 3029 } 3030 } 3031 3032 static inline void 3033 _spdk_bdev_io_complete(void *ctx) 3034 { 3035 struct spdk_bdev_io *bdev_io = ctx; 3036 uint64_t tsc, tsc_diff; 3037 3038 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3039 /* 3040 * Send the completion to the thread that originally submitted the I/O, 3041 * which may not be the current thread in the case of QoS. 3042 */ 3043 if (bdev_io->internal.io_submit_ch) { 3044 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3045 bdev_io->internal.io_submit_ch = NULL; 3046 } 3047 3048 /* 3049 * Defer completion to avoid potential infinite recursion if the 3050 * user's completion callback issues a new I/O. 
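 * The completion is re-driven through spdk_thread_send_msg() below, so the
 * user callback always runs from a fresh stack frame on the submitting thread.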
3051 */ 3052 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3053 _spdk_bdev_io_complete, bdev_io); 3054 return; 3055 } 3056 3057 tsc = spdk_get_ticks(); 3058 tsc_diff = tsc - bdev_io->internal.submit_tsc; 3059 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 3060 3061 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3062 switch (bdev_io->type) { 3063 case SPDK_BDEV_IO_TYPE_READ: 3064 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3065 bdev_io->internal.ch->stat.num_read_ops++; 3066 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 3067 break; 3068 case SPDK_BDEV_IO_TYPE_WRITE: 3069 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3070 bdev_io->internal.ch->stat.num_write_ops++; 3071 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 3072 break; 3073 case SPDK_BDEV_IO_TYPE_UNMAP: 3074 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3075 bdev_io->internal.ch->stat.num_unmap_ops++; 3076 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 3077 default: 3078 break; 3079 } 3080 } 3081 3082 #ifdef SPDK_CONFIG_VTUNE 3083 uint64_t now_tsc = spdk_get_ticks(); 3084 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 3085 uint64_t data[5]; 3086 3087 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 3088 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 3089 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 3090 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 3091 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
3092 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 3093 3094 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3095 __itt_metadata_u64, 5, data); 3096 3097 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3098 bdev_io->internal.ch->start_tsc = now_tsc; 3099 } 3100 #endif 3101 3102 assert(bdev_io->internal.cb != NULL); 3103 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 3104 3105 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3106 bdev_io->internal.caller_ctx); 3107 } 3108 3109 static void 3110 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3111 { 3112 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3113 3114 if (bdev_io->u.reset.ch_ref != NULL) { 3115 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3116 bdev_io->u.reset.ch_ref = NULL; 3117 } 3118 3119 _spdk_bdev_io_complete(bdev_io); 3120 } 3121 3122 static void 3123 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3124 { 3125 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3126 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3127 3128 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3129 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3130 _spdk_bdev_channel_start_reset(ch); 3131 } 3132 3133 spdk_for_each_channel_continue(i, 0); 3134 } 3135 3136 void 3137 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3138 { 3139 struct spdk_bdev *bdev = bdev_io->bdev; 3140 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3141 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3142 3143 bdev_io->internal.status = status; 3144 3145 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3146 bool unlock_channels = false; 3147 3148 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3149 SPDK_ERRLOG("NOMEM returned for reset\n"); 3150 } 3151 pthread_mutex_lock(&bdev->internal.mutex); 3152 if (bdev_io == bdev->internal.reset_in_progress) { 3153 bdev->internal.reset_in_progress = NULL; 3154 unlock_channels = true; 3155 } 3156 pthread_mutex_unlock(&bdev->internal.mutex); 3157 3158 if (unlock_channels) { 3159 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3160 bdev_io, _spdk_bdev_reset_complete); 3161 return; 3162 } 3163 } else { 3164 if (spdk_unlikely(bdev_io->internal.orig_iovcnt > 0)) { 3165 _bdev_io_unset_bounce_buf(bdev_io); 3166 } 3167 3168 assert(bdev_ch->io_outstanding > 0); 3169 assert(shared_resource->io_outstanding > 0); 3170 bdev_ch->io_outstanding--; 3171 shared_resource->io_outstanding--; 3172 3173 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3174 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3175 /* 3176 * Wait for some of the outstanding I/O to complete before we 3177 * retry any of the nomem_io. Normally we will wait for 3178 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3179 * depth channels we will instead wait for half to complete. 
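 * spdk_max() below therefore picks whichever bound is larger: half of the
 * current queue depth, or the queue depth minus NOMEM_THRESHOLD_COUNT.
 */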
3180 */ 3181 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3182 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3183 return; 3184 } 3185 3186 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3187 _spdk_bdev_ch_retry_io(bdev_ch); 3188 } 3189 } 3190 3191 _spdk_bdev_io_complete(bdev_io); 3192 } 3193 3194 void 3195 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3196 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3197 { 3198 if (sc == SPDK_SCSI_STATUS_GOOD) { 3199 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3200 } else { 3201 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3202 bdev_io->internal.error.scsi.sc = sc; 3203 bdev_io->internal.error.scsi.sk = sk; 3204 bdev_io->internal.error.scsi.asc = asc; 3205 bdev_io->internal.error.scsi.ascq = ascq; 3206 } 3207 3208 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3209 } 3210 3211 void 3212 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3213 int *sc, int *sk, int *asc, int *ascq) 3214 { 3215 assert(sc != NULL); 3216 assert(sk != NULL); 3217 assert(asc != NULL); 3218 assert(ascq != NULL); 3219 3220 switch (bdev_io->internal.status) { 3221 case SPDK_BDEV_IO_STATUS_SUCCESS: 3222 *sc = SPDK_SCSI_STATUS_GOOD; 3223 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3224 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3225 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3226 break; 3227 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3228 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3229 break; 3230 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3231 *sc = bdev_io->internal.error.scsi.sc; 3232 *sk = bdev_io->internal.error.scsi.sk; 3233 *asc = bdev_io->internal.error.scsi.asc; 3234 *ascq = bdev_io->internal.error.scsi.ascq; 3235 break; 3236 default: 3237 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3238 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3239 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3240 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3241 break; 3242 } 3243 } 3244 3245 void 3246 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3247 { 3248 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3249 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3250 } else { 3251 bdev_io->internal.error.nvme.sct = sct; 3252 bdev_io->internal.error.nvme.sc = sc; 3253 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3254 } 3255 3256 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3257 } 3258 3259 void 3260 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3261 { 3262 assert(sct != NULL); 3263 assert(sc != NULL); 3264 3265 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3266 *sct = bdev_io->internal.error.nvme.sct; 3267 *sc = bdev_io->internal.error.nvme.sc; 3268 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3269 *sct = SPDK_NVME_SCT_GENERIC; 3270 *sc = SPDK_NVME_SC_SUCCESS; 3271 } else { 3272 *sct = SPDK_NVME_SCT_GENERIC; 3273 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3274 } 3275 } 3276 3277 struct spdk_thread * 3278 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3279 { 3280 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3281 } 3282 3283 static void 3284 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3285 { 3286 uint64_t min_qos_set; 3287 int i; 3288 3289 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3290 if (limits[i] != 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3291 break; 3292 } 3293 } 3294 3295 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3296 SPDK_ERRLOG("Invalid rate limits set.\n"); 3297 return; 3298 } 3299 3300 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3301 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3302 continue; 3303 } 3304 3305 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3306 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3307 } else { 3308 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3309 } 3310 3311 if (limits[i] == 0 || limits[i] % min_qos_set) { 3312 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3313 limits[i], bdev->name, min_qos_set); 3314 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3315 return; 3316 } 3317 } 3318 3319 if (!bdev->internal.qos) { 3320 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3321 if (!bdev->internal.qos) { 3322 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3323 return; 3324 } 3325 } 3326 3327 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3328 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3329 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3330 bdev->name, i, limits[i]); 3331 } 3332 3333 return; 3334 } 3335 3336 static void 3337 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3338 { 3339 struct spdk_conf_section *sp = NULL; 3340 const char *val = NULL; 3341 int i = 0, j = 0; 3342 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3343 bool config_qos = false; 3344 3345 sp = spdk_conf_find_section(NULL, "QoS"); 3346 if (!sp) { 3347 return; 3348 } 3349 3350 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3351 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3352 3353 i = 0; 3354 while (true) { 3355 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3356 if (!val) { 3357 break; 3358 } 3359 3360 if (strcmp(bdev->name, val) != 0) { 3361 i++; 3362 continue; 3363 } 3364 3365 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3366 if (val) { 3367 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3368 limits[j] = strtoull(val, NULL, 10); 3369 } else { 3370 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3371 } 3372 config_qos = true; 3373 } 3374 3375 break; 3376 } 3377 3378 j++; 3379 } 3380 3381 if (config_qos == true) { 3382 _spdk_bdev_qos_config_limit(bdev, limits); 3383 } 3384 3385 return; 3386 } 3387 3388 static int 3389 spdk_bdev_init(struct spdk_bdev *bdev) 3390 { 3391 char *bdev_name; 3392 3393 assert(bdev->module != NULL); 3394 3395 if (!bdev->name) { 3396 SPDK_ERRLOG("Bdev name is NULL\n"); 3397 return -EINVAL; 3398 } 3399 3400 if (spdk_bdev_get_by_name(bdev->name)) { 3401 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3402 return -EEXIST; 3403 } 3404 3405 /* Users often register their own I/O devices using the bdev name. In 3406 * order to avoid conflicts, prepend bdev_. 
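 * The prefixed string is passed to spdk_io_device_register() below only as
 * the io_device's name; the bdev itself keeps its original name and the
 * temporary buffer is freed right after registration.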
*/ 3407 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3408 if (!bdev_name) { 3409 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3410 return -ENOMEM; 3411 } 3412 3413 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3414 bdev->internal.measured_queue_depth = UINT64_MAX; 3415 bdev->internal.claim_module = NULL; 3416 bdev->internal.qd_poller = NULL; 3417 bdev->internal.qos = NULL; 3418 3419 if (spdk_bdev_get_buf_align(bdev) > 1) { 3420 if (bdev->split_on_optimal_io_boundary) { 3421 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 3422 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 3423 } else { 3424 bdev->split_on_optimal_io_boundary = true; 3425 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 3426 } 3427 } 3428 3429 TAILQ_INIT(&bdev->internal.open_descs); 3430 3431 TAILQ_INIT(&bdev->aliases); 3432 3433 bdev->internal.reset_in_progress = NULL; 3434 3435 _spdk_bdev_qos_config(bdev); 3436 3437 spdk_io_device_register(__bdev_to_io_dev(bdev), 3438 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3439 sizeof(struct spdk_bdev_channel), 3440 bdev_name); 3441 3442 free(bdev_name); 3443 3444 pthread_mutex_init(&bdev->internal.mutex, NULL); 3445 return 0; 3446 } 3447 3448 static void 3449 spdk_bdev_destroy_cb(void *io_device) 3450 { 3451 int rc; 3452 struct spdk_bdev *bdev; 3453 spdk_bdev_unregister_cb cb_fn; 3454 void *cb_arg; 3455 3456 bdev = __bdev_from_io_dev(io_device); 3457 cb_fn = bdev->internal.unregister_cb; 3458 cb_arg = bdev->internal.unregister_ctx; 3459 3460 rc = bdev->fn_table->destruct(bdev->ctxt); 3461 if (rc < 0) { 3462 SPDK_ERRLOG("destruct failed\n"); 3463 } 3464 if (rc <= 0 && cb_fn != NULL) { 3465 cb_fn(cb_arg, rc); 3466 } 3467 } 3468 3469 3470 static void 3471 spdk_bdev_fini(struct spdk_bdev *bdev) 3472 { 3473 pthread_mutex_destroy(&bdev->internal.mutex); 3474 3475 free(bdev->internal.qos); 3476 3477 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3478 } 3479 3480 static void 3481 spdk_bdev_start(struct spdk_bdev *bdev) 3482 { 3483 struct spdk_bdev_module *module; 3484 uint32_t action; 3485 3486 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3487 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3488 3489 /* Examine configuration before initializing I/O */ 3490 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3491 if (module->examine_config) { 3492 action = module->internal.action_in_progress; 3493 module->internal.action_in_progress++; 3494 module->examine_config(bdev); 3495 if (action != module->internal.action_in_progress) { 3496 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3497 module->name); 3498 } 3499 } 3500 } 3501 3502 if (bdev->internal.claim_module) { 3503 return; 3504 } 3505 3506 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3507 if (module->examine_disk) { 3508 module->internal.action_in_progress++; 3509 module->examine_disk(bdev); 3510 } 3511 } 3512 } 3513 3514 int 3515 spdk_bdev_register(struct spdk_bdev *bdev) 3516 { 3517 int rc = spdk_bdev_init(bdev); 3518 3519 if (rc == 0) { 3520 spdk_bdev_start(bdev); 3521 } 3522 3523 return rc; 3524 } 3525 3526 int 3527 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3528 { 3529 int rc; 3530 3531 rc = spdk_bdev_init(vbdev); 3532 if (rc) { 3533 return rc; 3534 } 3535 3536 spdk_bdev_start(vbdev); 3537 return 0; 3538 } 3539 3540 void 3541 
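/*
 * Called by a bdev module to report completion of an asynchronous destruct,
 * i.e. after its destruct callback returned a positive value. The unregister
 * callback captured by spdk_bdev_unregister() is invoked here with the
 * module's final status.
 */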
spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3542 { 3543 if (bdev->internal.unregister_cb != NULL) { 3544 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3545 } 3546 } 3547 3548 static void 3549 _remove_notify(void *arg) 3550 { 3551 struct spdk_bdev_desc *desc = arg; 3552 3553 desc->remove_scheduled = false; 3554 3555 if (desc->closed) { 3556 free(desc); 3557 } else { 3558 desc->remove_cb(desc->remove_ctx); 3559 } 3560 } 3561 3562 void 3563 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3564 { 3565 struct spdk_bdev_desc *desc, *tmp; 3566 bool do_destruct = true; 3567 struct spdk_thread *thread; 3568 3569 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3570 3571 thread = spdk_get_thread(); 3572 if (!thread) { 3573 /* The user called this from a non-SPDK thread. */ 3574 if (cb_fn != NULL) { 3575 cb_fn(cb_arg, -ENOTSUP); 3576 } 3577 return; 3578 } 3579 3580 pthread_mutex_lock(&bdev->internal.mutex); 3581 3582 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3583 bdev->internal.unregister_cb = cb_fn; 3584 bdev->internal.unregister_ctx = cb_arg; 3585 3586 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3587 if (desc->remove_cb) { 3588 do_destruct = false; 3589 /* 3590 * Defer invocation of the remove_cb to a separate message that will 3591 * run later on its thread. This ensures this context unwinds and 3592 * we don't recursively unregister this bdev again if the remove_cb 3593 * immediately closes its descriptor. 3594 */ 3595 if (!desc->remove_scheduled) { 3596 /* Avoid scheduling removal of the same descriptor multiple times. */ 3597 desc->remove_scheduled = true; 3598 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3599 } 3600 } 3601 } 3602 3603 if (!do_destruct) { 3604 pthread_mutex_unlock(&bdev->internal.mutex); 3605 return; 3606 } 3607 3608 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3609 pthread_mutex_unlock(&bdev->internal.mutex); 3610 3611 spdk_bdev_fini(bdev); 3612 } 3613 3614 int 3615 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3616 void *remove_ctx, struct spdk_bdev_desc **_desc) 3617 { 3618 struct spdk_bdev_desc *desc; 3619 struct spdk_thread *thread; 3620 3621 thread = spdk_get_thread(); 3622 if (!thread) { 3623 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3624 return -ENOTSUP; 3625 } 3626 3627 desc = calloc(1, sizeof(*desc)); 3628 if (desc == NULL) { 3629 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3630 return -ENOMEM; 3631 } 3632 3633 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3634 spdk_get_thread()); 3635 3636 desc->bdev = bdev; 3637 desc->thread = thread; 3638 desc->remove_cb = remove_cb; 3639 desc->remove_ctx = remove_ctx; 3640 desc->write = write; 3641 *_desc = desc; 3642 3643 pthread_mutex_lock(&bdev->internal.mutex); 3644 3645 if (write && bdev->internal.claim_module) { 3646 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3647 bdev->name, bdev->internal.claim_module->name); 3648 pthread_mutex_unlock(&bdev->internal.mutex); 3649 free(desc); 3650 *_desc = NULL; 3651 return -EPERM; 3652 } 3653 3654 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3655 3656 pthread_mutex_unlock(&bdev->internal.mutex); 3657 3658 return 0; 3659 } 3660 3661 void 3662 spdk_bdev_close(struct spdk_bdev_desc *desc) 3663 { 3664 struct spdk_bdev *bdev = desc->bdev; 3665 bool do_unregister = false; 
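/*
 * A descriptor must be closed on the thread that opened it (asserted below).
 * Closing the last descriptor also tears down the QoS poller and, when an
 * unregister is pending, completes removal of the bdev. Typical pairing
 * (illustrative only, error handling omitted):
 *
 *     struct spdk_bdev_desc *desc;
 *     spdk_bdev_open(bdev, false, NULL, NULL, &desc);
 *     ...
 *     spdk_bdev_close(desc);
 */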
3666 3667 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3668 spdk_get_thread()); 3669 3670 assert(desc->thread == spdk_get_thread()); 3671 3672 pthread_mutex_lock(&bdev->internal.mutex); 3673 3674 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3675 3676 desc->closed = true; 3677 3678 if (!desc->remove_scheduled) { 3679 free(desc); 3680 } 3681 3682 /* If no more descriptors, kill QoS channel */ 3683 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3684 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3685 bdev->name, spdk_get_thread()); 3686 3687 if (spdk_bdev_qos_destroy(bdev)) { 3688 /* There isn't anything we can do to recover here. Just let the 3689 * old QoS poller keep running. The QoS handling won't change 3690 * cores when the user allocates a new channel, but it won't break. */ 3691 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3692 } 3693 } 3694 3695 spdk_bdev_set_qd_sampling_period(bdev, 0); 3696 3697 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3698 do_unregister = true; 3699 } 3700 pthread_mutex_unlock(&bdev->internal.mutex); 3701 3702 if (do_unregister == true) { 3703 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3704 } 3705 } 3706 3707 int 3708 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3709 struct spdk_bdev_module *module) 3710 { 3711 if (bdev->internal.claim_module != NULL) { 3712 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3713 bdev->internal.claim_module->name); 3714 return -EPERM; 3715 } 3716 3717 if (desc && !desc->write) { 3718 desc->write = true; 3719 } 3720 3721 bdev->internal.claim_module = module; 3722 return 0; 3723 } 3724 3725 void 3726 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3727 { 3728 assert(bdev->internal.claim_module != NULL); 3729 bdev->internal.claim_module = NULL; 3730 } 3731 3732 struct spdk_bdev * 3733 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3734 { 3735 return desc->bdev; 3736 } 3737 3738 void 3739 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3740 { 3741 struct iovec *iovs; 3742 int iovcnt; 3743 3744 if (bdev_io == NULL) { 3745 return; 3746 } 3747 3748 switch (bdev_io->type) { 3749 case SPDK_BDEV_IO_TYPE_READ: 3750 iovs = bdev_io->u.bdev.iovs; 3751 iovcnt = bdev_io->u.bdev.iovcnt; 3752 break; 3753 case SPDK_BDEV_IO_TYPE_WRITE: 3754 iovs = bdev_io->u.bdev.iovs; 3755 iovcnt = bdev_io->u.bdev.iovcnt; 3756 break; 3757 default: 3758 iovs = NULL; 3759 iovcnt = 0; 3760 break; 3761 } 3762 3763 if (iovp) { 3764 *iovp = iovs; 3765 } 3766 if (iovcntp) { 3767 *iovcntp = iovcnt; 3768 } 3769 } 3770 3771 void 3772 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3773 { 3774 3775 if (spdk_bdev_module_list_find(bdev_module->name)) { 3776 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3777 assert(false); 3778 } 3779 3780 if (bdev_module->async_init) { 3781 bdev_module->internal.action_in_progress = 1; 3782 } 3783 3784 /* 3785 * Modules with examine callbacks must be initialized first, so they are 3786 * ready to handle examine callbacks from later modules that will 3787 * register physical bdevs. 
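 * Inserting them at the head of the list accomplishes that, since module
 * initialization walks the list from the head.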
	 */
	if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
	} else {
		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
	}
}

struct spdk_bdev_module *
spdk_bdev_module_list_find(const char *name)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (strcmp(name, bdev_module->name) == 0) {
			break;
		}
	}

	return bdev_module;
}

static void
_spdk_bdev_write_zero_buffer_next(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t num_bytes, num_blocks;
	int rc;

	num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) *
			     bdev_io->u.bdev.split_remaining_num_blocks,
			     ZERO_BUFFER_SIZE);
	num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev);

	rc = spdk_bdev_write_blocks(bdev_io->internal.desc,
				    spdk_io_channel_from_ctx(bdev_io->internal.ch),
				    g_bdev_mgr.zero_buffer,
				    bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
				    _spdk_bdev_write_zero_buffer_done, bdev_io);
	if (rc == 0) {
		bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
		bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
	} else if (rc == -ENOMEM) {
		_spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next);
	} else {
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

static void
_spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
		return;
	}

	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
		parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
		return;
	}

	_spdk_bdev_write_zero_buffer_next(parent_io);
}

struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

static void
_spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
{
	pthread_mutex_lock(&ctx->bdev->internal.mutex);
	ctx->bdev->internal.qos_mod_in_progress = false;
	pthread_mutex_unlock(&ctx->bdev->internal.mutex);

	ctx->cb_fn(ctx->cb_arg, status);
	free(ctx);
}

static void
_spdk_bdev_disable_qos_done(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_qos *qos;

	pthread_mutex_lock(&bdev->internal.mutex);
	qos = bdev->internal.qos;
	bdev->internal.qos = NULL;
	pthread_mutex_unlock(&bdev->internal.mutex);

	while (!TAILQ_EMPTY(&qos->queued)) {
		/* Send queued I/O back to their original thread for resubmission.
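		 * Each channel has already had BDEV_CH_QOS_ENABLED cleared and
		 * bdev->internal.qos was detached above, so nothing new can land
		 * on this queue while it is drained.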
		 */
		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);

		if (bdev_io->internal.io_submit_ch) {
			/*
			 * Channel was changed when sending it to the QoS thread - change it back
			 * before sending it back to the original thread.
			 */
			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
			bdev_io->internal.io_submit_ch = NULL;
		}

		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
				     _spdk_bdev_io_submit, bdev_io);
	}

	if (qos->thread != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
		spdk_poller_unregister(&qos->poller);
	}

	free(qos);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_thread *thread;

	pthread_mutex_lock(&bdev->internal.mutex);
	thread = bdev->internal.qos->thread;
	pthread_mutex_unlock(&bdev->internal.mutex);

	if (thread != NULL) {
		spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
	} else {
		_spdk_bdev_disable_qos_done(ctx);
	}
}

static void
_spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;

	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
	pthread_mutex_unlock(&bdev->internal.mutex);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	pthread_mutex_lock(&bdev->internal.mutex);
	_spdk_bdev_enable_qos(bdev, bdev_ch);
	pthread_mutex_unlock(&bdev->internal.mutex);
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
{
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	_spdk_bdev_set_qos_limit_done(ctx, status);
}

static void
_spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
{
	int i;

	assert(bdev->internal.qos != NULL);

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			bdev->internal.qos->rate_limits[i].limit = limits[i];

			if (limits[i] == 0) {
				bdev->internal.qos->rate_limits[i].limit =
					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
			}
		}
	}
}
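
/*
 * Public entry point for changing QoS rate limits.  limits[] is indexed by
 * rate limit type: SPDK_BDEV_QOS_LIMIT_NOT_DEFINED leaves a limit unchanged,
 * 0 disables it, and byte limits are passed in megabytes per second and
 * rounded up to the minimum supported granularity.
 */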
void
spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
{
	struct set_qos_limit_ctx *ctx;
	uint32_t limit_set_complement;
	uint64_t min_limit_per_sec;
	int i;
	bool disable_rate_limit = true;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			continue;
		}

		if (limits[i] > 0) {
			disable_rate_limit = false;
		}

		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
		} else {
			/* Change from megabyte to byte rate limit */
			limits[i] = limits[i] * 1024 * 1024;
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
		}

		limit_set_complement = limits[i] % min_limit_per_sec;
		if (limit_set_complement) {
			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
				    limits[i], min_limit_per_sec);
			limits[i] += min_limit_per_sec - limit_set_complement;
			SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]);
		}
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->bdev = bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos_mod_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}
	bdev->internal.qos_mod_in_progress = true;

	if (disable_rate_limit == true && bdev->internal.qos) {
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
			     bdev->internal.qos->rate_limits[i].limit !=
			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
				disable_rate_limit = false;
				break;
			}
		}
	}

	if (disable_rate_limit == false) {
		if (bdev->internal.qos == NULL) {
			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
			if (!bdev->internal.qos) {
				pthread_mutex_unlock(&bdev->internal.mutex);
				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
				free(ctx);
				cb_fn(cb_arg, -ENOMEM);
				return;
			}
		}

		if (bdev->internal.qos->thread == NULL) {
			/* Enabling */
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_enable_qos_msg, ctx,
					      _spdk_bdev_enable_qos_done);
		} else {
			/* Updating */
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_thread_send_msg(bdev->internal.qos->thread,
					     _spdk_bdev_update_qos_rate_limit_msg, ctx);
		}
	} else {
		if (bdev->internal.qos != NULL) {
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			/* Disabling */
			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_disable_qos_msg, ctx,
					      _spdk_bdev_disable_qos_msg_done);
		} else {
			pthread_mutex_unlock(&bdev->internal.mutex);
			_spdk_bdev_set_qos_limit_done(ctx, 0);
			return;
		}
	}

	pthread_mutex_unlock(&bdev->internal.mutex);
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)

SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
{
	spdk_trace_register_owner(OWNER_BDEV, 'b');
	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
					OBJECT_BDEV_IO, 1, 0, "type: ");
	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
					OBJECT_BDEV_IO, 0, 0, "");
}
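
/*
 * Illustrative usage sketch, not part of the library itself: it shows how a
 * caller running on an SPDK thread might open a bdev, apply a read/write IOPS
 * cap through spdk_bdev_set_qos_rate_limits(), and leave the descriptor to be
 * closed later on the same thread.  The SPDK_BDEV_USAGE_EXAMPLE guard macro,
 * the example_* names and the 20000 IOPS value are hypothetical and exist
 * only for this sketch; the SPDK calls used are the public bdev APIs declared
 * in spdk/bdev.h.
 */
#ifdef SPDK_BDEV_USAGE_EXAMPLE
static void
example_qos_set_done(void *cb_arg, int status)
{
	/* For an update of an existing QoS configuration this may run on the
	 * QoS thread rather than the caller's thread. */
	SPDK_NOTICELOG("QoS rate limit change completed with status %d\n", status);
}

static int
example_open_and_limit_bdev(const char *name, struct spdk_bdev_desc **out_desc)
{
	struct spdk_bdev *bdev;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
	int i, rc;

	bdev = spdk_bdev_get_by_name(name);
	if (bdev == NULL) {
		return -ENODEV;
	}

	/* Read-only open; a real caller would usually also pass a hot-remove callback. */
	rc = spdk_bdev_open(bdev, false, NULL, NULL, out_desc);
	if (rc != 0) {
		return rc;
	}

	/* Leave every limit untouched except the read/write IOPS cap. */
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		limits[i] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
	}
	limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 20000;

	spdk_bdev_set_qos_rate_limits(bdev, limits, example_qos_set_done, NULL);

	/* The descriptor must eventually be released with spdk_bdev_close()
	 * on this same thread. */
	return 0;
}
#endif /* SPDK_BDEV_USAGE_EXAMPLE */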