/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define BUF_SMALL_POOL_SIZE			8192
#define BUF_LARGE_POOL_SIZE			1024
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(10 * 1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"};
static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"};

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 * For remaining bytes, allowed to run negative if an I/O is submitted when
	 * some bytes are remaining, but the I/O is bigger than that amount. The
	 * excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;
};

struct spdk_bdev_qos {
	/** Rate limits, one per rate limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache. Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t per_thread_cache_count;
	uint32_t bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * will queue their I/O awaiting retry here, which makes it possible to retry
 * sending I/O to one bdev after I/O from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel *shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev *bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel *channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted through this channel and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	bdev_io_tailq_t queued_resets;

	uint32_t flags;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t start_tsc;
	uint64_t interval_tsc;
	__itt_string_handle *handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

};

struct spdk_bdev_desc {
	struct spdk_bdev	*bdev;
	struct spdk_thread	*thread;
	spdk_bdev_remove_cb_t	remove_cb;
	void			*remove_ctx;
	bool			remove_scheduled;
	bool			closed;
	bool			write;
	TAILQ_ENTRY(spdk_bdev_desc) link;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
		void *cb_arg);
static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization. A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
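	 *
	 * As a worked example (illustrative numbers only, not values enforced here):
	 * with 4 threads and bdev_io_cache_size = 256, the check below requires
	 * bdev_io_pool_size >= 256 * (4 + 1) = 1280.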
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

static bool
_is_buf_allocated(struct iovec *iovs)
{
	return iovs[0].iov_base != NULL;
}

static bool
_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
{
	int i;
	uintptr_t iov_base;

	if (spdk_likely(alignment == 1)) {
		return true;
	}

	for (i = 0; i < iovcnt; i++) {
		iov_base = (uintptr_t)iovs[i].iov_base;
		if ((iov_base & (alignment - 1)) != 0) {
			return false;
		}
	}

	return true;
}

static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}
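
/*
 * Note on usage (illustrative): the copy helper above (_copy_iovs_to_buf) and the
 * one below (_copy_buf_to_iovs) implement the bounce-buffer round trip. For a
 * write, the scattered payload is gathered into one contiguous buffer before
 * submission:
 *
 *     _copy_iovs_to_buf(bounce_buf, len, orig_iovs, orig_iovcnt);
 *
 * and for a successfully completed read, the contiguous bounce buffer is
 * scattered back into the caller's iovecs:
 *
 *     _copy_buf_to_iovs(orig_iovs, orig_iovcnt, bounce_buf, len);
 *
 * The in-tree callers are _bdev_io_set_bounce_buf() and
 * _bdev_io_unset_bounce_buf() below.
 */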

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is a write path, copy data from the original buffer to the bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf, *aligned_buf;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len;
	uint64_t alignment;
	bool buf_allocated;

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);

		alignment = spdk_bdev_get_buf_align(tmp->bdev);
		buf_allocated = _is_buf_allocated(tmp->u.bdev.iovs);

		aligned_buf = (void *)(((uintptr_t)buf +
					(alignment - 1)) & ~(alignment - 1));
		if (buf_allocated) {
			_bdev_io_set_bounce_buf(tmp, aligned_buf, tmp->internal.buf_len);
		} else {
			spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
		}

		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		tmp->internal.buf = buf;
		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
	}
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	/* if this is a read path, copy data from the bounce buffer to the original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base, bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this I/O */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bounce buffer for this I/O */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;
	/* return bounce buffer to the pool */
	spdk_bdev_io_put_buf(bdev_io);
}
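
/*
 * Usage sketch (illustrative only, not part of this file): a bdev module that
 * needs a data buffer for a READ typically defers its work through
 * spdk_bdev_io_get_buf() below. Names prefixed with "my_" are hypothetical.
 *
 *     static void
 *     my_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *     {
 *             // bdev_io->u.bdev.iovs now points at an aligned buffer of at
 *             // least the requested length; start the actual read here.
 *     }
 *
 *     ...
 *     spdk_bdev_io_get_buf(bdev_io, my_read_get_buf_cb,
 *                          bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *
 * If no buffer is available, the bdev_io is queued on the management channel
 * and the callback fires later from spdk_bdev_io_put_buf().
 */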

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	void *buf, *aligned_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment;
	bool buf_allocated;

	assert(cb != NULL);
	assert(bdev_io->u.bdev.iovs != NULL);

	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);

	if (buf_allocated &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(bdev_io->internal.ch->channel, bdev_io);
		return;
	}

	assert(len + alignment <= SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT);
	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;
	bdev_io->internal.get_buf_cb = cb;

	if (len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);

	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

		if (buf_allocated) {
			_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
		} else {
			spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
		}
		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
	}
}

static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

static void
spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_qos_limit");
	spdk_json_write_name(w, "params");

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_options");
	spdk_json_write_name(w, "params");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);
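
	/*
	 * The object written above corresponds to an RPC entry of roughly this
	 * shape (values shown are the compile-time defaults, illustrative only):
	 *
	 *     {
	 *       "method": "set_bdev_options",
	 *       "params": {
	 *         "bdev_io_pool_size": 65536,
	 *         "bdev_io_cache_size": 256
	 *       }
	 *     }
	 */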

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		spdk_bdev_qos_config_json(bdev, w);

		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}
	}

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		rc = module->module_init();
		if (rc != 0) {
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}


static void
spdk_bdev_init_failed_complete(void *cb_arg)
{
	spdk_bdev_init_complete(-1);
}

static void
spdk_bdev_init_failed(void *cb_arg)
{
	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
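	 *
	 * As a worked example (illustrative numbers only): with 4 threads,
	 * cache_size = 8192 / (2 * 4) = 1024 small buffers per per-thread cache,
	 * so the caches together can hold at most half of BUF_SMALL_POOL_SIZE.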
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
				 NULL);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = spdk_bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
		return;
	}

	spdk_bdev_module_action_complete();
}

static void
spdk_bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    g_bdev_opts.bdev_io_pool_size);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	spdk_dma_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
}

static void
spdk_bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last bdev in the list. The last bdev in the list should be a bdev
	 * that has no bdevs that depend on it.
	 */
	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
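		 *
		 * A caller that subsequently gets -ENOMEM back from a submit call
		 * would typically register a struct spdk_bdev_io_wait_entry and
		 * retry from its callback, e.g. (illustrative sketch, "my_" names
		 * are hypothetical):
		 *
		 *     my_entry.bdev = bdev;
		 *     my_entry.cb_fn = my_retry_submit;
		 *     my_entry.cb_arg = my_ctx;
		 *     spdk_bdev_queue_io_wait(bdev, io_ch, &my_entry);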
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev_io->internal.buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static bool
_spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
_spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return true;
	default:
		return false;
	}
}

static uint64_t
_spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	default:
		return 0;
	}
}

static void
_spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].remaining_this_timeslice--;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte;
			break;
		case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
		default:
			break;
		}
	}
}

static int
_spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io *bdev_io = NULL;
	struct spdk_bdev *bdev = ch->bdev;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
	int i, submitted_ios = 0;
	bool to_limit_io;
	uint64_t io_size_in_byte;

	while (!TAILQ_EMPTY(&qos->queued)) {
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (qos->rate_limits[i].max_per_timeslice > 0 &&
			    (qos->rate_limits[i].remaining_this_timeslice <= 0)) {
				return submitted_ios;
			}
		}

		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		ch->io_outstanding++;
		shared_resource->io_outstanding++;
		to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io);
		if (to_limit_io == true) {
			io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io);
			_spdk_bdev_qos_update_per_io(qos, io_size_in_byte);
		}
		bdev->fn_table->submit_request(ch->channel, bdev_io);
		submitted_ios++;
	}

	return submitted_ios;
}

static void
_spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

static bool
_spdk_bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O. Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O. Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}

static bool
_spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
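
	/*
	 * Worked example (illustrative numbers): with optimal_io_boundary = 8 blocks,
	 * an I/O at offset_blocks = 6 with num_blocks = 4 covers blocks 6 through 9,
	 * so start_stripe = 0 and end_stripe = 1 after the division below and the
	 * I/O must be split.
	 */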
	/* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}

static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_spdk_bdev_io_split_with_payload(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
	struct iovec *parent_iov, *iov;
	uint64_t parent_iov_offset, iov_len;
	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
	parent_iovcnt = bdev_io->u.bdev.iovcnt;

	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
		if (parent_iov_offset < parent_iov->iov_len) {
			break;
		}
		parent_iov_offset -= parent_iov->iov_len;
	}

	child_iovcnt = 0;
	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
		to_next_boundary = spdk_min(remaining, to_next_boundary);
		to_next_boundary_bytes = to_next_boundary * blocklen;
		iov = &bdev_io->child_iov[child_iovcnt];
		iovcnt = 0;
		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
			to_next_boundary_bytes -= iov_len;

			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;

			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
				parent_iov_offset += iov_len;
			} else {
				parent_iovpos++;
				parent_iov_offset = 0;
			}
			child_iovcnt++;
			iovcnt++;
		}

		if (to_next_boundary_bytes > 0) {
			/* We had to stop this child I/O early because we ran out of
			 * child_iov space. Make sure the iovs collected are valid and
			 * then adjust to_next_boundary before starting the child I/O.
			 */
			if ((to_next_boundary_bytes % blocklen) != 0) {
				SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n",
					    to_next_boundary_bytes, blocklen);
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
				return;
			}
			to_next_boundary -= to_next_boundary_bytes / blocklen;
		}

		bdev_io->u.bdev.split_outstanding++;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
						    spdk_io_channel_from_ctx(bdev_io->internal.ch),
						    iov, iovcnt, current_offset, to_next_boundary,
						    _spdk_bdev_io_split_done, bdev_io);
		} else {
			rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
						     spdk_io_channel_from_ctx(bdev_io->internal.ch),
						     iov, iovcnt, current_offset, to_next_boundary,
						     _spdk_bdev_io_split_done, bdev_io);
		}

		if (rc == 0) {
			current_offset += to_next_boundary;
			remaining -= to_next_boundary;
			bdev_io->u.bdev.split_current_offset_blocks = current_offset;
			bdev_io->u.bdev.split_remaining_num_blocks = remaining;
		} else {
			bdev_io->u.bdev.split_outstanding--;
			if (rc == -ENOMEM) {
				if (bdev_io->u.bdev.split_outstanding == 0) {
					/* No I/O is outstanding. Hence we should wait here. */
					_spdk_bdev_queue_io_wait_with_cb(bdev_io,
									 _spdk_bdev_io_split_with_payload);
				}
			} else {
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
			}

			return;
		}
	}
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
	}
	parent_io->u.bdev.split_outstanding--;
	if (parent_io->u.bdev.split_outstanding != 0) {
		return;
	}

	/*
	 * Parent I/O finishes when all blocks are consumed or there is any failure of
	 * child I/O and no outstanding child I/O.
	 */
	if (parent_io->u.bdev.split_remaining_num_blocks == 0 ||
	    parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) {
		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
				       parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process. This function will complete the parent I/O if the
	 * splitting is done.
	 */
	_spdk_bdev_io_split_with_payload(parent_io);
}

static void
_spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	assert(_spdk_bdev_io_type_can_split(bdev_io->type));

	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
	bdev_io->u.bdev.split_outstanding = 0;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;

	_spdk_bdev_io_split_with_payload(bdev_io);
}

static void
_spdk_bdev_io_submit(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
	uint64_t tsc;

	tsc = spdk_get_ticks();
	bdev_io->internal.submit_tsc = tsc;
	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	bdev_io->internal.in_submit_request = true;
	if (spdk_likely(bdev_ch->flags == 0)) {
		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
			bdev->fn_table->submit_request(ch, bdev_io);
		} else {
			bdev_ch->io_outstanding--;
			shared_resource->io_outstanding--;
			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
		}
	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
		bdev_ch->io_outstanding--;
		shared_resource->io_outstanding--;
		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
		_spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos);
	} else {
		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);

	assert(thread != NULL);
	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split,
					     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		} else {
			_spdk_bdev_io_split(NULL, bdev_io);
		}
		return;
	}

	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
			_spdk_bdev_io_submit(bdev_io);
		} else {
			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
			bdev_io->internal.ch = bdev->internal.qos->ch;
			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
		}
	} else {
		_spdk_bdev_io_submit(bdev_io);
	}
}

static void
spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;

	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	bdev_io->internal.in_submit_request = true;
	bdev->fn_table->submit_request(ch, bdev_io);
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
		  struct spdk_bdev *bdev, void *cb_arg,
		  spdk_bdev_io_completion_cb cb)
{
	bdev_io->bdev = bdev;
	bdev_io->internal.caller_ctx = cb_arg;
	bdev_io->internal.cb = cb;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
	bdev_io->internal.in_submit_request = false;
	bdev_io->internal.buf = NULL;
	bdev_io->internal.io_submit_ch = NULL;
	bdev_io->internal.orig_iovs = NULL;
	bdev_io->internal.orig_iovcnt = 0;
}

static bool
_spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
}

bool
spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	bool supported;

	supported = _spdk_bdev_io_type_supported(bdev, io_type);

	if (!supported) {
		switch (io_type) {
		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
			/* The bdev layer will emulate write zeroes as long as write is supported. */
			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
			break;
		default:
			break;
		}
	}

	return supported;
}

int
spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	if (bdev->fn_table->dump_info_json) {
		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
	}

	return 0;
}

static void
spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
{
	uint32_t max_per_timeslice = 0;
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].max_per_timeslice = 0;
			continue;
		}

		max_per_timeslice = qos->rate_limits[i].limit *
				    SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC;

		qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice,
							qos->rate_limits[i].min_per_timeslice);

		qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice;
	}
}

static int
spdk_bdev_channel_poll_qos(void *arg)
{
	struct spdk_bdev_qos *qos = arg;
	uint64_t now = spdk_get_ticks();
	int i;

	if (now < (qos->last_timeslice + qos->timeslice_size)) {
		/* We received our callback earlier than expected - return
		 * immediately and wait to do accounting until at least one
		 * timeslice has actually expired. This should never happen
		 * with a well-behaved timer implementation.
		 */
		return 0;
	}

	/* Reset for next round of rate limiting */
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		/* We may have allowed the IOs or bytes to slightly overrun in the last
		 * timeslice. remaining_this_timeslice is signed, so if it's negative
		 * here, we'll account for the overrun so that the next timeslice will
		 * be appropriately reduced.
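		 *
		 * As a worked example (illustrative numbers only): with a limit of
		 * 10000 IOs per second and a 1000 usec timeslice,
		 * max_per_timeslice = 10000 * 1000 / 1000000 = 10 IOs are added
		 * back to remaining_this_timeslice for each expired timeslice in
		 * the loop below.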
		 */
		if (qos->rate_limits[i].remaining_this_timeslice > 0) {
			qos->rate_limits[i].remaining_this_timeslice = 0;
		}
	}

	while (now >= (qos->last_timeslice + qos->timeslice_size)) {
		qos->last_timeslice += qos->timeslice_size;
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			qos->rate_limits[i].remaining_this_timeslice +=
				qos->rate_limits[i].max_per_timeslice;
		}
	}

	return _spdk_bdev_qos_io_submit(qos->ch, qos);
}

static void
_spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_shared_resource *shared_resource;

	if (!ch) {
		return;
	}

	if (ch->channel) {
		spdk_put_io_channel(ch->channel);
	}

	assert(ch->io_outstanding == 0);

	shared_resource = ch->shared_resource;
	if (shared_resource) {
		assert(ch->io_outstanding == 0);
		assert(shared_resource->ref > 0);
		shared_resource->ref--;
		if (shared_resource->ref == 0) {
			assert(shared_resource->io_outstanding == 0);
			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
			free(shared_resource);
		}
	}
}

/* Caller must hold bdev->internal.mutex. */
static void
_spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	int i;

	/* Rate limiting on this bdev enabled */
	if (qos) {
		if (qos->ch == NULL) {
			struct spdk_io_channel *io_ch;

			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
				      bdev->name, spdk_get_thread());

			/* No qos channel has been selected, so set one up */

			/* Take another reference to ch */
			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
			qos->ch = ch;

			qos->thread = spdk_io_channel_get_thread(io_ch);

			TAILQ_INIT(&qos->queued);

			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
					qos->rate_limits[i].min_per_timeslice =
						SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE;
				} else {
					qos->rate_limits[i].min_per_timeslice =
						SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE;
				}

				if (qos->rate_limits[i].limit == 0) {
					qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
				}
			}
			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
			qos->timeslice_size =
				SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
			qos->last_timeslice = spdk_get_ticks();
			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
							   qos,
							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
		}

		ch->flags |= BDEV_CH_QOS_ENABLED;
	}
}

static int
spdk_bdev_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct spdk_bdev_channel *ch = ctx_buf;
	struct spdk_io_channel *mgmt_io_ch;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource;

	ch->bdev = bdev;
	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
	if (!ch->channel) {
		return -1;
	}

	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
	if (!mgmt_io_ch) {
		return -1;
	}

	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
		if (shared_resource->shared_ch == ch->channel) {
			spdk_put_io_channel(mgmt_io_ch);
			shared_resource->ref++;
			break;
		}
	}

	if (shared_resource == NULL) {
		shared_resource = calloc(1, sizeof(*shared_resource));
		if (shared_resource == NULL) {
			spdk_put_io_channel(mgmt_io_ch);
			return -1;
		}

		shared_resource->mgmt_ch = mgmt_ch;
		shared_resource->io_outstanding = 0;
		TAILQ_INIT(&shared_resource->nomem_io);
		shared_resource->nomem_threshold = 0;
		shared_resource->shared_ch = ch->channel;
		shared_resource->ref = 1;
		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
	}

	memset(&ch->stat, 0, sizeof(ch->stat));
	ch->stat.ticks_rate = spdk_get_ticks_hz();
	ch->io_outstanding = 0;
	TAILQ_INIT(&ch->queued_resets);
	ch->flags = 0;
	ch->shared_resource = shared_resource;

#ifdef SPDK_CONFIG_VTUNE
	{
		char *name;
		__itt_init_ittlib(NULL, 0);
		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
		if (!name) {
			_spdk_bdev_channel_destroy_resource(ch);
			return -1;
		}
		ch->handle = __itt_string_handle_create(name);
		free(name);
		ch->start_tsc = spdk_get_ticks();
		ch->interval_tsc = spdk_get_ticks_hz() / 100;
		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
	}
#endif

	pthread_mutex_lock(&bdev->internal.mutex);
	_spdk_bdev_enable_qos(bdev, ch);
	pthread_mutex_unlock(&bdev->internal.mutex);

	return 0;
}

/*
 * Abort I/O that are waiting on a data buffer.  These types of I/O are
 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
{
	bdev_io_stailq_t tmp;
	struct spdk_bdev_io *bdev_io;

	STAILQ_INIT(&tmp);

	while (!STAILQ_EMPTY(queue)) {
		bdev_io = STAILQ_FIRST(queue);
		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
		if (bdev_io->internal.ch == ch) {
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		} else {
			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
		}
	}

	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
}

/*
 * Abort I/O that are queued waiting for submission.  These types of I/O are
 * linked using the spdk_bdev_io link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io, *tmp;

	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
		if (bdev_io->internal.ch == ch) {
			TAILQ_REMOVE(queue, bdev_io, internal.link);
			/*
			 * spdk_bdev_io_complete() assumes that the completed I/O had
			 * been submitted to the bdev module.  Since in this case it
			 * hadn't, bump io_outstanding to account for the decrement
			 * that spdk_bdev_io_complete() will do.
1904 */ 1905 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1906 ch->io_outstanding++; 1907 ch->shared_resource->io_outstanding++; 1908 } 1909 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1910 } 1911 } 1912 } 1913 1914 static void 1915 spdk_bdev_qos_channel_destroy(void *cb_arg) 1916 { 1917 struct spdk_bdev_qos *qos = cb_arg; 1918 1919 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1920 spdk_poller_unregister(&qos->poller); 1921 1922 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1923 1924 free(qos); 1925 } 1926 1927 static int 1928 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1929 { 1930 int i; 1931 1932 /* 1933 * Cleanly shutting down the QoS poller is tricky, because 1934 * during the asynchronous operation the user could open 1935 * a new descriptor and create a new channel, spawning 1936 * a new QoS poller. 1937 * 1938 * The strategy is to create a new QoS structure here and swap it 1939 * in. The shutdown path then continues to refer to the old one 1940 * until it completes and then releases it. 1941 */ 1942 struct spdk_bdev_qos *new_qos, *old_qos; 1943 1944 old_qos = bdev->internal.qos; 1945 1946 new_qos = calloc(1, sizeof(*new_qos)); 1947 if (!new_qos) { 1948 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1949 return -ENOMEM; 1950 } 1951 1952 /* Copy the old QoS data into the newly allocated structure */ 1953 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1954 1955 /* Zero out the key parts of the QoS structure */ 1956 new_qos->ch = NULL; 1957 new_qos->thread = NULL; 1958 new_qos->poller = NULL; 1959 TAILQ_INIT(&new_qos->queued); 1960 /* 1961 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 1962 * It will be used later for the new QoS structure. 1963 */ 1964 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1965 new_qos->rate_limits[i].remaining_this_timeslice = 0; 1966 new_qos->rate_limits[i].min_per_timeslice = 0; 1967 new_qos->rate_limits[i].max_per_timeslice = 0; 1968 } 1969 1970 bdev->internal.qos = new_qos; 1971 1972 if (old_qos->thread == NULL) { 1973 free(old_qos); 1974 } else { 1975 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1976 old_qos); 1977 } 1978 1979 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 1980 * been destroyed yet. The destruction path will end up waiting for the final 1981 * channel to be put before it releases resources. */ 1982 1983 return 0; 1984 } 1985 1986 static void 1987 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 1988 { 1989 total->bytes_read += add->bytes_read; 1990 total->num_read_ops += add->num_read_ops; 1991 total->bytes_written += add->bytes_written; 1992 total->num_write_ops += add->num_write_ops; 1993 total->read_latency_ticks += add->read_latency_ticks; 1994 total->write_latency_ticks += add->write_latency_ticks; 1995 } 1996 1997 static void 1998 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 1999 { 2000 struct spdk_bdev_channel *ch = ctx_buf; 2001 struct spdk_bdev_mgmt_channel *mgmt_ch; 2002 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2003 2004 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2005 spdk_get_thread()); 2006 2007 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
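	 * spdk_bdev_get_device_stat() later folds these accumulated totals together with the
	 * per-channel statistics of any channels that still exist.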
*/ 2008 pthread_mutex_lock(&ch->bdev->internal.mutex); 2009 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2010 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2011 2012 mgmt_ch = shared_resource->mgmt_ch; 2013 2014 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2015 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2016 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2017 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2018 2019 _spdk_bdev_channel_destroy_resource(ch); 2020 } 2021 2022 int 2023 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2024 { 2025 struct spdk_bdev_alias *tmp; 2026 2027 if (alias == NULL) { 2028 SPDK_ERRLOG("Empty alias passed\n"); 2029 return -EINVAL; 2030 } 2031 2032 if (spdk_bdev_get_by_name(alias)) { 2033 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2034 return -EEXIST; 2035 } 2036 2037 tmp = calloc(1, sizeof(*tmp)); 2038 if (tmp == NULL) { 2039 SPDK_ERRLOG("Unable to allocate alias\n"); 2040 return -ENOMEM; 2041 } 2042 2043 tmp->alias = strdup(alias); 2044 if (tmp->alias == NULL) { 2045 free(tmp); 2046 SPDK_ERRLOG("Unable to allocate alias\n"); 2047 return -ENOMEM; 2048 } 2049 2050 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2051 2052 return 0; 2053 } 2054 2055 int 2056 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2057 { 2058 struct spdk_bdev_alias *tmp; 2059 2060 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2061 if (strcmp(alias, tmp->alias) == 0) { 2062 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2063 free(tmp->alias); 2064 free(tmp); 2065 return 0; 2066 } 2067 } 2068 2069 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2070 2071 return -ENOENT; 2072 } 2073 2074 void 2075 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2076 { 2077 struct spdk_bdev_alias *p, *tmp; 2078 2079 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2080 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2081 free(p->alias); 2082 free(p); 2083 } 2084 } 2085 2086 struct spdk_io_channel * 2087 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2088 { 2089 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 2090 } 2091 2092 const char * 2093 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2094 { 2095 return bdev->name; 2096 } 2097 2098 const char * 2099 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2100 { 2101 return bdev->product_name; 2102 } 2103 2104 const struct spdk_bdev_aliases_list * 2105 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2106 { 2107 return &bdev->aliases; 2108 } 2109 2110 uint32_t 2111 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2112 { 2113 return bdev->blocklen; 2114 } 2115 2116 uint64_t 2117 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2118 { 2119 return bdev->blockcnt; 2120 } 2121 2122 const char * 2123 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2124 { 2125 return qos_rpc_type[type]; 2126 } 2127 2128 void 2129 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2130 { 2131 int i; 2132 2133 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2134 2135 pthread_mutex_lock(&bdev->internal.mutex); 2136 if (bdev->internal.qos) { 2137 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2138 if (bdev->internal.qos->rate_limits[i].limit != 2139 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2140 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2141 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2142 /* Change from Byte to Megabyte which is user visible. 
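					 * This is the inverse of the megabyte-to-byte conversion applied
					 * when the limits are configured in spdk_bdev_set_qos_rate_limits().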
*/ 2143 limits[i] = limits[i] / 1024 / 1024; 2144 } 2145 } 2146 } 2147 } 2148 pthread_mutex_unlock(&bdev->internal.mutex); 2149 } 2150 2151 size_t 2152 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2153 { 2154 return 1 << bdev->required_alignment; 2155 } 2156 2157 uint32_t 2158 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2159 { 2160 return bdev->optimal_io_boundary; 2161 } 2162 2163 bool 2164 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2165 { 2166 return bdev->write_cache; 2167 } 2168 2169 const struct spdk_uuid * 2170 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2171 { 2172 return &bdev->uuid; 2173 } 2174 2175 uint64_t 2176 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2177 { 2178 return bdev->internal.measured_queue_depth; 2179 } 2180 2181 uint64_t 2182 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2183 { 2184 return bdev->internal.period; 2185 } 2186 2187 uint64_t 2188 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2189 { 2190 return bdev->internal.weighted_io_time; 2191 } 2192 2193 uint64_t 2194 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2195 { 2196 return bdev->internal.io_time; 2197 } 2198 2199 static void 2200 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2201 { 2202 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2203 2204 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2205 2206 if (bdev->internal.measured_queue_depth) { 2207 bdev->internal.io_time += bdev->internal.period; 2208 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2209 } 2210 } 2211 2212 static void 2213 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2214 { 2215 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2216 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2217 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2218 2219 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2220 spdk_for_each_channel_continue(i, 0); 2221 } 2222 2223 static int 2224 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2225 { 2226 struct spdk_bdev *bdev = ctx; 2227 bdev->internal.temporary_queue_depth = 0; 2228 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2229 _calculate_measured_qd_cpl); 2230 return 0; 2231 } 2232 2233 void 2234 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2235 { 2236 bdev->internal.period = period; 2237 2238 if (bdev->internal.qd_poller != NULL) { 2239 spdk_poller_unregister(&bdev->internal.qd_poller); 2240 bdev->internal.measured_queue_depth = UINT64_MAX; 2241 } 2242 2243 if (period != 0) { 2244 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2245 period); 2246 } 2247 } 2248 2249 int 2250 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2251 { 2252 int ret; 2253 2254 pthread_mutex_lock(&bdev->internal.mutex); 2255 2256 /* bdev has open descriptors */ 2257 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2258 bdev->blockcnt > size) { 2259 ret = -EBUSY; 2260 } else { 2261 bdev->blockcnt = size; 2262 ret = 0; 2263 } 2264 2265 pthread_mutex_unlock(&bdev->internal.mutex); 2266 2267 return ret; 2268 } 2269 2270 /* 2271 * Convert I/O offset and length from bytes to blocks. 2272 * 2273 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
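 *
 * For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 1024 yield
 * offset_blocks = 8 and num_blocks = 2 with a return value of 0, while num_bytes = 1000 would
 * produce a non-zero return value (1000 % 512 != 0).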
2274 */ 2275 static uint64_t 2276 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2277 uint64_t num_bytes, uint64_t *num_blocks) 2278 { 2279 uint32_t block_size = bdev->blocklen; 2280 2281 *offset_blocks = offset_bytes / block_size; 2282 *num_blocks = num_bytes / block_size; 2283 2284 return (offset_bytes % block_size) | (num_bytes % block_size); 2285 } 2286 2287 static bool 2288 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2289 { 2290 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2291 * has been an overflow and hence the offset has been wrapped around */ 2292 if (offset_blocks + num_blocks < offset_blocks) { 2293 return false; 2294 } 2295 2296 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2297 if (offset_blocks + num_blocks > bdev->blockcnt) { 2298 return false; 2299 } 2300 2301 return true; 2302 } 2303 2304 int 2305 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2306 void *buf, uint64_t offset, uint64_t nbytes, 2307 spdk_bdev_io_completion_cb cb, void *cb_arg) 2308 { 2309 uint64_t offset_blocks, num_blocks; 2310 2311 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2312 return -EINVAL; 2313 } 2314 2315 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2316 } 2317 2318 int 2319 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2320 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2321 spdk_bdev_io_completion_cb cb, void *cb_arg) 2322 { 2323 struct spdk_bdev *bdev = desc->bdev; 2324 struct spdk_bdev_io *bdev_io; 2325 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2326 2327 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2328 return -EINVAL; 2329 } 2330 2331 bdev_io = spdk_bdev_get_io(channel); 2332 if (!bdev_io) { 2333 return -ENOMEM; 2334 } 2335 2336 bdev_io->internal.ch = channel; 2337 bdev_io->internal.desc = desc; 2338 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2339 bdev_io->u.bdev.iovs = &bdev_io->iov; 2340 bdev_io->u.bdev.iovs[0].iov_base = buf; 2341 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2342 bdev_io->u.bdev.iovcnt = 1; 2343 bdev_io->u.bdev.num_blocks = num_blocks; 2344 bdev_io->u.bdev.offset_blocks = offset_blocks; 2345 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2346 2347 spdk_bdev_io_submit(bdev_io); 2348 return 0; 2349 } 2350 2351 int 2352 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2353 struct iovec *iov, int iovcnt, 2354 uint64_t offset, uint64_t nbytes, 2355 spdk_bdev_io_completion_cb cb, void *cb_arg) 2356 { 2357 uint64_t offset_blocks, num_blocks; 2358 2359 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2360 return -EINVAL; 2361 } 2362 2363 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2364 } 2365 2366 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2367 struct iovec *iov, int iovcnt, 2368 uint64_t offset_blocks, uint64_t num_blocks, 2369 spdk_bdev_io_completion_cb cb, void *cb_arg) 2370 { 2371 struct spdk_bdev *bdev = desc->bdev; 2372 struct spdk_bdev_io *bdev_io; 2373 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2374 2375 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2376 return -EINVAL; 2377 } 2378 
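	/* As in the other submission paths, spdk_bdev_get_io() returns NULL when the per-thread
	 * bdev_io cache and the backing pool are exhausted; the -ENOMEM returned below is the
	 * caller's cue to retry later, e.g. by registering a wait entry with
	 * spdk_bdev_queue_io_wait().
	 */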
2379 bdev_io = spdk_bdev_get_io(channel); 2380 if (!bdev_io) { 2381 return -ENOMEM; 2382 } 2383 2384 bdev_io->internal.ch = channel; 2385 bdev_io->internal.desc = desc; 2386 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2387 bdev_io->u.bdev.iovs = iov; 2388 bdev_io->u.bdev.iovcnt = iovcnt; 2389 bdev_io->u.bdev.num_blocks = num_blocks; 2390 bdev_io->u.bdev.offset_blocks = offset_blocks; 2391 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2392 2393 spdk_bdev_io_submit(bdev_io); 2394 return 0; 2395 } 2396 2397 int 2398 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2399 void *buf, uint64_t offset, uint64_t nbytes, 2400 spdk_bdev_io_completion_cb cb, void *cb_arg) 2401 { 2402 uint64_t offset_blocks, num_blocks; 2403 2404 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2405 return -EINVAL; 2406 } 2407 2408 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2409 } 2410 2411 int 2412 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2413 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2414 spdk_bdev_io_completion_cb cb, void *cb_arg) 2415 { 2416 struct spdk_bdev *bdev = desc->bdev; 2417 struct spdk_bdev_io *bdev_io; 2418 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2419 2420 if (!desc->write) { 2421 return -EBADF; 2422 } 2423 2424 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2425 return -EINVAL; 2426 } 2427 2428 bdev_io = spdk_bdev_get_io(channel); 2429 if (!bdev_io) { 2430 return -ENOMEM; 2431 } 2432 2433 bdev_io->internal.ch = channel; 2434 bdev_io->internal.desc = desc; 2435 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2436 bdev_io->u.bdev.iovs = &bdev_io->iov; 2437 bdev_io->u.bdev.iovs[0].iov_base = buf; 2438 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2439 bdev_io->u.bdev.iovcnt = 1; 2440 bdev_io->u.bdev.num_blocks = num_blocks; 2441 bdev_io->u.bdev.offset_blocks = offset_blocks; 2442 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2443 2444 spdk_bdev_io_submit(bdev_io); 2445 return 0; 2446 } 2447 2448 int 2449 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2450 struct iovec *iov, int iovcnt, 2451 uint64_t offset, uint64_t len, 2452 spdk_bdev_io_completion_cb cb, void *cb_arg) 2453 { 2454 uint64_t offset_blocks, num_blocks; 2455 2456 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2457 return -EINVAL; 2458 } 2459 2460 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2461 } 2462 2463 int 2464 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2465 struct iovec *iov, int iovcnt, 2466 uint64_t offset_blocks, uint64_t num_blocks, 2467 spdk_bdev_io_completion_cb cb, void *cb_arg) 2468 { 2469 struct spdk_bdev *bdev = desc->bdev; 2470 struct spdk_bdev_io *bdev_io; 2471 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2472 2473 if (!desc->write) { 2474 return -EBADF; 2475 } 2476 2477 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2478 return -EINVAL; 2479 } 2480 2481 bdev_io = spdk_bdev_get_io(channel); 2482 if (!bdev_io) { 2483 return -ENOMEM; 2484 } 2485 2486 bdev_io->internal.ch = channel; 2487 bdev_io->internal.desc = desc; 2488 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2489 bdev_io->u.bdev.iovs = iov; 2490 bdev_io->u.bdev.iovcnt = iovcnt; 2491 bdev_io->u.bdev.num_blocks = num_blocks; 2492 bdev_io->u.bdev.offset_blocks = 
offset_blocks; 2493 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2494 2495 spdk_bdev_io_submit(bdev_io); 2496 return 0; 2497 } 2498 2499 int 2500 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2501 uint64_t offset, uint64_t len, 2502 spdk_bdev_io_completion_cb cb, void *cb_arg) 2503 { 2504 uint64_t offset_blocks, num_blocks; 2505 2506 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2507 return -EINVAL; 2508 } 2509 2510 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2511 } 2512 2513 int 2514 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2515 uint64_t offset_blocks, uint64_t num_blocks, 2516 spdk_bdev_io_completion_cb cb, void *cb_arg) 2517 { 2518 struct spdk_bdev *bdev = desc->bdev; 2519 struct spdk_bdev_io *bdev_io; 2520 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2521 2522 if (!desc->write) { 2523 return -EBADF; 2524 } 2525 2526 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2527 return -EINVAL; 2528 } 2529 2530 bdev_io = spdk_bdev_get_io(channel); 2531 2532 if (!bdev_io) { 2533 return -ENOMEM; 2534 } 2535 2536 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2537 bdev_io->internal.ch = channel; 2538 bdev_io->internal.desc = desc; 2539 bdev_io->u.bdev.offset_blocks = offset_blocks; 2540 bdev_io->u.bdev.num_blocks = num_blocks; 2541 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2542 2543 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2544 spdk_bdev_io_submit(bdev_io); 2545 return 0; 2546 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2547 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2548 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2549 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2550 _spdk_bdev_write_zero_buffer_next(bdev_io); 2551 return 0; 2552 } else { 2553 spdk_bdev_free_io(bdev_io); 2554 return -ENOTSUP; 2555 } 2556 } 2557 2558 int 2559 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2560 uint64_t offset, uint64_t nbytes, 2561 spdk_bdev_io_completion_cb cb, void *cb_arg) 2562 { 2563 uint64_t offset_blocks, num_blocks; 2564 2565 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2566 return -EINVAL; 2567 } 2568 2569 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2570 } 2571 2572 int 2573 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2574 uint64_t offset_blocks, uint64_t num_blocks, 2575 spdk_bdev_io_completion_cb cb, void *cb_arg) 2576 { 2577 struct spdk_bdev *bdev = desc->bdev; 2578 struct spdk_bdev_io *bdev_io; 2579 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2580 2581 if (!desc->write) { 2582 return -EBADF; 2583 } 2584 2585 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2586 return -EINVAL; 2587 } 2588 2589 if (num_blocks == 0) { 2590 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2591 return -EINVAL; 2592 } 2593 2594 bdev_io = spdk_bdev_get_io(channel); 2595 if (!bdev_io) { 2596 return -ENOMEM; 2597 } 2598 2599 bdev_io->internal.ch = channel; 2600 bdev_io->internal.desc = desc; 2601 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2602 2603 bdev_io->u.bdev.iovs = &bdev_io->iov; 2604 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2605 bdev_io->u.bdev.iovs[0].iov_len = 0; 2606 bdev_io->u.bdev.iovcnt = 1; 2607 2608 
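	/* Unmap carries no data payload; the single zero-length iovec set up above keeps
	 * u.bdev.iovs pointing at valid memory for code that inspects it unconditionally.
	 */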
bdev_io->u.bdev.offset_blocks = offset_blocks; 2609 bdev_io->u.bdev.num_blocks = num_blocks; 2610 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2611 2612 spdk_bdev_io_submit(bdev_io); 2613 return 0; 2614 } 2615 2616 int 2617 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2618 uint64_t offset, uint64_t length, 2619 spdk_bdev_io_completion_cb cb, void *cb_arg) 2620 { 2621 uint64_t offset_blocks, num_blocks; 2622 2623 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2624 return -EINVAL; 2625 } 2626 2627 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2628 } 2629 2630 int 2631 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2632 uint64_t offset_blocks, uint64_t num_blocks, 2633 spdk_bdev_io_completion_cb cb, void *cb_arg) 2634 { 2635 struct spdk_bdev *bdev = desc->bdev; 2636 struct spdk_bdev_io *bdev_io; 2637 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2638 2639 if (!desc->write) { 2640 return -EBADF; 2641 } 2642 2643 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2644 return -EINVAL; 2645 } 2646 2647 bdev_io = spdk_bdev_get_io(channel); 2648 if (!bdev_io) { 2649 return -ENOMEM; 2650 } 2651 2652 bdev_io->internal.ch = channel; 2653 bdev_io->internal.desc = desc; 2654 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2655 bdev_io->u.bdev.iovs = NULL; 2656 bdev_io->u.bdev.iovcnt = 0; 2657 bdev_io->u.bdev.offset_blocks = offset_blocks; 2658 bdev_io->u.bdev.num_blocks = num_blocks; 2659 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2660 2661 spdk_bdev_io_submit(bdev_io); 2662 return 0; 2663 } 2664 2665 static void 2666 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2667 { 2668 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2669 struct spdk_bdev_io *bdev_io; 2670 2671 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2672 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2673 spdk_bdev_io_submit_reset(bdev_io); 2674 } 2675 2676 static void 2677 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2678 { 2679 struct spdk_io_channel *ch; 2680 struct spdk_bdev_channel *channel; 2681 struct spdk_bdev_mgmt_channel *mgmt_channel; 2682 struct spdk_bdev_shared_resource *shared_resource; 2683 bdev_io_tailq_t tmp_queued; 2684 2685 TAILQ_INIT(&tmp_queued); 2686 2687 ch = spdk_io_channel_iter_get_channel(i); 2688 channel = spdk_io_channel_get_ctx(ch); 2689 shared_resource = channel->shared_resource; 2690 mgmt_channel = shared_resource->mgmt_ch; 2691 2692 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2693 2694 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2695 /* The QoS object is always valid and readable while 2696 * the channel flag is set, so the lock here should not 2697 * be necessary. We're not in the fast path though, so 2698 * just take it anyway. 
*/ 2699 pthread_mutex_lock(&channel->bdev->internal.mutex); 2700 if (channel->bdev->internal.qos->ch == channel) { 2701 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2702 } 2703 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2704 } 2705 2706 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2707 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2708 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2709 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2710 2711 spdk_for_each_channel_continue(i, 0); 2712 } 2713 2714 static void 2715 _spdk_bdev_start_reset(void *ctx) 2716 { 2717 struct spdk_bdev_channel *ch = ctx; 2718 2719 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2720 ch, _spdk_bdev_reset_dev); 2721 } 2722 2723 static void 2724 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2725 { 2726 struct spdk_bdev *bdev = ch->bdev; 2727 2728 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2729 2730 pthread_mutex_lock(&bdev->internal.mutex); 2731 if (bdev->internal.reset_in_progress == NULL) { 2732 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2733 /* 2734 * Take a channel reference for the target bdev for the life of this 2735 * reset. This guards against the channel getting destroyed while 2736 * spdk_for_each_channel() calls related to this reset IO are in 2737 * progress. We will release the reference when this reset is 2738 * completed. 2739 */ 2740 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2741 _spdk_bdev_start_reset(ch); 2742 } 2743 pthread_mutex_unlock(&bdev->internal.mutex); 2744 } 2745 2746 int 2747 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2748 spdk_bdev_io_completion_cb cb, void *cb_arg) 2749 { 2750 struct spdk_bdev *bdev = desc->bdev; 2751 struct spdk_bdev_io *bdev_io; 2752 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2753 2754 bdev_io = spdk_bdev_get_io(channel); 2755 if (!bdev_io) { 2756 return -ENOMEM; 2757 } 2758 2759 bdev_io->internal.ch = channel; 2760 bdev_io->internal.desc = desc; 2761 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2762 bdev_io->u.reset.ch_ref = NULL; 2763 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2764 2765 pthread_mutex_lock(&bdev->internal.mutex); 2766 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2767 pthread_mutex_unlock(&bdev->internal.mutex); 2768 2769 _spdk_bdev_channel_start_reset(channel); 2770 2771 return 0; 2772 } 2773 2774 void 2775 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2776 struct spdk_bdev_io_stat *stat) 2777 { 2778 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2779 2780 *stat = channel->stat; 2781 } 2782 2783 static void 2784 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2785 { 2786 void *io_device = spdk_io_channel_iter_get_io_device(i); 2787 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2788 2789 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2790 bdev_iostat_ctx->cb_arg, 0); 2791 free(bdev_iostat_ctx); 2792 } 2793 2794 static void 2795 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2796 { 2797 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2798 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2799 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2800 2801 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2802 spdk_for_each_channel_continue(i, 0); 2803 } 2804 2805 void 2806 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2807 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2808 { 2809 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2810 2811 assert(bdev != NULL); 2812 assert(stat != NULL); 2813 assert(cb != NULL); 2814 2815 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2816 if (bdev_iostat_ctx == NULL) { 2817 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2818 cb(bdev, stat, cb_arg, -ENOMEM); 2819 return; 2820 } 2821 2822 bdev_iostat_ctx->stat = stat; 2823 bdev_iostat_ctx->cb = cb; 2824 bdev_iostat_ctx->cb_arg = cb_arg; 2825 2826 /* Start with the statistics from previously deleted channels. */ 2827 pthread_mutex_lock(&bdev->internal.mutex); 2828 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2829 pthread_mutex_unlock(&bdev->internal.mutex); 2830 2831 /* Then iterate and add the statistics from each existing channel. */ 2832 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2833 _spdk_bdev_get_each_channel_stat, 2834 bdev_iostat_ctx, 2835 _spdk_bdev_get_device_stat_done); 2836 } 2837 2838 int 2839 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2840 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2841 spdk_bdev_io_completion_cb cb, void *cb_arg) 2842 { 2843 struct spdk_bdev *bdev = desc->bdev; 2844 struct spdk_bdev_io *bdev_io; 2845 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2846 2847 if (!desc->write) { 2848 return -EBADF; 2849 } 2850 2851 bdev_io = spdk_bdev_get_io(channel); 2852 if (!bdev_io) { 2853 return -ENOMEM; 2854 } 2855 2856 bdev_io->internal.ch = channel; 2857 bdev_io->internal.desc = desc; 2858 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2859 bdev_io->u.nvme_passthru.cmd = *cmd; 2860 bdev_io->u.nvme_passthru.buf = buf; 2861 bdev_io->u.nvme_passthru.nbytes = nbytes; 2862 bdev_io->u.nvme_passthru.md_buf = NULL; 2863 bdev_io->u.nvme_passthru.md_len = 0; 2864 2865 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2866 2867 spdk_bdev_io_submit(bdev_io); 2868 return 0; 2869 } 2870 2871 int 2872 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2873 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2874 spdk_bdev_io_completion_cb cb, void *cb_arg) 2875 { 2876 struct spdk_bdev *bdev = desc->bdev; 2877 struct spdk_bdev_io *bdev_io; 2878 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2879 2880 if (!desc->write) { 2881 /* 2882 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2883 * to easily determine if the command is a read or write, but for now just 2884 * do not allow io_passthru with a read-only descriptor. 
2885 */ 2886 return -EBADF; 2887 } 2888 2889 bdev_io = spdk_bdev_get_io(channel); 2890 if (!bdev_io) { 2891 return -ENOMEM; 2892 } 2893 2894 bdev_io->internal.ch = channel; 2895 bdev_io->internal.desc = desc; 2896 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2897 bdev_io->u.nvme_passthru.cmd = *cmd; 2898 bdev_io->u.nvme_passthru.buf = buf; 2899 bdev_io->u.nvme_passthru.nbytes = nbytes; 2900 bdev_io->u.nvme_passthru.md_buf = NULL; 2901 bdev_io->u.nvme_passthru.md_len = 0; 2902 2903 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2904 2905 spdk_bdev_io_submit(bdev_io); 2906 return 0; 2907 } 2908 2909 int 2910 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2911 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2912 spdk_bdev_io_completion_cb cb, void *cb_arg) 2913 { 2914 struct spdk_bdev *bdev = desc->bdev; 2915 struct spdk_bdev_io *bdev_io; 2916 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2917 2918 if (!desc->write) { 2919 /* 2920 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2921 * to easily determine if the command is a read or write, but for now just 2922 * do not allow io_passthru with a read-only descriptor. 2923 */ 2924 return -EBADF; 2925 } 2926 2927 bdev_io = spdk_bdev_get_io(channel); 2928 if (!bdev_io) { 2929 return -ENOMEM; 2930 } 2931 2932 bdev_io->internal.ch = channel; 2933 bdev_io->internal.desc = desc; 2934 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2935 bdev_io->u.nvme_passthru.cmd = *cmd; 2936 bdev_io->u.nvme_passthru.buf = buf; 2937 bdev_io->u.nvme_passthru.nbytes = nbytes; 2938 bdev_io->u.nvme_passthru.md_buf = md_buf; 2939 bdev_io->u.nvme_passthru.md_len = md_len; 2940 2941 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2942 2943 spdk_bdev_io_submit(bdev_io); 2944 return 0; 2945 } 2946 2947 int 2948 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2949 struct spdk_bdev_io_wait_entry *entry) 2950 { 2951 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2952 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2953 2954 if (bdev != entry->bdev) { 2955 SPDK_ERRLOG("bdevs do not match\n"); 2956 return -EINVAL; 2957 } 2958 2959 if (mgmt_ch->per_thread_cache_count > 0) { 2960 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2961 return -EINVAL; 2962 } 2963 2964 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2965 return 0; 2966 } 2967 2968 static void 2969 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2970 { 2971 struct spdk_bdev *bdev = bdev_ch->bdev; 2972 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2973 struct spdk_bdev_io *bdev_io; 2974 2975 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2976 /* 2977 * Allow some more I/O to complete before retrying the nomem_io queue. 2978 * Some drivers (such as nvme) cannot immediately take a new I/O in 2979 * the context of a completion, because the resources for the I/O are 2980 * not released until control returns to the bdev poller. Also, we 2981 * may require several small I/O to complete before a larger I/O 2982 * (that requires splitting) can be submitted. 
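		 * The threshold used here is recomputed in spdk_bdev_io_complete() each time an
		 * I/O is queued to nomem_io: max(io_outstanding / 2,
		 * io_outstanding - NOMEM_THRESHOLD_COUNT).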
2983 */ 2984 return; 2985 } 2986 2987 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2988 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2989 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2990 bdev_io->internal.ch->io_outstanding++; 2991 shared_resource->io_outstanding++; 2992 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2993 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2994 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2995 break; 2996 } 2997 } 2998 } 2999 3000 static inline void 3001 _spdk_bdev_io_complete(void *ctx) 3002 { 3003 struct spdk_bdev_io *bdev_io = ctx; 3004 uint64_t tsc; 3005 3006 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3007 /* 3008 * Send the completion to the thread that originally submitted the I/O, 3009 * which may not be the current thread in the case of QoS. 3010 */ 3011 if (bdev_io->internal.io_submit_ch) { 3012 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3013 bdev_io->internal.io_submit_ch = NULL; 3014 } 3015 3016 /* 3017 * Defer completion to avoid potential infinite recursion if the 3018 * user's completion callback issues a new I/O. 3019 */ 3020 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3021 _spdk_bdev_io_complete, bdev_io); 3022 return; 3023 } 3024 3025 tsc = spdk_get_ticks(); 3026 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 3027 3028 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3029 switch (bdev_io->type) { 3030 case SPDK_BDEV_IO_TYPE_READ: 3031 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3032 bdev_io->internal.ch->stat.num_read_ops++; 3033 bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 3034 break; 3035 case SPDK_BDEV_IO_TYPE_WRITE: 3036 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3037 bdev_io->internal.ch->stat.num_write_ops++; 3038 bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 3039 break; 3040 default: 3041 break; 3042 } 3043 } 3044 3045 #ifdef SPDK_CONFIG_VTUNE 3046 uint64_t now_tsc = spdk_get_ticks(); 3047 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 3048 uint64_t data[5]; 3049 3050 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 3051 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 3052 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 3053 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 3054 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
3055 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 3056 3057 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3058 __itt_metadata_u64, 5, data); 3059 3060 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3061 bdev_io->internal.ch->start_tsc = now_tsc; 3062 } 3063 #endif 3064 3065 assert(bdev_io->internal.cb != NULL); 3066 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 3067 3068 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3069 bdev_io->internal.caller_ctx); 3070 } 3071 3072 static void 3073 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3074 { 3075 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3076 3077 if (bdev_io->u.reset.ch_ref != NULL) { 3078 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3079 bdev_io->u.reset.ch_ref = NULL; 3080 } 3081 3082 _spdk_bdev_io_complete(bdev_io); 3083 } 3084 3085 static void 3086 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3087 { 3088 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3089 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3090 3091 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3092 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3093 _spdk_bdev_channel_start_reset(ch); 3094 } 3095 3096 spdk_for_each_channel_continue(i, 0); 3097 } 3098 3099 void 3100 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3101 { 3102 struct spdk_bdev *bdev = bdev_io->bdev; 3103 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3104 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3105 3106 bdev_io->internal.status = status; 3107 3108 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3109 bool unlock_channels = false; 3110 3111 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3112 SPDK_ERRLOG("NOMEM returned for reset\n"); 3113 } 3114 pthread_mutex_lock(&bdev->internal.mutex); 3115 if (bdev_io == bdev->internal.reset_in_progress) { 3116 bdev->internal.reset_in_progress = NULL; 3117 unlock_channels = true; 3118 } 3119 pthread_mutex_unlock(&bdev->internal.mutex); 3120 3121 if (unlock_channels) { 3122 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3123 bdev_io, _spdk_bdev_reset_complete); 3124 return; 3125 } 3126 } else { 3127 if (spdk_unlikely(bdev_io->internal.orig_iovcnt > 0)) { 3128 _bdev_io_unset_bounce_buf(bdev_io); 3129 } 3130 3131 assert(bdev_ch->io_outstanding > 0); 3132 assert(shared_resource->io_outstanding > 0); 3133 bdev_ch->io_outstanding--; 3134 shared_resource->io_outstanding--; 3135 3136 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3137 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3138 /* 3139 * Wait for some of the outstanding I/O to complete before we 3140 * retry any of the nomem_io. Normally we will wait for 3141 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3142 * depth channels we will instead wait for half to complete. 
3143 */ 3144 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3145 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3146 return; 3147 } 3148 3149 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3150 _spdk_bdev_ch_retry_io(bdev_ch); 3151 } 3152 } 3153 3154 _spdk_bdev_io_complete(bdev_io); 3155 } 3156 3157 void 3158 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3159 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3160 { 3161 if (sc == SPDK_SCSI_STATUS_GOOD) { 3162 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3163 } else { 3164 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3165 bdev_io->internal.error.scsi.sc = sc; 3166 bdev_io->internal.error.scsi.sk = sk; 3167 bdev_io->internal.error.scsi.asc = asc; 3168 bdev_io->internal.error.scsi.ascq = ascq; 3169 } 3170 3171 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3172 } 3173 3174 void 3175 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3176 int *sc, int *sk, int *asc, int *ascq) 3177 { 3178 assert(sc != NULL); 3179 assert(sk != NULL); 3180 assert(asc != NULL); 3181 assert(ascq != NULL); 3182 3183 switch (bdev_io->internal.status) { 3184 case SPDK_BDEV_IO_STATUS_SUCCESS: 3185 *sc = SPDK_SCSI_STATUS_GOOD; 3186 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3187 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3188 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3189 break; 3190 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3191 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3192 break; 3193 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3194 *sc = bdev_io->internal.error.scsi.sc; 3195 *sk = bdev_io->internal.error.scsi.sk; 3196 *asc = bdev_io->internal.error.scsi.asc; 3197 *ascq = bdev_io->internal.error.scsi.ascq; 3198 break; 3199 default: 3200 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3201 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3202 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3203 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3204 break; 3205 } 3206 } 3207 3208 void 3209 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3210 { 3211 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3212 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3213 } else { 3214 bdev_io->internal.error.nvme.sct = sct; 3215 bdev_io->internal.error.nvme.sc = sc; 3216 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3217 } 3218 3219 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3220 } 3221 3222 void 3223 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3224 { 3225 assert(sct != NULL); 3226 assert(sc != NULL); 3227 3228 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3229 *sct = bdev_io->internal.error.nvme.sct; 3230 *sc = bdev_io->internal.error.nvme.sc; 3231 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3232 *sct = SPDK_NVME_SCT_GENERIC; 3233 *sc = SPDK_NVME_SC_SUCCESS; 3234 } else { 3235 *sct = SPDK_NVME_SCT_GENERIC; 3236 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3237 } 3238 } 3239 3240 struct spdk_thread * 3241 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3242 { 3243 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3244 } 3245 3246 static void 3247 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3248 { 3249 uint64_t min_qos_set; 3250 int i; 3251 3252 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3253 if (limits[i] != 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3254 break; 3255 } 3256 } 3257 3258 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3259 SPDK_ERRLOG("Invalid rate limits set.\n"); 3260 return; 3261 } 3262 3263 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3264 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3265 continue; 3266 } 3267 3268 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3269 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3270 } else { 3271 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3272 } 3273 3274 if (limits[i] == 0 || limits[i] % min_qos_set) { 3275 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3276 limits[i], bdev->name, min_qos_set); 3277 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3278 return; 3279 } 3280 } 3281 3282 if (!bdev->internal.qos) { 3283 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3284 if (!bdev->internal.qos) { 3285 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3286 return; 3287 } 3288 } 3289 3290 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3291 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3292 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3293 bdev->name, i, limits[i]); 3294 } 3295 3296 return; 3297 } 3298 3299 static void 3300 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3301 { 3302 struct spdk_conf_section *sp = NULL; 3303 const char *val = NULL; 3304 int i = 0, j = 0; 3305 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3306 bool config_qos = false; 3307 3308 sp = spdk_conf_find_section(NULL, "QoS"); 3309 if (!sp) { 3310 return; 3311 } 3312 3313 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3314 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3315 3316 i = 0; 3317 while (true) { 3318 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3319 if (!val) { 3320 break; 3321 } 3322 3323 if (strcmp(bdev->name, val) != 0) { 3324 i++; 3325 continue; 3326 } 3327 3328 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3329 if (val) { 3330 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3331 limits[j] = strtoull(val, NULL, 10); 3332 } else { 3333 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3334 } 3335 config_qos = true; 3336 } 3337 3338 break; 3339 } 3340 3341 j++; 3342 } 3343 3344 if (config_qos == true) { 3345 _spdk_bdev_qos_config_limit(bdev, limits); 3346 } 3347 3348 return; 3349 } 3350 3351 static int 3352 spdk_bdev_init(struct spdk_bdev *bdev) 3353 { 3354 char *bdev_name; 3355 3356 assert(bdev->module != NULL); 3357 3358 if (!bdev->name) { 3359 SPDK_ERRLOG("Bdev name is NULL\n"); 3360 return -EINVAL; 3361 } 3362 3363 if (spdk_bdev_get_by_name(bdev->name)) { 3364 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3365 return -EEXIST; 3366 } 3367 3368 /* Users often register their own I/O devices using the bdev name. In 3369 * order to avoid conflicts, prepend bdev_. 
*/ 3370 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3371 if (!bdev_name) { 3372 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3373 return -ENOMEM; 3374 } 3375 3376 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3377 bdev->internal.measured_queue_depth = UINT64_MAX; 3378 bdev->internal.claim_module = NULL; 3379 bdev->internal.qd_poller = NULL; 3380 bdev->internal.qos = NULL; 3381 3382 if (spdk_bdev_get_buf_align(bdev) > 1) { 3383 if (bdev->split_on_optimal_io_boundary) { 3384 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 3385 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 3386 } else { 3387 bdev->split_on_optimal_io_boundary = true; 3388 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 3389 } 3390 } 3391 3392 TAILQ_INIT(&bdev->internal.open_descs); 3393 3394 TAILQ_INIT(&bdev->aliases); 3395 3396 bdev->internal.reset_in_progress = NULL; 3397 3398 _spdk_bdev_qos_config(bdev); 3399 3400 spdk_io_device_register(__bdev_to_io_dev(bdev), 3401 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3402 sizeof(struct spdk_bdev_channel), 3403 bdev_name); 3404 3405 free(bdev_name); 3406 3407 pthread_mutex_init(&bdev->internal.mutex, NULL); 3408 return 0; 3409 } 3410 3411 static void 3412 spdk_bdev_destroy_cb(void *io_device) 3413 { 3414 int rc; 3415 struct spdk_bdev *bdev; 3416 spdk_bdev_unregister_cb cb_fn; 3417 void *cb_arg; 3418 3419 bdev = __bdev_from_io_dev(io_device); 3420 cb_fn = bdev->internal.unregister_cb; 3421 cb_arg = bdev->internal.unregister_ctx; 3422 3423 rc = bdev->fn_table->destruct(bdev->ctxt); 3424 if (rc < 0) { 3425 SPDK_ERRLOG("destruct failed\n"); 3426 } 3427 if (rc <= 0 && cb_fn != NULL) { 3428 cb_fn(cb_arg, rc); 3429 } 3430 } 3431 3432 3433 static void 3434 spdk_bdev_fini(struct spdk_bdev *bdev) 3435 { 3436 pthread_mutex_destroy(&bdev->internal.mutex); 3437 3438 free(bdev->internal.qos); 3439 3440 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3441 } 3442 3443 static void 3444 spdk_bdev_start(struct spdk_bdev *bdev) 3445 { 3446 struct spdk_bdev_module *module; 3447 uint32_t action; 3448 3449 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3450 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3451 3452 /* Examine configuration before initializing I/O */ 3453 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3454 if (module->examine_config) { 3455 action = module->internal.action_in_progress; 3456 module->internal.action_in_progress++; 3457 module->examine_config(bdev); 3458 if (action != module->internal.action_in_progress) { 3459 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3460 module->name); 3461 } 3462 } 3463 } 3464 3465 if (bdev->internal.claim_module) { 3466 return; 3467 } 3468 3469 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3470 if (module->examine_disk) { 3471 module->internal.action_in_progress++; 3472 module->examine_disk(bdev); 3473 } 3474 } 3475 } 3476 3477 int 3478 spdk_bdev_register(struct spdk_bdev *bdev) 3479 { 3480 int rc = spdk_bdev_init(bdev); 3481 3482 if (rc == 0) { 3483 spdk_bdev_start(bdev); 3484 } 3485 3486 return rc; 3487 } 3488 3489 int 3490 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3491 { 3492 int rc; 3493 3494 rc = spdk_bdev_init(vbdev); 3495 if (rc) { 3496 return rc; 3497 } 3498 3499 spdk_bdev_start(vbdev); 3500 return 0; 3501 } 3502 3503 void 3504 
spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3505 { 3506 if (bdev->internal.unregister_cb != NULL) { 3507 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3508 } 3509 } 3510 3511 static void 3512 _remove_notify(void *arg) 3513 { 3514 struct spdk_bdev_desc *desc = arg; 3515 3516 desc->remove_scheduled = false; 3517 3518 if (desc->closed) { 3519 free(desc); 3520 } else { 3521 desc->remove_cb(desc->remove_ctx); 3522 } 3523 } 3524 3525 void 3526 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3527 { 3528 struct spdk_bdev_desc *desc, *tmp; 3529 bool do_destruct = true; 3530 struct spdk_thread *thread; 3531 3532 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3533 3534 thread = spdk_get_thread(); 3535 if (!thread) { 3536 /* The user called this from a non-SPDK thread. */ 3537 if (cb_fn != NULL) { 3538 cb_fn(cb_arg, -ENOTSUP); 3539 } 3540 return; 3541 } 3542 3543 pthread_mutex_lock(&bdev->internal.mutex); 3544 3545 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3546 bdev->internal.unregister_cb = cb_fn; 3547 bdev->internal.unregister_ctx = cb_arg; 3548 3549 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3550 if (desc->remove_cb) { 3551 do_destruct = false; 3552 /* 3553 * Defer invocation of the remove_cb to a separate message that will 3554 * run later on its thread. This ensures this context unwinds and 3555 * we don't recursively unregister this bdev again if the remove_cb 3556 * immediately closes its descriptor. 3557 */ 3558 if (!desc->remove_scheduled) { 3559 /* Avoid scheduling removal of the same descriptor multiple times. */ 3560 desc->remove_scheduled = true; 3561 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3562 } 3563 } 3564 } 3565 3566 if (!do_destruct) { 3567 pthread_mutex_unlock(&bdev->internal.mutex); 3568 return; 3569 } 3570 3571 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3572 pthread_mutex_unlock(&bdev->internal.mutex); 3573 3574 spdk_bdev_fini(bdev); 3575 } 3576 3577 int 3578 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3579 void *remove_ctx, struct spdk_bdev_desc **_desc) 3580 { 3581 struct spdk_bdev_desc *desc; 3582 struct spdk_thread *thread; 3583 3584 thread = spdk_get_thread(); 3585 if (!thread) { 3586 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3587 return -ENOTSUP; 3588 } 3589 3590 desc = calloc(1, sizeof(*desc)); 3591 if (desc == NULL) { 3592 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3593 return -ENOMEM; 3594 } 3595 3596 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3597 spdk_get_thread()); 3598 3599 pthread_mutex_lock(&bdev->internal.mutex); 3600 3601 if (write && bdev->internal.claim_module) { 3602 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3603 bdev->name, bdev->internal.claim_module->name); 3604 free(desc); 3605 pthread_mutex_unlock(&bdev->internal.mutex); 3606 return -EPERM; 3607 } 3608 3609 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3610 3611 desc->bdev = bdev; 3612 desc->thread = thread; 3613 desc->remove_cb = remove_cb; 3614 desc->remove_ctx = remove_ctx; 3615 desc->write = write; 3616 *_desc = desc; 3617 3618 pthread_mutex_unlock(&bdev->internal.mutex); 3619 3620 return 0; 3621 } 3622 3623 void 3624 spdk_bdev_close(struct spdk_bdev_desc *desc) 3625 { 3626 struct spdk_bdev *bdev = desc->bdev; 3627 bool do_unregister = false; 3628 3629 
SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3630 spdk_get_thread()); 3631 3632 assert(desc->thread == spdk_get_thread()); 3633 3634 pthread_mutex_lock(&bdev->internal.mutex); 3635 3636 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3637 3638 desc->closed = true; 3639 3640 if (!desc->remove_scheduled) { 3641 free(desc); 3642 } 3643 3644 /* If no more descriptors, kill QoS channel */ 3645 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3646 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3647 bdev->name, spdk_get_thread()); 3648 3649 if (spdk_bdev_qos_destroy(bdev)) { 3650 /* There isn't anything we can do to recover here. Just let the 3651 * old QoS poller keep running. The QoS handling won't change 3652 * cores when the user allocates a new channel, but it won't break. */ 3653 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3654 } 3655 } 3656 3657 spdk_bdev_set_qd_sampling_period(bdev, 0); 3658 3659 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3660 do_unregister = true; 3661 } 3662 pthread_mutex_unlock(&bdev->internal.mutex); 3663 3664 if (do_unregister == true) { 3665 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3666 } 3667 } 3668 3669 int 3670 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3671 struct spdk_bdev_module *module) 3672 { 3673 if (bdev->internal.claim_module != NULL) { 3674 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3675 bdev->internal.claim_module->name); 3676 return -EPERM; 3677 } 3678 3679 if (desc && !desc->write) { 3680 desc->write = true; 3681 } 3682 3683 bdev->internal.claim_module = module; 3684 return 0; 3685 } 3686 3687 void 3688 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3689 { 3690 assert(bdev->internal.claim_module != NULL); 3691 bdev->internal.claim_module = NULL; 3692 } 3693 3694 struct spdk_bdev * 3695 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3696 { 3697 return desc->bdev; 3698 } 3699 3700 void 3701 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3702 { 3703 struct iovec *iovs; 3704 int iovcnt; 3705 3706 if (bdev_io == NULL) { 3707 return; 3708 } 3709 3710 switch (bdev_io->type) { 3711 case SPDK_BDEV_IO_TYPE_READ: 3712 iovs = bdev_io->u.bdev.iovs; 3713 iovcnt = bdev_io->u.bdev.iovcnt; 3714 break; 3715 case SPDK_BDEV_IO_TYPE_WRITE: 3716 iovs = bdev_io->u.bdev.iovs; 3717 iovcnt = bdev_io->u.bdev.iovcnt; 3718 break; 3719 default: 3720 iovs = NULL; 3721 iovcnt = 0; 3722 break; 3723 } 3724 3725 if (iovp) { 3726 *iovp = iovs; 3727 } 3728 if (iovcntp) { 3729 *iovcntp = iovcnt; 3730 } 3731 } 3732 3733 void 3734 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3735 { 3736 3737 if (spdk_bdev_module_list_find(bdev_module->name)) { 3738 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3739 assert(false); 3740 } 3741 3742 if (bdev_module->async_init) { 3743 bdev_module->internal.action_in_progress = 1; 3744 } 3745 3746 /* 3747 * Modules with examine callbacks must be initialized first, so they are 3748 * ready to handle examine callbacks from later modules that will 3749 * register physical bdevs. 
3750 */ 3751 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3752 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3753 } else { 3754 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3755 } 3756 } 3757 3758 struct spdk_bdev_module * 3759 spdk_bdev_module_list_find(const char *name) 3760 { 3761 struct spdk_bdev_module *bdev_module; 3762 3763 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3764 if (strcmp(name, bdev_module->name) == 0) { 3765 break; 3766 } 3767 } 3768 3769 return bdev_module; 3770 } 3771 3772 static void 3773 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3774 { 3775 struct spdk_bdev_io *bdev_io = _bdev_io; 3776 uint64_t num_bytes, num_blocks; 3777 int rc; 3778 3779 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3780 bdev_io->u.bdev.split_remaining_num_blocks, 3781 ZERO_BUFFER_SIZE); 3782 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3783 3784 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3785 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3786 g_bdev_mgr.zero_buffer, 3787 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3788 _spdk_bdev_write_zero_buffer_done, bdev_io); 3789 if (rc == 0) { 3790 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3791 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3792 } else if (rc == -ENOMEM) { 3793 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 3794 } else { 3795 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3796 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3797 } 3798 } 3799 3800 static void 3801 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3802 { 3803 struct spdk_bdev_io *parent_io = cb_arg; 3804 3805 spdk_bdev_free_io(bdev_io); 3806 3807 if (!success) { 3808 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3809 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3810 return; 3811 } 3812 3813 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3814 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3815 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3816 return; 3817 } 3818 3819 _spdk_bdev_write_zero_buffer_next(parent_io); 3820 } 3821 3822 struct set_qos_limit_ctx { 3823 void (*cb_fn)(void *cb_arg, int status); 3824 void *cb_arg; 3825 struct spdk_bdev *bdev; 3826 }; 3827 3828 static void 3829 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3830 { 3831 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3832 ctx->bdev->internal.qos_mod_in_progress = false; 3833 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3834 3835 ctx->cb_fn(ctx->cb_arg, status); 3836 free(ctx); 3837 } 3838 3839 static void 3840 _spdk_bdev_disable_qos_done(void *cb_arg) 3841 { 3842 struct set_qos_limit_ctx *ctx = cb_arg; 3843 struct spdk_bdev *bdev = ctx->bdev; 3844 struct spdk_bdev_io *bdev_io; 3845 struct spdk_bdev_qos *qos; 3846 3847 pthread_mutex_lock(&bdev->internal.mutex); 3848 qos = bdev->internal.qos; 3849 bdev->internal.qos = NULL; 3850 pthread_mutex_unlock(&bdev->internal.mutex); 3851 3852 while (!TAILQ_EMPTY(&qos->queued)) { 3853 /* Send queued I/O back to their original thread for resubmission. 
static void
_spdk_bdev_disable_qos_done(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_qos *qos;

	pthread_mutex_lock(&bdev->internal.mutex);
	qos = bdev->internal.qos;
	bdev->internal.qos = NULL;
	pthread_mutex_unlock(&bdev->internal.mutex);

	while (!TAILQ_EMPTY(&qos->queued)) {
		/* Send queued I/O back to their original thread for resubmission. */
		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);

		if (bdev_io->internal.io_submit_ch) {
			/*
			 * Channel was changed when sending it to the QoS thread - change it back
			 * before sending it back to the original thread.
			 */
			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
			bdev_io->internal.io_submit_ch = NULL;
		}

		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
				     _spdk_bdev_io_submit, bdev_io);
	}

	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
	spdk_poller_unregister(&qos->poller);

	free(qos);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_thread *thread;

	pthread_mutex_lock(&bdev->internal.mutex);
	thread = bdev->internal.qos->thread;
	pthread_mutex_unlock(&bdev->internal.mutex);

	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
}

static void
_spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;

	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
{
	struct set_qos_limit_ctx *ctx = cb_arg;
	struct spdk_bdev *bdev = ctx->bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
	pthread_mutex_unlock(&bdev->internal.mutex);

	_spdk_bdev_set_qos_limit_done(ctx, 0);
}

static void
_spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
{
	void *io_device = spdk_io_channel_iter_get_io_device(i);
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);

	pthread_mutex_lock(&bdev->internal.mutex);
	_spdk_bdev_enable_qos(bdev, bdev_ch);
	pthread_mutex_unlock(&bdev->internal.mutex);
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
{
	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	_spdk_bdev_set_qos_limit_done(ctx, status);
}

static void
_spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
{
	int i;

	assert(bdev->internal.qos != NULL);

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			bdev->internal.qos->rate_limits[i].limit = limits[i];

			if (limits[i] == 0) {
				bdev->internal.qos->rate_limits[i].limit =
					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
			}
		}
	}
}
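/*
 * Apply a new set of rate limits to a bdev. `limits` is indexed by rate limit
 * type and must contain SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES entries: entries set
 * to SPDK_BDEV_QOS_LIMIT_NOT_DEFINED are left unchanged, a value of 0 disables
 * that limit, and byte limits are supplied in MiB/s and converted to bytes/s
 * below. Values that are not a multiple of the minimum granularity
 * (SPDK_BDEV_QOS_MIN_IOS_PER_SEC or SPDK_BDEV_QOS_MIN_BYTES_PER_SEC) are
 * rounded up. cb_fn is invoked with -EAGAIN if another QoS modification is
 * already in progress and with -ENOMEM on allocation failure.
 */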
void
spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
{
	struct set_qos_limit_ctx *ctx;
	uint32_t limit_set_complement;
	uint64_t min_limit_per_sec;
	int i;
	bool disable_rate_limit = true;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			continue;
		}

		if (limits[i] > 0) {
			disable_rate_limit = false;
		}

		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
		} else {
			/* Change from megabyte to byte rate limit */
			limits[i] = limits[i] * 1024 * 1024;
			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
		}

		limit_set_complement = limits[i] % min_limit_per_sec;
		if (limit_set_complement) {
			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
				    limits[i], min_limit_per_sec);
			limits[i] += min_limit_per_sec - limit_set_complement;
			SPDK_ERRLOG("Rounding up the rate limit to %" PRIu64 "\n", limits[i]);
		}
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->bdev = bdev;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos_mod_in_progress) {
		pthread_mutex_unlock(&bdev->internal.mutex);
		free(ctx);
		cb_fn(cb_arg, -EAGAIN);
		return;
	}
	bdev->internal.qos_mod_in_progress = true;

	if (disable_rate_limit == true && bdev->internal.qos) {
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
			     bdev->internal.qos->rate_limits[i].limit !=
			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
				disable_rate_limit = false;
				break;
			}
		}
	}

	if (disable_rate_limit == false) {
		if (bdev->internal.qos == NULL) {
			/* Enabling */
			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
			if (!bdev->internal.qos) {
				pthread_mutex_unlock(&bdev->internal.mutex);
				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
				free(ctx);
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_enable_qos_msg, ctx,
					      _spdk_bdev_enable_qos_done);
		} else {
			/* Updating */
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			spdk_thread_send_msg(bdev->internal.qos->thread,
					     _spdk_bdev_update_qos_rate_limit_msg, ctx);
		}
	} else {
		if (bdev->internal.qos != NULL) {
			_spdk_bdev_set_qos_rate_limits(bdev, limits);

			/* Disabling */
			spdk_for_each_channel(__bdev_to_io_dev(bdev),
					      _spdk_bdev_disable_qos_msg, ctx,
					      _spdk_bdev_disable_qos_msg_done);
		} else {
			pthread_mutex_unlock(&bdev->internal.mutex);
			_spdk_bdev_set_qos_limit_done(ctx, 0);
			return;
		}
	}

	pthread_mutex_unlock(&bdev->internal.mutex);
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)

SPDK_TRACE_REGISTER_FN(bdev_trace)
{
	spdk_trace_register_owner(OWNER_BDEV, 'b');
	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
					OBJECT_BDEV_IO, 1, 0, "type: ");
	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
					OBJECT_BDEV_IO, 0, 0, "");
}
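/*
 * Illustrative usage sketch for spdk_bdev_set_qos_rate_limits(), assuming the
 * SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT enumerator from spdk/bdev.h; the function
 * and callback names below are hypothetical, not SPDK symbols. Entries that a
 * caller does not want to change are passed as SPDK_BDEV_QOS_LIMIT_NOT_DEFINED,
 * matching the convention used in the function above.
 *
 *	static void
 *	example_qos_done(void *cb_arg, int status)
 *	{
 *		if (status != 0) {
 *			SPDK_ERRLOG("Setting QoS limits failed: %d\n", status);
 *		}
 *	}
 *
 *	static void
 *	example_set_iops_limit(struct spdk_bdev *bdev)
 *	{
 *		uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
 *		int i;
 *
 *		// Leave every limit untouched except the read/write IOPS limit.
 *		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
 *			limits[i] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
 *		}
 *		limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 20000;
 *
 *		spdk_bdev_set_qos_rate_limits(bdev, limits, example_qos_done, NULL);
 *	}
 */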