/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/notify.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024 - 1)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define BUF_SMALL_POOL_SIZE			8191
#define BUF_LARGE_POOL_SIZE			1023
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		1000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_conf_type[] = {"Limit_IOPS",
				      "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS"
				     };
static const char *qos_rpc_type[] = {"rw_ios_per_sec",
				     "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
				    };

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

	pthread_mutex_t mutex;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain	*domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
	.mutex = PTHREAD_MUTEX_INITIALIZER,
};


static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;

	/** Function to check whether to queue the IO. */
	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);

	/** Function to update for the submitted IO. */
	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
};

struct spdk_bdev_qos {
	/** Rate limits, one entry per rate limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * will queue here their IO that awaits retry. It makes it possible to retry sending
 * IO to one bdev after IO from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t		nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel	*shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t		ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted through this channel and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	bdev_io_tailq_t		queued_resets;

	uint32_t		flags;

	struct spdk_histogram_data *histogram;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

};

struct spdk_bdev_desc {
	struct spdk_bdev	*bdev;
	struct spdk_thread	*thread;
	spdk_bdev_remove_cb_t	remove_cb;
	void			*remove_ctx;
	bool			remove_scheduled;
	bool			closed;
	bool			write;
	TAILQ_ENTRY(spdk_bdev_desc) link;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
		void *cb_arg);
static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);

static void _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
static void _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);

static int
_spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
				struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
				uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg);
static int
_spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
				 struct iovec *iov, int iovcnt, void *md_buf,
				 uint64_t offset_blocks, uint64_t num_blocks,
				 spdk_bdev_io_completion_cb cb, void *cb_arg);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization. A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}
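
/*
 * Illustrative usage sketch (not part of the original file): an application that
 * wants a larger bdev_io pool would typically adjust the options before the bdev
 * subsystem is initialized. The pool and cache sizes shown here are hypothetical
 * values, not defaults taken from this file.
 *
 *	struct spdk_bdev_opts opts;
 *
 *	spdk_bdev_get_opts(&opts);
 *	opts.bdev_io_pool_size = 128 * 1024;
 *	opts.bdev_io_cache_size = 512;
 *	if (spdk_bdev_set_opts(&opts) != 0) {
 *		// Rejected: pool size must be >= cache size * (thread count + 1).
 *	}
 */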

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return iovs[0].iov_base != NULL;
}

static bool
_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
{
	int i;
	uintptr_t iov_base;

	if (spdk_likely(alignment == 1)) {
		return true;
	}

	for (i = 0; i < iovcnt; i++) {
		iov_base = (uintptr_t)iovs[i].iov_base;
		if ((iov_base & (alignment - 1)) != 0) {
			return false;
		}
	}

	return true;
}

static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is write path, copy data from original buffer to bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
_bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	/* save original md_buf */
	bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf;
	/* set bounce md_buf */
	bdev_io->u.bdev.md_buf = md_buf;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		memcpy(md_buf, bdev_io->internal.orig_md_buf, len);
	}
}

static void
_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	bool buf_allocated;
	uint64_t md_len, alignment;
	void *aligned_buf;

	alignment = spdk_bdev_get_buf_align(bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
	aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

	if (buf_allocated) {
		_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
	} else {
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
	}

	if (spdk_bdev_is_md_separate(bdev)) {
		aligned_buf = (char *)aligned_buf + len;
		md_len = bdev_io->u.bdev.num_blocks * bdev->md_len;

		assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);

		if (bdev_io->u.bdev.md_buf != NULL) {
			_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
		} else {
			spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
		}
	}

	bdev_io->internal.buf = buf;
	bdev_io->internal.get_buf_cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len, md_len, alignment;
	void *buf;

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	md_len = spdk_bdev_is_md_separate(bdev) ?
		 bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
	alignment = spdk_bdev_get_buf_align(bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		_bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
	}
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
		assert(bdev_io->internal.orig_md_buf == NULL);
		return;
	}

	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs,
				  bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base,
				  bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bounce buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;

	/* do the same for metadata buffer */
	if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
		assert(spdk_bdev_is_md_separate(bdev_io->bdev));

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
		    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
			       bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
		}

		bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
		bdev_io->internal.orig_md_buf = NULL;
	}

	spdk_bdev_io_put_buf(bdev_io);
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment, md_len;
	void *buf;

	assert(cb != NULL);

	alignment = spdk_bdev_get_buf_align(bdev);
	md_len = spdk_bdev_is_md_separate(bdev) ?
		 bdev_io->u.bdev.num_blocks * bdev->md_len : 0;

	if (_is_buf_allocated(bdev_io->u.bdev.iovs) &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
		return;
	}

	if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
			    len + alignment);
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, false);
		return;
	}

	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;
	bdev_io->internal.get_buf_cb = cb;

	if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);
	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		_bdev_io_set_buf(bdev_io, buf, len);
	}
}
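
/*
 * Illustrative sketch of the usual caller of spdk_bdev_io_get_buf(): a bdev
 * module that needs a data buffer before it can service a read. The bdev_xxx_*
 * names are hypothetical, not part of this file.
 *
 *	static void
 *	bdev_xxx_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
 *			    bool success)
 *	{
 *		if (!success) {
 *			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
 *			return;
 *		}
 *		// bdev_io->u.bdev.iovs now describe an aligned buffer of at least
 *		// num_blocks * blocklen bytes; submit the read to the backing device.
 *	}
 *
 *	case SPDK_BDEV_IO_TYPE_READ:
 *		spdk_bdev_io_get_buf(bdev_io, bdev_xxx_get_buf_cb,
 *				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *		break;
 */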

static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

static void
spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_qos_limit");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
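
/*
 * For reference, the object written above for a bdev named "Malloc0" (a
 * hypothetical name) that only has a read/write IOPS limit of 10000 assigned
 * would look roughly like:
 *
 *	{
 *		"method": "set_bdev_qos_limit",
 *		"params": {
 *			"name": "Malloc0",
 *			"rw_ios_per_sec": 10000
 *		}
 *	}
 */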

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_options");
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	pthread_mutex_lock(&g_bdev_mgr.mutex);

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		spdk_bdev_qos_config_json(bdev, w);
	}

	pthread_mutex_unlock(&g_bdev_mgr.mutex);

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
spdk_bdev_init_failed(void *cb_arg)
{
	struct spdk_bdev_module *module = cb_arg;

	module->internal.action_in_progress--;
	spdk_bdev_init_complete(-1);
}

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			/* Bump action_in_progress to prevent other modules from completing modules_init.
			 * Send message to defer application shutdown until resources are cleaned up */
			module->internal.action_in_progress = 1;
			spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, module);
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = spdk_bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		return;
	}

	spdk_bdev_module_action_complete();
}

static void
spdk_bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    g_bdev_opts.bdev_io_pool_size);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	spdk_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
	pthread_mutex_destroy(&g_bdev_mgr.mutex);
}

static void
spdk_bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* FIXME: Handling initialization failures is broken now,
	 * so we won't even try cleaning up after successfully
	 * initialized modules. If module_init_complete is false,
	 * just call spdk_bdev_mgr_unregister_cb directly.
	 */
	if (!g_bdev_mgr.module_init_complete) {
		spdk_bdev_mgr_unregister_cb(NULL);
		return;
	}

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim underlying bdev properly, we may face the
	 * case of bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister last bdev on the list then.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		SPDK_WARNLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}
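
/*
 * Freeing a bdev_io back to the per-thread cache is what eventually unblocks
 * callers waiting on the io_wait_queue. Illustrative caller-side sketch of that
 * retry pattern (the bdev_xxx_* names and the context struct are hypothetical):
 *
 *	static void
 *	bdev_xxx_retry_read(void *arg)
 *	{
 *		struct xxx_ctx *ctx = arg;
 *
 *		// resubmit the read that previously failed with -ENOMEM
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, io_ch, buf, offset, num_blocks, cb, ctx);
 *	if (rc == -ENOMEM) {
 *		ctx->wait_entry.bdev = bdev;
 *		ctx->wait_entry.cb_fn = bdev_xxx_retry_read;
 *		ctx->wait_entry.cb_arg = ctx;
 *		spdk_bdev_queue_io_wait(bdev, io_ch, &ctx->wait_entry);
 *	}
 */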

static bool
_spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
_spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return true;
	default:
		return false;
	}
}

static bool
_spdk_bdev_is_read_io(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* Bit 1 (0x2) set for read operation */
		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
			return true;
		} else {
			return false;
		}
	case SPDK_BDEV_IO_TYPE_READ:
		return true;
	default:
		return false;
	}
}

static uint64_t
_spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	default:
		return 0;
	}
}

static bool
_spdk_bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
		return true;
	} else {
		return false;
	}
}

static bool
_spdk_bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == false) {
		return false;
	}

	return _spdk_bdev_qos_rw_queue_io(limit, io);
}

static bool
_spdk_bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == true) {
		return false;
	}

	return _spdk_bdev_qos_rw_queue_io(limit, io);
}

static void
_spdk_bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice--;
}

static void
_spdk_bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(io);
}

static void
_spdk_bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == false) {
		return;
	}

	return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
}

static void
_spdk_bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == true) {
		return;
	}

	return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
}

static void
_spdk_bdev_qos_set_ops(struct spdk_bdev_qos *qos)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].queue_io = NULL;
			qos->rate_limits[i].update_quota = NULL;
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_iops_update_quota;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_r_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_r_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_w_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_w_bps_update_quota;
			break;
		default:
			break;
		}
	}
}

static inline void
_spdk_bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
		bdev_ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev_io->internal.in_submit_request = true;
		bdev->fn_table->submit_request(ch, bdev_io);
		bdev_io->internal.in_submit_request = false;
	} else {
		TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
	}
}

static int
_spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io		*bdev_io = NULL, *tmp = NULL;
	int				i, submitted_ios = 0;

	TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
		if (_spdk_bdev_qos_io_to_limit(bdev_io) == true) {
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].queue_io) {
					continue;
				}

				if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
								 bdev_io) == true) {
					return submitted_ios;
				}
			}
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].update_quota) {
					continue;
				}

				qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
			}
		}

		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		_spdk_bdev_io_do_submit(ch, bdev_io);
		submitted_ios++;
	}

	return submitted_ios;
}

static void
_spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}
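
/*
 * The helpers below decide whether a READ/WRITE must be split on the bdev's
 * optimal_io_boundary and carve the parent I/O into boundary-aligned children.
 * Worked example (the numbers are illustrative): with optimal_io_boundary = 128
 * blocks, an I/O at offset_blocks = 120 with num_blocks = 16 spans
 * start_stripe = 120 / 128 = 0 and end_stripe = 135 / 128 = 1, so it is split
 * into one 8-block child ending at the boundary and one 8-block child after it.
 */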

static bool
_spdk_bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O. Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O. Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}

static bool
_spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
	/* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}

static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_spdk_bdev_io_split(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes, to_last_block_bytes;
	struct iovec *parent_iov, *iov;
	uint64_t parent_iov_offset, iov_len;
	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
	void *md_buf = NULL;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
	parent_iovcnt = bdev_io->u.bdev.iovcnt;

	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
		if (parent_iov_offset < parent_iov->iov_len) {
			break;
		}
		parent_iov_offset -= parent_iov->iov_len;
	}

	child_iovcnt = 0;
	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
		to_next_boundary = spdk_min(remaining, to_next_boundary);
		to_next_boundary_bytes = to_next_boundary * blocklen;
		iov = &bdev_io->child_iov[child_iovcnt];
		iovcnt = 0;

		if (bdev_io->u.bdev.md_buf) {
			assert((parent_iov_offset % blocklen) > 0);
			md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) *
				 spdk_bdev_get_md_size(bdev_io->bdev);
		}

		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
			to_next_boundary_bytes -= iov_len;

			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;

			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
				parent_iov_offset += iov_len;
			} else {
				parent_iovpos++;
				parent_iov_offset = 0;
			}
			child_iovcnt++;
			iovcnt++;
		}

		if (to_next_boundary_bytes > 0) {
			/* We had to stop this child I/O early because we ran out of
			 * child_iov space. Ensure the iovs are aligned to the block
			 * size and then adjust to_next_boundary before starting the
			 * child I/O.
			 */
			assert(child_iovcnt == BDEV_IO_NUM_CHILD_IOV);
			to_last_block_bytes = to_next_boundary_bytes % blocklen;
			if (to_last_block_bytes != 0) {
				uint32_t child_iovpos = child_iovcnt - 1;
				/* don't decrease child_iovcnt so the loop will naturally end */

				to_next_boundary_bytes += _to_next_boundary(to_next_boundary_bytes, blocklen);
				while (to_last_block_bytes > 0 && iovcnt > 0) {
					iov_len = spdk_min(to_last_block_bytes,
							   bdev_io->child_iov[child_iovpos].iov_len);
					bdev_io->child_iov[child_iovpos].iov_len -= iov_len;
					if (bdev_io->child_iov[child_iovpos].iov_len == 0) {
						child_iovpos--;
						iovcnt--;
					}
					to_last_block_bytes -= iov_len;
				}

				assert(to_last_block_bytes == 0);
			}
			to_next_boundary -= to_next_boundary_bytes / blocklen;
		}

		bdev_io->u.bdev.split_outstanding++;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			rc = _spdk_bdev_readv_blocks_with_md(bdev_io->internal.desc,
							     spdk_io_channel_from_ctx(bdev_io->internal.ch),
							     iov, iovcnt, md_buf, current_offset,
							     to_next_boundary,
							     _spdk_bdev_io_split_done, bdev_io);
		} else {
			rc = _spdk_bdev_writev_blocks_with_md(bdev_io->internal.desc,
							      spdk_io_channel_from_ctx(bdev_io->internal.ch),
							      iov, iovcnt, md_buf, current_offset,
							      to_next_boundary,
							      _spdk_bdev_io_split_done, bdev_io);
		}

		if (rc == 0) {
			current_offset += to_next_boundary;
			remaining -= to_next_boundary;
			bdev_io->u.bdev.split_current_offset_blocks = current_offset;
			bdev_io->u.bdev.split_remaining_num_blocks = remaining;
		} else {
			bdev_io->u.bdev.split_outstanding--;
			if (rc == -ENOMEM) {
				if (bdev_io->u.bdev.split_outstanding == 0) {
					/* No I/O is outstanding. Hence we should wait here. */
					_spdk_bdev_queue_io_wait_with_cb(bdev_io,
									 _spdk_bdev_io_split);
				}
			} else {
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
			}

			return;
		}
	}
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
	}
	parent_io->u.bdev.split_outstanding--;
	if (parent_io->u.bdev.split_outstanding != 0) {
		return;
	}

	/*
	 * Parent I/O finishes when all blocks are consumed.
	 */
	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
				       parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process. This function will complete the parent I/O if the
	 * splitting is done.
	 */
	_spdk_bdev_io_split(parent_io);
}

static void
_spdk_bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
			       bool success);

static void
spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	assert(_spdk_bdev_io_type_can_split(bdev_io->type));

	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
	bdev_io->u.bdev.split_outstanding = 0;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (_is_buf_allocated(bdev_io->u.bdev.iovs)) {
		_spdk_bdev_io_split(bdev_io);
	} else {
		assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ);
		spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
	}
}

static void
_spdk_bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
			       bool success)
{
	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	spdk_bdev_io_split(ch, bdev_io);
}

/* Explicitly mark this inline, since it's used as a function pointer and otherwise won't
 *  be inlined, at least on some compilers.
 */
static inline void
_spdk_bdev_io_submit(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
	uint64_t tsc;

	tsc = spdk_get_ticks();
	bdev_io->internal.submit_tsc = tsc;
	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);

	if (spdk_likely(bdev_ch->flags == 0)) {
		_spdk_bdev_io_do_submit(bdev_ch, bdev_io);
		return;
	}

	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	bdev_io->internal.in_submit_request = true;
	if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
		bdev_ch->io_outstanding--;
		shared_resource->io_outstanding--;
		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
		_spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos);
	} else {
		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io);

	assert(thread != NULL);
	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
		spdk_bdev_io_split(NULL, bdev_io);
		return;
	}
(bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1867 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1868 _spdk_bdev_io_submit(bdev_io); 1869 } else { 1870 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1871 bdev_io->internal.ch = bdev->internal.qos->ch; 1872 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1873 } 1874 } else { 1875 _spdk_bdev_io_submit(bdev_io); 1876 } 1877 } 1878 1879 static void 1880 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1881 { 1882 struct spdk_bdev *bdev = bdev_io->bdev; 1883 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1884 struct spdk_io_channel *ch = bdev_ch->channel; 1885 1886 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1887 1888 bdev_io->internal.in_submit_request = true; 1889 bdev->fn_table->submit_request(ch, bdev_io); 1890 bdev_io->internal.in_submit_request = false; 1891 } 1892 1893 static void 1894 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1895 struct spdk_bdev *bdev, void *cb_arg, 1896 spdk_bdev_io_completion_cb cb) 1897 { 1898 bdev_io->bdev = bdev; 1899 bdev_io->internal.caller_ctx = cb_arg; 1900 bdev_io->internal.cb = cb; 1901 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1902 bdev_io->internal.in_submit_request = false; 1903 bdev_io->internal.buf = NULL; 1904 bdev_io->internal.io_submit_ch = NULL; 1905 bdev_io->internal.orig_iovs = NULL; 1906 bdev_io->internal.orig_iovcnt = 0; 1907 bdev_io->internal.orig_md_buf = NULL; 1908 } 1909 1910 static bool 1911 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1912 { 1913 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1914 } 1915 1916 bool 1917 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1918 { 1919 bool supported; 1920 1921 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1922 1923 if (!supported) { 1924 switch (io_type) { 1925 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1926 /* The bdev layer will emulate write zeroes as long as write is supported. 
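 * From a caller's perspective the reported capability can therefore be
 * checked once and relied upon; minimal sketch (variable names are the
 * caller's own, error handling omitted):
 *
 *   if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
 *           rc = spdk_bdev_write_zeroes_blocks(desc, io_ch, offset_blocks,
 *                                              num_blocks, zeroes_done_cb, ctx);
 *   }
 *
 * Whether the request is passed through or emulated with the internal zero
 * buffer is decided later in spdk_bdev_write_zeroes_blocks() and is
 * transparent to the caller.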
*/ 1927 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1928 break; 1929 case SPDK_BDEV_IO_TYPE_ZCOPY: 1930 /* Zero copy can be emulated with regular read and write */ 1931 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 1932 _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1933 break; 1934 default: 1935 break; 1936 } 1937 } 1938 1939 return supported; 1940 } 1941 1942 int 1943 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1944 { 1945 if (bdev->fn_table->dump_info_json) { 1946 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1947 } 1948 1949 return 0; 1950 } 1951 1952 static void 1953 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1954 { 1955 uint32_t max_per_timeslice = 0; 1956 int i; 1957 1958 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1959 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1960 qos->rate_limits[i].max_per_timeslice = 0; 1961 continue; 1962 } 1963 1964 max_per_timeslice = qos->rate_limits[i].limit * 1965 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1966 1967 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1968 qos->rate_limits[i].min_per_timeslice); 1969 1970 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1971 } 1972 1973 _spdk_bdev_qos_set_ops(qos); 1974 } 1975 1976 static int 1977 spdk_bdev_channel_poll_qos(void *arg) 1978 { 1979 struct spdk_bdev_qos *qos = arg; 1980 uint64_t now = spdk_get_ticks(); 1981 int i; 1982 1983 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1984 /* We received our callback earlier than expected - return 1985 * immediately and wait to do accounting until at least one 1986 * timeslice has actually expired. This should never happen 1987 * with a well-behaved timer implementation. 1988 */ 1989 return 0; 1990 } 1991 1992 /* Reset for next round of rate limiting */ 1993 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1994 /* We may have allowed the IOs or bytes to slightly overrun in the last 1995 * timeslice. remaining_this_timeslice is signed, so if it's negative 1996 * here, we'll account for the overrun so that the next timeslice will 1997 * be appropriately reduced. 1998 */ 1999 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 2000 qos->rate_limits[i].remaining_this_timeslice = 0; 2001 } 2002 } 2003 2004 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 2005 qos->last_timeslice += qos->timeslice_size; 2006 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2007 qos->rate_limits[i].remaining_this_timeslice += 2008 qos->rate_limits[i].max_per_timeslice; 2009 } 2010 } 2011 2012 return _spdk_bdev_qos_io_submit(qos->ch, qos); 2013 } 2014 2015 static void 2016 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 2017 { 2018 struct spdk_bdev_shared_resource *shared_resource; 2019 2020 spdk_put_io_channel(ch->channel); 2021 2022 shared_resource = ch->shared_resource; 2023 2024 assert(ch->io_outstanding == 0); 2025 assert(shared_resource->ref > 0); 2026 shared_resource->ref--; 2027 if (shared_resource->ref == 0) { 2028 assert(shared_resource->io_outstanding == 0); 2029 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 2030 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 2031 free(shared_resource); 2032 } 2033 } 2034 2035 /* Caller must hold bdev->internal.mutex. 
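 *
 * For reference, the per-timeslice quotas computed in
 * spdk_bdev_qos_update_max_quota_per_timeslice() above simply scale the
 * per-second limit down to one timeslice. Assuming the default 1 ms
 * (1000 usec) timeslice:
 *
 *   10,000 IO/s  ->  10000 * 1000 / 1000000    = 10 I/O per timeslice
 *   10 MB/s      ->  10485760 * 1000 / 1000000 = 10485 bytes per timeslice
 *
 * and the result is raised to at least min_per_timeslice so that some
 * progress is possible in every slice. The poller then adds
 * max_per_timeslice to remaining_this_timeslice for each elapsed slice,
 * carrying any overrun forward as a negative remainder.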
*/ 2036 static void 2037 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2038 { 2039 struct spdk_bdev_qos *qos = bdev->internal.qos; 2040 int i; 2041 2042 /* Rate limiting on this bdev enabled */ 2043 if (qos) { 2044 if (qos->ch == NULL) { 2045 struct spdk_io_channel *io_ch; 2046 2047 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2048 bdev->name, spdk_get_thread()); 2049 2050 /* No qos channel has been selected, so set one up */ 2051 2052 /* Take another reference to ch */ 2053 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2054 assert(io_ch != NULL); 2055 qos->ch = ch; 2056 2057 qos->thread = spdk_io_channel_get_thread(io_ch); 2058 2059 TAILQ_INIT(&qos->queued); 2060 2061 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2062 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 2063 qos->rate_limits[i].min_per_timeslice = 2064 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2065 } else { 2066 qos->rate_limits[i].min_per_timeslice = 2067 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2068 } 2069 2070 if (qos->rate_limits[i].limit == 0) { 2071 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2072 } 2073 } 2074 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 2075 qos->timeslice_size = 2076 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2077 qos->last_timeslice = spdk_get_ticks(); 2078 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 2079 qos, 2080 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2081 } 2082 2083 ch->flags |= BDEV_CH_QOS_ENABLED; 2084 } 2085 } 2086 2087 static int 2088 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 2089 { 2090 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2091 struct spdk_bdev_channel *ch = ctx_buf; 2092 struct spdk_io_channel *mgmt_io_ch; 2093 struct spdk_bdev_mgmt_channel *mgmt_ch; 2094 struct spdk_bdev_shared_resource *shared_resource; 2095 2096 ch->bdev = bdev; 2097 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2098 if (!ch->channel) { 2099 return -1; 2100 } 2101 2102 assert(ch->histogram == NULL); 2103 if (bdev->internal.histogram_enabled) { 2104 ch->histogram = spdk_histogram_data_alloc(); 2105 if (ch->histogram == NULL) { 2106 SPDK_ERRLOG("Could not allocate histogram\n"); 2107 } 2108 } 2109 2110 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2111 if (!mgmt_io_ch) { 2112 spdk_put_io_channel(ch->channel); 2113 return -1; 2114 } 2115 2116 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2117 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2118 if (shared_resource->shared_ch == ch->channel) { 2119 spdk_put_io_channel(mgmt_io_ch); 2120 shared_resource->ref++; 2121 break; 2122 } 2123 } 2124 2125 if (shared_resource == NULL) { 2126 shared_resource = calloc(1, sizeof(*shared_resource)); 2127 if (shared_resource == NULL) { 2128 spdk_put_io_channel(ch->channel); 2129 spdk_put_io_channel(mgmt_io_ch); 2130 return -1; 2131 } 2132 2133 shared_resource->mgmt_ch = mgmt_ch; 2134 shared_resource->io_outstanding = 0; 2135 TAILQ_INIT(&shared_resource->nomem_io); 2136 shared_resource->nomem_threshold = 0; 2137 shared_resource->shared_ch = ch->channel; 2138 shared_resource->ref = 1; 2139 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2140 } 2141 2142 memset(&ch->stat, 0, sizeof(ch->stat)); 2143 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2144 ch->io_outstanding = 0; 2145 TAILQ_INIT(&ch->queued_resets); 2146 ch->flags = 0; 2147 ch->shared_resource = shared_resource; 2148 2149 #ifdef 
SPDK_CONFIG_VTUNE 2150 { 2151 char *name; 2152 __itt_init_ittlib(NULL, 0); 2153 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2154 if (!name) { 2155 _spdk_bdev_channel_destroy_resource(ch); 2156 return -1; 2157 } 2158 ch->handle = __itt_string_handle_create(name); 2159 free(name); 2160 ch->start_tsc = spdk_get_ticks(); 2161 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2162 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 2163 } 2164 #endif 2165 2166 pthread_mutex_lock(&bdev->internal.mutex); 2167 _spdk_bdev_enable_qos(bdev, ch); 2168 pthread_mutex_unlock(&bdev->internal.mutex); 2169 2170 return 0; 2171 } 2172 2173 /* 2174 * Abort I/O that are waiting on a data buffer. These types of I/O are 2175 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2176 */ 2177 static void 2178 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2179 { 2180 bdev_io_stailq_t tmp; 2181 struct spdk_bdev_io *bdev_io; 2182 2183 STAILQ_INIT(&tmp); 2184 2185 while (!STAILQ_EMPTY(queue)) { 2186 bdev_io = STAILQ_FIRST(queue); 2187 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2188 if (bdev_io->internal.ch == ch) { 2189 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2190 } else { 2191 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2192 } 2193 } 2194 2195 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2196 } 2197 2198 /* 2199 * Abort I/O that are queued waiting for submission. These types of I/O are 2200 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2201 */ 2202 static void 2203 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2204 { 2205 struct spdk_bdev_io *bdev_io, *tmp; 2206 2207 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2208 if (bdev_io->internal.ch == ch) { 2209 TAILQ_REMOVE(queue, bdev_io, internal.link); 2210 /* 2211 * spdk_bdev_io_complete() assumes that the completed I/O had 2212 * been submitted to the bdev module. Since in this case it 2213 * hadn't, bump io_outstanding to account for the decrement 2214 * that spdk_bdev_io_complete() will do. 2215 */ 2216 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2217 ch->io_outstanding++; 2218 ch->shared_resource->io_outstanding++; 2219 } 2220 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2221 } 2222 } 2223 } 2224 2225 static void 2226 spdk_bdev_qos_channel_destroy(void *cb_arg) 2227 { 2228 struct spdk_bdev_qos *qos = cb_arg; 2229 2230 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2231 spdk_poller_unregister(&qos->poller); 2232 2233 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2234 2235 free(qos); 2236 } 2237 2238 static int 2239 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 2240 { 2241 int i; 2242 2243 /* 2244 * Cleanly shutting down the QoS poller is tricky, because 2245 * during the asynchronous operation the user could open 2246 * a new descriptor and create a new channel, spawning 2247 * a new QoS poller. 2248 * 2249 * The strategy is to create a new QoS structure here and swap it 2250 * in. The shutdown path then continues to refer to the old one 2251 * until it completes and then releases it. 
2252 */ 2253 struct spdk_bdev_qos *new_qos, *old_qos; 2254 2255 old_qos = bdev->internal.qos; 2256 2257 new_qos = calloc(1, sizeof(*new_qos)); 2258 if (!new_qos) { 2259 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2260 return -ENOMEM; 2261 } 2262 2263 /* Copy the old QoS data into the newly allocated structure */ 2264 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2265 2266 /* Zero out the key parts of the QoS structure */ 2267 new_qos->ch = NULL; 2268 new_qos->thread = NULL; 2269 new_qos->poller = NULL; 2270 TAILQ_INIT(&new_qos->queued); 2271 /* 2272 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2273 * It will be used later for the new QoS structure. 2274 */ 2275 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2276 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2277 new_qos->rate_limits[i].min_per_timeslice = 0; 2278 new_qos->rate_limits[i].max_per_timeslice = 0; 2279 } 2280 2281 bdev->internal.qos = new_qos; 2282 2283 if (old_qos->thread == NULL) { 2284 free(old_qos); 2285 } else { 2286 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 2287 old_qos); 2288 } 2289 2290 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2291 * been destroyed yet. The destruction path will end up waiting for the final 2292 * channel to be put before it releases resources. */ 2293 2294 return 0; 2295 } 2296 2297 static void 2298 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2299 { 2300 total->bytes_read += add->bytes_read; 2301 total->num_read_ops += add->num_read_ops; 2302 total->bytes_written += add->bytes_written; 2303 total->num_write_ops += add->num_write_ops; 2304 total->bytes_unmapped += add->bytes_unmapped; 2305 total->num_unmap_ops += add->num_unmap_ops; 2306 total->read_latency_ticks += add->read_latency_ticks; 2307 total->write_latency_ticks += add->write_latency_ticks; 2308 total->unmap_latency_ticks += add->unmap_latency_ticks; 2309 } 2310 2311 static void 2312 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 2313 { 2314 struct spdk_bdev_channel *ch = ctx_buf; 2315 struct spdk_bdev_mgmt_channel *mgmt_ch; 2316 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2317 2318 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2319 spdk_get_thread()); 2320 2321 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
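 * The aggregate counters for a bdev are therefore the sum of
 * bdev->internal.stat (channels that were already destroyed) and the
 * per-channel stat of every live channel; spdk_bdev_get_device_stat()
 * later in this file combines both in exactly that order.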
*/ 2322 pthread_mutex_lock(&ch->bdev->internal.mutex); 2323 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2324 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2325 2326 mgmt_ch = shared_resource->mgmt_ch; 2327 2328 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2329 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2330 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2331 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2332 2333 if (ch->histogram) { 2334 spdk_histogram_data_free(ch->histogram); 2335 } 2336 2337 _spdk_bdev_channel_destroy_resource(ch); 2338 } 2339 2340 int 2341 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2342 { 2343 struct spdk_bdev_alias *tmp; 2344 2345 if (alias == NULL) { 2346 SPDK_ERRLOG("Empty alias passed\n"); 2347 return -EINVAL; 2348 } 2349 2350 if (spdk_bdev_get_by_name(alias)) { 2351 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2352 return -EEXIST; 2353 } 2354 2355 tmp = calloc(1, sizeof(*tmp)); 2356 if (tmp == NULL) { 2357 SPDK_ERRLOG("Unable to allocate alias\n"); 2358 return -ENOMEM; 2359 } 2360 2361 tmp->alias = strdup(alias); 2362 if (tmp->alias == NULL) { 2363 free(tmp); 2364 SPDK_ERRLOG("Unable to allocate alias\n"); 2365 return -ENOMEM; 2366 } 2367 2368 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2369 2370 return 0; 2371 } 2372 2373 int 2374 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2375 { 2376 struct spdk_bdev_alias *tmp; 2377 2378 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2379 if (strcmp(alias, tmp->alias) == 0) { 2380 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2381 free(tmp->alias); 2382 free(tmp); 2383 return 0; 2384 } 2385 } 2386 2387 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2388 2389 return -ENOENT; 2390 } 2391 2392 void 2393 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2394 { 2395 struct spdk_bdev_alias *p, *tmp; 2396 2397 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2398 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2399 free(p->alias); 2400 free(p); 2401 } 2402 } 2403 2404 struct spdk_io_channel * 2405 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2406 { 2407 return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc))); 2408 } 2409 2410 const char * 2411 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2412 { 2413 return bdev->name; 2414 } 2415 2416 const char * 2417 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2418 { 2419 return bdev->product_name; 2420 } 2421 2422 const struct spdk_bdev_aliases_list * 2423 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2424 { 2425 return &bdev->aliases; 2426 } 2427 2428 uint32_t 2429 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2430 { 2431 return bdev->blocklen; 2432 } 2433 2434 uint64_t 2435 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2436 { 2437 return bdev->blockcnt; 2438 } 2439 2440 const char * 2441 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2442 { 2443 return qos_rpc_type[type]; 2444 } 2445 2446 void 2447 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2448 { 2449 int i; 2450 2451 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2452 2453 pthread_mutex_lock(&bdev->internal.mutex); 2454 if (bdev->internal.qos) { 2455 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2456 if (bdev->internal.qos->rate_limits[i].limit != 2457 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2458 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2459 if 
(_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2460 /* Change from Byte to Megabyte which is user visible. */ 2461 limits[i] = limits[i] / 1024 / 1024; 2462 } 2463 } 2464 } 2465 } 2466 pthread_mutex_unlock(&bdev->internal.mutex); 2467 } 2468 2469 size_t 2470 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2471 { 2472 return 1 << bdev->required_alignment; 2473 } 2474 2475 uint32_t 2476 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2477 { 2478 return bdev->optimal_io_boundary; 2479 } 2480 2481 bool 2482 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2483 { 2484 return bdev->write_cache; 2485 } 2486 2487 const struct spdk_uuid * 2488 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2489 { 2490 return &bdev->uuid; 2491 } 2492 2493 uint32_t 2494 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 2495 { 2496 return bdev->md_len; 2497 } 2498 2499 bool 2500 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 2501 { 2502 return (bdev->md_len != 0) && bdev->md_interleave; 2503 } 2504 2505 bool 2506 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 2507 { 2508 return (bdev->md_len != 0) && !bdev->md_interleave; 2509 } 2510 2511 uint32_t 2512 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 2513 { 2514 if (spdk_bdev_is_md_interleaved(bdev)) { 2515 return bdev->blocklen - bdev->md_len; 2516 } else { 2517 return bdev->blocklen; 2518 } 2519 } 2520 2521 static uint32_t 2522 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 2523 { 2524 if (!spdk_bdev_is_md_interleaved(bdev)) { 2525 return bdev->blocklen + bdev->md_len; 2526 } else { 2527 return bdev->blocklen; 2528 } 2529 } 2530 2531 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 2532 { 2533 if (bdev->md_len != 0) { 2534 return bdev->dif_type; 2535 } else { 2536 return SPDK_DIF_DISABLE; 2537 } 2538 } 2539 2540 bool 2541 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 2542 { 2543 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 2544 return bdev->dif_is_head_of_md; 2545 } else { 2546 return false; 2547 } 2548 } 2549 2550 bool 2551 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 2552 enum spdk_dif_check_type check_type) 2553 { 2554 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 2555 return false; 2556 } 2557 2558 switch (check_type) { 2559 case SPDK_DIF_CHECK_TYPE_REFTAG: 2560 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 2561 case SPDK_DIF_CHECK_TYPE_APPTAG: 2562 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 2563 case SPDK_DIF_CHECK_TYPE_GUARD: 2564 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 2565 default: 2566 return false; 2567 } 2568 } 2569 2570 uint64_t 2571 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2572 { 2573 return bdev->internal.measured_queue_depth; 2574 } 2575 2576 uint64_t 2577 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2578 { 2579 return bdev->internal.period; 2580 } 2581 2582 uint64_t 2583 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2584 { 2585 return bdev->internal.weighted_io_time; 2586 } 2587 2588 uint64_t 2589 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2590 { 2591 return bdev->internal.io_time; 2592 } 2593 2594 static void 2595 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2596 { 2597 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2598 2599 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2600 2601 if 
(bdev->internal.measured_queue_depth) { 2602 bdev->internal.io_time += bdev->internal.period; 2603 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2604 } 2605 } 2606 2607 static void 2608 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2609 { 2610 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2611 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2612 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2613 2614 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2615 spdk_for_each_channel_continue(i, 0); 2616 } 2617 2618 static int 2619 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2620 { 2621 struct spdk_bdev *bdev = ctx; 2622 bdev->internal.temporary_queue_depth = 0; 2623 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2624 _calculate_measured_qd_cpl); 2625 return 0; 2626 } 2627 2628 void 2629 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2630 { 2631 bdev->internal.period = period; 2632 2633 if (bdev->internal.qd_poller != NULL) { 2634 spdk_poller_unregister(&bdev->internal.qd_poller); 2635 bdev->internal.measured_queue_depth = UINT64_MAX; 2636 } 2637 2638 if (period != 0) { 2639 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2640 period); 2641 } 2642 } 2643 2644 int 2645 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2646 { 2647 int ret; 2648 2649 pthread_mutex_lock(&bdev->internal.mutex); 2650 2651 /* bdev has open descriptors */ 2652 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2653 bdev->blockcnt > size) { 2654 ret = -EBUSY; 2655 } else { 2656 bdev->blockcnt = size; 2657 ret = 0; 2658 } 2659 2660 pthread_mutex_unlock(&bdev->internal.mutex); 2661 2662 return ret; 2663 } 2664 2665 /* 2666 * Convert I/O offset and length from bytes to blocks. 2667 * 2668 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 2669 */ 2670 static uint64_t 2671 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2672 uint64_t num_bytes, uint64_t *num_blocks) 2673 { 2674 uint32_t block_size = bdev->blocklen; 2675 uint8_t shift_cnt; 2676 2677 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
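 * Worked example of the power-of-two fast path: with a 512-byte block size
 * spdk_u32log2(512) == 9, so an 8192-byte request at byte offset 4096
 * becomes offset_blocks = 4096 >> 9 = 8 and num_blocks = 8192 >> 9 = 16.
 * The return value ORs the two remainders, so it is 0 only when both the
 * offset and the length are block aligned; a length of 8200 bytes, for
 * example, leaves a remainder of 8 and the caller returns -EINVAL.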
*/ 2678 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 2679 shift_cnt = spdk_u32log2(block_size); 2680 *offset_blocks = offset_bytes >> shift_cnt; 2681 *num_blocks = num_bytes >> shift_cnt; 2682 return (offset_bytes - (*offset_blocks << shift_cnt)) | 2683 (num_bytes - (*num_blocks << shift_cnt)); 2684 } else { 2685 *offset_blocks = offset_bytes / block_size; 2686 *num_blocks = num_bytes / block_size; 2687 return (offset_bytes % block_size) | (num_bytes % block_size); 2688 } 2689 } 2690 2691 static bool 2692 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2693 { 2694 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2695 * has been an overflow and hence the offset has been wrapped around */ 2696 if (offset_blocks + num_blocks < offset_blocks) { 2697 return false; 2698 } 2699 2700 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2701 if (offset_blocks + num_blocks > bdev->blockcnt) { 2702 return false; 2703 } 2704 2705 return true; 2706 } 2707 2708 static bool 2709 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 2710 { 2711 return _is_buf_allocated(iovs) == (md_buf != NULL); 2712 } 2713 2714 static int 2715 _spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 2716 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 2717 spdk_bdev_io_completion_cb cb, void *cb_arg) 2718 { 2719 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2720 struct spdk_bdev_io *bdev_io; 2721 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2722 2723 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2724 return -EINVAL; 2725 } 2726 2727 bdev_io = spdk_bdev_get_io(channel); 2728 if (!bdev_io) { 2729 return -ENOMEM; 2730 } 2731 2732 bdev_io->internal.ch = channel; 2733 bdev_io->internal.desc = desc; 2734 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2735 bdev_io->u.bdev.iovs = &bdev_io->iov; 2736 bdev_io->u.bdev.iovs[0].iov_base = buf; 2737 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2738 bdev_io->u.bdev.iovcnt = 1; 2739 bdev_io->u.bdev.md_buf = md_buf; 2740 bdev_io->u.bdev.num_blocks = num_blocks; 2741 bdev_io->u.bdev.offset_blocks = offset_blocks; 2742 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2743 2744 spdk_bdev_io_submit(bdev_io); 2745 return 0; 2746 } 2747 2748 int 2749 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2750 void *buf, uint64_t offset, uint64_t nbytes, 2751 spdk_bdev_io_completion_cb cb, void *cb_arg) 2752 { 2753 uint64_t offset_blocks, num_blocks; 2754 2755 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2756 nbytes, &num_blocks) != 0) { 2757 return -EINVAL; 2758 } 2759 2760 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2761 } 2762 2763 int 2764 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2765 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2766 spdk_bdev_io_completion_cb cb, void *cb_arg) 2767 { 2768 return _spdk_bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 2769 cb, cb_arg); 2770 } 2771 2772 int 2773 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2774 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 2775 spdk_bdev_io_completion_cb cb, void *cb_arg) 2776 { 2777 struct iovec iov = { 2778 .iov_base = buf, 2779 }; 2780 2781 if 
(!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 2782 return -EINVAL; 2783 } 2784 2785 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 2786 return -EINVAL; 2787 } 2788 2789 return _spdk_bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 2790 cb, cb_arg); 2791 } 2792 2793 int 2794 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2795 struct iovec *iov, int iovcnt, 2796 uint64_t offset, uint64_t nbytes, 2797 spdk_bdev_io_completion_cb cb, void *cb_arg) 2798 { 2799 uint64_t offset_blocks, num_blocks; 2800 2801 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2802 nbytes, &num_blocks) != 0) { 2803 return -EINVAL; 2804 } 2805 2806 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2807 } 2808 2809 static int 2810 _spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2811 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 2812 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 2813 { 2814 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2815 struct spdk_bdev_io *bdev_io; 2816 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2817 2818 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2819 return -EINVAL; 2820 } 2821 2822 bdev_io = spdk_bdev_get_io(channel); 2823 if (!bdev_io) { 2824 return -ENOMEM; 2825 } 2826 2827 bdev_io->internal.ch = channel; 2828 bdev_io->internal.desc = desc; 2829 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2830 bdev_io->u.bdev.iovs = iov; 2831 bdev_io->u.bdev.iovcnt = iovcnt; 2832 bdev_io->u.bdev.md_buf = md_buf; 2833 bdev_io->u.bdev.num_blocks = num_blocks; 2834 bdev_io->u.bdev.offset_blocks = offset_blocks; 2835 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2836 2837 spdk_bdev_io_submit(bdev_io); 2838 return 0; 2839 } 2840 2841 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2842 struct iovec *iov, int iovcnt, 2843 uint64_t offset_blocks, uint64_t num_blocks, 2844 spdk_bdev_io_completion_cb cb, void *cb_arg) 2845 { 2846 return _spdk_bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 2847 num_blocks, cb, cb_arg); 2848 } 2849 2850 int 2851 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2852 struct iovec *iov, int iovcnt, void *md_buf, 2853 uint64_t offset_blocks, uint64_t num_blocks, 2854 spdk_bdev_io_completion_cb cb, void *cb_arg) 2855 { 2856 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 2857 return -EINVAL; 2858 } 2859 2860 if (!_bdev_io_check_md_buf(iov, md_buf)) { 2861 return -EINVAL; 2862 } 2863 2864 return _spdk_bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 2865 num_blocks, cb, cb_arg); 2866 } 2867 2868 static int 2869 _spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2870 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 2871 spdk_bdev_io_completion_cb cb, void *cb_arg) 2872 { 2873 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2874 struct spdk_bdev_io *bdev_io; 2875 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2876 2877 if (!desc->write) { 2878 return -EBADF; 2879 } 2880 2881 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2882 return -EINVAL; 2883 } 2884 2885 bdev_io = spdk_bdev_get_io(channel); 2886 if (!bdev_io) { 2887 return -ENOMEM; 2888 } 2889 2890 bdev_io->internal.ch = channel; 
2891 bdev_io->internal.desc = desc; 2892 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2893 bdev_io->u.bdev.iovs = &bdev_io->iov; 2894 bdev_io->u.bdev.iovs[0].iov_base = buf; 2895 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2896 bdev_io->u.bdev.iovcnt = 1; 2897 bdev_io->u.bdev.md_buf = md_buf; 2898 bdev_io->u.bdev.num_blocks = num_blocks; 2899 bdev_io->u.bdev.offset_blocks = offset_blocks; 2900 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2901 2902 spdk_bdev_io_submit(bdev_io); 2903 return 0; 2904 } 2905 2906 int 2907 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2908 void *buf, uint64_t offset, uint64_t nbytes, 2909 spdk_bdev_io_completion_cb cb, void *cb_arg) 2910 { 2911 uint64_t offset_blocks, num_blocks; 2912 2913 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2914 nbytes, &num_blocks) != 0) { 2915 return -EINVAL; 2916 } 2917 2918 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2919 } 2920 2921 int 2922 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2923 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2924 spdk_bdev_io_completion_cb cb, void *cb_arg) 2925 { 2926 return _spdk_bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 2927 cb, cb_arg); 2928 } 2929 2930 int 2931 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2932 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 2933 spdk_bdev_io_completion_cb cb, void *cb_arg) 2934 { 2935 struct iovec iov = { 2936 .iov_base = buf, 2937 }; 2938 2939 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 2940 return -EINVAL; 2941 } 2942 2943 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 2944 return -EINVAL; 2945 } 2946 2947 return _spdk_bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 2948 cb, cb_arg); 2949 } 2950 2951 static int 2952 _spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2953 struct iovec *iov, int iovcnt, void *md_buf, 2954 uint64_t offset_blocks, uint64_t num_blocks, 2955 spdk_bdev_io_completion_cb cb, void *cb_arg) 2956 { 2957 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2958 struct spdk_bdev_io *bdev_io; 2959 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2960 2961 if (!desc->write) { 2962 return -EBADF; 2963 } 2964 2965 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2966 return -EINVAL; 2967 } 2968 2969 bdev_io = spdk_bdev_get_io(channel); 2970 if (!bdev_io) { 2971 return -ENOMEM; 2972 } 2973 2974 bdev_io->internal.ch = channel; 2975 bdev_io->internal.desc = desc; 2976 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2977 bdev_io->u.bdev.iovs = iov; 2978 bdev_io->u.bdev.iovcnt = iovcnt; 2979 bdev_io->u.bdev.md_buf = md_buf; 2980 bdev_io->u.bdev.num_blocks = num_blocks; 2981 bdev_io->u.bdev.offset_blocks = offset_blocks; 2982 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2983 2984 spdk_bdev_io_submit(bdev_io); 2985 return 0; 2986 } 2987 2988 int 2989 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2990 struct iovec *iov, int iovcnt, 2991 uint64_t offset, uint64_t len, 2992 spdk_bdev_io_completion_cb cb, void *cb_arg) 2993 { 2994 uint64_t offset_blocks, num_blocks; 2995 2996 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2997 len, &num_blocks) != 0) { 2998 return -EINVAL; 2999 } 3000 3001 return 
spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 3002 } 3003 3004 int 3005 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3006 struct iovec *iov, int iovcnt, 3007 uint64_t offset_blocks, uint64_t num_blocks, 3008 spdk_bdev_io_completion_cb cb, void *cb_arg) 3009 { 3010 return _spdk_bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 3011 num_blocks, cb, cb_arg); 3012 } 3013 3014 int 3015 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3016 struct iovec *iov, int iovcnt, void *md_buf, 3017 uint64_t offset_blocks, uint64_t num_blocks, 3018 spdk_bdev_io_completion_cb cb, void *cb_arg) 3019 { 3020 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 3021 return -EINVAL; 3022 } 3023 3024 if (!_bdev_io_check_md_buf(iov, md_buf)) { 3025 return -EINVAL; 3026 } 3027 3028 return _spdk_bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 3029 num_blocks, cb, cb_arg); 3030 } 3031 3032 static void 3033 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 3034 { 3035 if (!success) { 3036 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3037 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 3038 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3039 return; 3040 } 3041 3042 if (bdev_io->u.bdev.zcopy.populate) { 3043 /* Read the real data into the buffer */ 3044 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3045 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3046 spdk_bdev_io_submit(bdev_io); 3047 return; 3048 } 3049 3050 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3051 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3052 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3053 } 3054 3055 int 3056 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3057 uint64_t offset_blocks, uint64_t num_blocks, 3058 bool populate, 3059 spdk_bdev_io_completion_cb cb, void *cb_arg) 3060 { 3061 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3062 struct spdk_bdev_io *bdev_io; 3063 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3064 3065 if (!desc->write) { 3066 return -EBADF; 3067 } 3068 3069 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3070 return -EINVAL; 3071 } 3072 3073 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3074 return -ENOTSUP; 3075 } 3076 3077 bdev_io = spdk_bdev_get_io(channel); 3078 if (!bdev_io) { 3079 return -ENOMEM; 3080 } 3081 3082 bdev_io->internal.ch = channel; 3083 bdev_io->internal.desc = desc; 3084 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3085 bdev_io->u.bdev.num_blocks = num_blocks; 3086 bdev_io->u.bdev.offset_blocks = offset_blocks; 3087 bdev_io->u.bdev.iovs = NULL; 3088 bdev_io->u.bdev.iovcnt = 0; 3089 bdev_io->u.bdev.zcopy.populate = populate ? 
1 : 0; 3090 bdev_io->u.bdev.zcopy.commit = 0; 3091 bdev_io->u.bdev.zcopy.start = 1; 3092 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3093 3094 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3095 spdk_bdev_io_submit(bdev_io); 3096 } else { 3097 /* Emulate zcopy by allocating a buffer */ 3098 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 3099 bdev_io->u.bdev.num_blocks * bdev->blocklen); 3100 } 3101 3102 return 0; 3103 } 3104 3105 int 3106 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 3107 spdk_bdev_io_completion_cb cb, void *cb_arg) 3108 { 3109 struct spdk_bdev *bdev = bdev_io->bdev; 3110 3111 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 3112 /* This can happen if the zcopy was emulated in start */ 3113 if (bdev_io->u.bdev.zcopy.start != 1) { 3114 return -EINVAL; 3115 } 3116 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3117 } 3118 3119 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 3120 return -EINVAL; 3121 } 3122 3123 bdev_io->u.bdev.zcopy.commit = commit ? 1 : 0; 3124 bdev_io->u.bdev.zcopy.start = 0; 3125 bdev_io->internal.caller_ctx = cb_arg; 3126 bdev_io->internal.cb = cb; 3127 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3128 3129 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3130 spdk_bdev_io_submit(bdev_io); 3131 return 0; 3132 } 3133 3134 if (!bdev_io->u.bdev.zcopy.commit) { 3135 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3136 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3137 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 3138 return 0; 3139 } 3140 3141 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3142 spdk_bdev_io_submit(bdev_io); 3143 3144 return 0; 3145 } 3146 3147 int 3148 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3149 uint64_t offset, uint64_t len, 3150 spdk_bdev_io_completion_cb cb, void *cb_arg) 3151 { 3152 uint64_t offset_blocks, num_blocks; 3153 3154 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3155 len, &num_blocks) != 0) { 3156 return -EINVAL; 3157 } 3158 3159 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3160 } 3161 3162 int 3163 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3164 uint64_t offset_blocks, uint64_t num_blocks, 3165 spdk_bdev_io_completion_cb cb, void *cb_arg) 3166 { 3167 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3168 struct spdk_bdev_io *bdev_io; 3169 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3170 3171 if (!desc->write) { 3172 return -EBADF; 3173 } 3174 3175 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3176 return -EINVAL; 3177 } 3178 3179 if (!_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 3180 !_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 3181 return -ENOTSUP; 3182 } 3183 3184 bdev_io = spdk_bdev_get_io(channel); 3185 3186 if (!bdev_io) { 3187 return -ENOMEM; 3188 } 3189 3190 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 3191 bdev_io->internal.ch = channel; 3192 bdev_io->internal.desc = desc; 3193 bdev_io->u.bdev.offset_blocks = offset_blocks; 3194 bdev_io->u.bdev.num_blocks = num_blocks; 3195 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3196 3197 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 3198 spdk_bdev_io_submit(bdev_io); 3199 return 0; 3200 } 3201 3202 assert(_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 
3203 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 3204 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 3205 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 3206 _spdk_bdev_write_zero_buffer_next(bdev_io); 3207 3208 return 0; 3209 } 3210 3211 int 3212 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3213 uint64_t offset, uint64_t nbytes, 3214 spdk_bdev_io_completion_cb cb, void *cb_arg) 3215 { 3216 uint64_t offset_blocks, num_blocks; 3217 3218 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3219 nbytes, &num_blocks) != 0) { 3220 return -EINVAL; 3221 } 3222 3223 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3224 } 3225 3226 int 3227 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3228 uint64_t offset_blocks, uint64_t num_blocks, 3229 spdk_bdev_io_completion_cb cb, void *cb_arg) 3230 { 3231 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3232 struct spdk_bdev_io *bdev_io; 3233 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3234 3235 if (!desc->write) { 3236 return -EBADF; 3237 } 3238 3239 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3240 return -EINVAL; 3241 } 3242 3243 if (num_blocks == 0) { 3244 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 3245 return -EINVAL; 3246 } 3247 3248 bdev_io = spdk_bdev_get_io(channel); 3249 if (!bdev_io) { 3250 return -ENOMEM; 3251 } 3252 3253 bdev_io->internal.ch = channel; 3254 bdev_io->internal.desc = desc; 3255 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 3256 3257 bdev_io->u.bdev.iovs = &bdev_io->iov; 3258 bdev_io->u.bdev.iovs[0].iov_base = NULL; 3259 bdev_io->u.bdev.iovs[0].iov_len = 0; 3260 bdev_io->u.bdev.iovcnt = 1; 3261 3262 bdev_io->u.bdev.offset_blocks = offset_blocks; 3263 bdev_io->u.bdev.num_blocks = num_blocks; 3264 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3265 3266 spdk_bdev_io_submit(bdev_io); 3267 return 0; 3268 } 3269 3270 int 3271 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3272 uint64_t offset, uint64_t length, 3273 spdk_bdev_io_completion_cb cb, void *cb_arg) 3274 { 3275 uint64_t offset_blocks, num_blocks; 3276 3277 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3278 length, &num_blocks) != 0) { 3279 return -EINVAL; 3280 } 3281 3282 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3283 } 3284 3285 int 3286 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3287 uint64_t offset_blocks, uint64_t num_blocks, 3288 spdk_bdev_io_completion_cb cb, void *cb_arg) 3289 { 3290 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3291 struct spdk_bdev_io *bdev_io; 3292 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3293 3294 if (!desc->write) { 3295 return -EBADF; 3296 } 3297 3298 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3299 return -EINVAL; 3300 } 3301 3302 bdev_io = spdk_bdev_get_io(channel); 3303 if (!bdev_io) { 3304 return -ENOMEM; 3305 } 3306 3307 bdev_io->internal.ch = channel; 3308 bdev_io->internal.desc = desc; 3309 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 3310 bdev_io->u.bdev.iovs = NULL; 3311 bdev_io->u.bdev.iovcnt = 0; 3312 bdev_io->u.bdev.offset_blocks = offset_blocks; 3313 bdev_io->u.bdev.num_blocks = num_blocks; 3314 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3315 3316 spdk_bdev_io_submit(bdev_io); 3317 return 0; 3318 } 3319 3320 static void 3321 
_spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 3322 { 3323 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 3324 struct spdk_bdev_io *bdev_io; 3325 3326 bdev_io = TAILQ_FIRST(&ch->queued_resets); 3327 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 3328 spdk_bdev_io_submit_reset(bdev_io); 3329 } 3330 3331 static void 3332 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 3333 { 3334 struct spdk_io_channel *ch; 3335 struct spdk_bdev_channel *channel; 3336 struct spdk_bdev_mgmt_channel *mgmt_channel; 3337 struct spdk_bdev_shared_resource *shared_resource; 3338 bdev_io_tailq_t tmp_queued; 3339 3340 TAILQ_INIT(&tmp_queued); 3341 3342 ch = spdk_io_channel_iter_get_channel(i); 3343 channel = spdk_io_channel_get_ctx(ch); 3344 shared_resource = channel->shared_resource; 3345 mgmt_channel = shared_resource->mgmt_ch; 3346 3347 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 3348 3349 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 3350 /* The QoS object is always valid and readable while 3351 * the channel flag is set, so the lock here should not 3352 * be necessary. We're not in the fast path though, so 3353 * just take it anyway. */ 3354 pthread_mutex_lock(&channel->bdev->internal.mutex); 3355 if (channel->bdev->internal.qos->ch == channel) { 3356 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 3357 } 3358 pthread_mutex_unlock(&channel->bdev->internal.mutex); 3359 } 3360 3361 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 3362 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 3363 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 3364 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 3365 3366 spdk_for_each_channel_continue(i, 0); 3367 } 3368 3369 static void 3370 _spdk_bdev_start_reset(void *ctx) 3371 { 3372 struct spdk_bdev_channel *ch = ctx; 3373 3374 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 3375 ch, _spdk_bdev_reset_dev); 3376 } 3377 3378 static void 3379 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 3380 { 3381 struct spdk_bdev *bdev = ch->bdev; 3382 3383 assert(!TAILQ_EMPTY(&ch->queued_resets)); 3384 3385 pthread_mutex_lock(&bdev->internal.mutex); 3386 if (bdev->internal.reset_in_progress == NULL) { 3387 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 3388 /* 3389 * Take a channel reference for the target bdev for the life of this 3390 * reset. This guards against the channel getting destroyed while 3391 * spdk_for_each_channel() calls related to this reset IO are in 3392 * progress. We will release the reference when this reset is 3393 * completed. 
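 *
 * From the application side a reset is requested with the same
 * descriptor/channel pair used for regular I/O; minimal sketch (callback
 * name is illustrative):
 *
 *   rc = spdk_bdev_reset(desc, io_ch, reset_done_cb, ctx);
 *   if (rc == -ENOMEM) {
 *           ... no spdk_bdev_io was available, retry later ...
 *   }
 *
 * While the reset is outstanding every channel of the bdev is frozen and
 * its queued and nomem I/O is failed, as done in
 * _spdk_bdev_reset_freeze_channel() above.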
3394 */ 3395 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 3396 _spdk_bdev_start_reset(ch); 3397 } 3398 pthread_mutex_unlock(&bdev->internal.mutex); 3399 } 3400 3401 int 3402 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3403 spdk_bdev_io_completion_cb cb, void *cb_arg) 3404 { 3405 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3406 struct spdk_bdev_io *bdev_io; 3407 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3408 3409 bdev_io = spdk_bdev_get_io(channel); 3410 if (!bdev_io) { 3411 return -ENOMEM; 3412 } 3413 3414 bdev_io->internal.ch = channel; 3415 bdev_io->internal.desc = desc; 3416 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 3417 bdev_io->u.reset.ch_ref = NULL; 3418 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3419 3420 pthread_mutex_lock(&bdev->internal.mutex); 3421 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 3422 pthread_mutex_unlock(&bdev->internal.mutex); 3423 3424 _spdk_bdev_channel_start_reset(channel); 3425 3426 return 0; 3427 } 3428 3429 void 3430 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3431 struct spdk_bdev_io_stat *stat) 3432 { 3433 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3434 3435 *stat = channel->stat; 3436 } 3437 3438 static void 3439 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 3440 { 3441 void *io_device = spdk_io_channel_iter_get_io_device(i); 3442 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3443 3444 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 3445 bdev_iostat_ctx->cb_arg, 0); 3446 free(bdev_iostat_ctx); 3447 } 3448 3449 static void 3450 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 3451 { 3452 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3453 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3454 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3455 3456 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 3457 spdk_for_each_channel_continue(i, 0); 3458 } 3459 3460 void 3461 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 3462 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 3463 { 3464 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 3465 3466 assert(bdev != NULL); 3467 assert(stat != NULL); 3468 assert(cb != NULL); 3469 3470 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 3471 if (bdev_iostat_ctx == NULL) { 3472 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 3473 cb(bdev, stat, cb_arg, -ENOMEM); 3474 return; 3475 } 3476 3477 bdev_iostat_ctx->stat = stat; 3478 bdev_iostat_ctx->cb = cb; 3479 bdev_iostat_ctx->cb_arg = cb_arg; 3480 3481 /* Start with the statistics from previously deleted channels. */ 3482 pthread_mutex_lock(&bdev->internal.mutex); 3483 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 3484 pthread_mutex_unlock(&bdev->internal.mutex); 3485 3486 /* Then iterate and add the statistics from each existing channel. 
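 * A typical caller allocates the stat structure itself, keeps it valid
 * until the callback runs and consumes it there; minimal sketch (names are
 * illustrative):
 *
 *   static void
 *   stat_done(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
 *             void *cb_arg, int rc)
 *   {
 *           if (rc == 0) {
 *                   printf("%s: %" PRIu64 " read ops\n",
 *                          spdk_bdev_get_name(bdev), stat->num_read_ops);
 *           }
 *           free(stat);
 *   }
 *
 *   stat = calloc(1, sizeof(*stat));
 *   spdk_bdev_get_device_stat(bdev, stat, stat_done, NULL);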
*/ 3487 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3488 _spdk_bdev_get_each_channel_stat, 3489 bdev_iostat_ctx, 3490 _spdk_bdev_get_device_stat_done); 3491 } 3492 3493 int 3494 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3495 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3496 spdk_bdev_io_completion_cb cb, void *cb_arg) 3497 { 3498 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3499 struct spdk_bdev_io *bdev_io; 3500 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3501 3502 if (!desc->write) { 3503 return -EBADF; 3504 } 3505 3506 bdev_io = spdk_bdev_get_io(channel); 3507 if (!bdev_io) { 3508 return -ENOMEM; 3509 } 3510 3511 bdev_io->internal.ch = channel; 3512 bdev_io->internal.desc = desc; 3513 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 3514 bdev_io->u.nvme_passthru.cmd = *cmd; 3515 bdev_io->u.nvme_passthru.buf = buf; 3516 bdev_io->u.nvme_passthru.nbytes = nbytes; 3517 bdev_io->u.nvme_passthru.md_buf = NULL; 3518 bdev_io->u.nvme_passthru.md_len = 0; 3519 3520 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3521 3522 spdk_bdev_io_submit(bdev_io); 3523 return 0; 3524 } 3525 3526 int 3527 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3528 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3529 spdk_bdev_io_completion_cb cb, void *cb_arg) 3530 { 3531 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3532 struct spdk_bdev_io *bdev_io; 3533 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3534 3535 if (!desc->write) { 3536 /* 3537 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3538 * to easily determine if the command is a read or write, but for now just 3539 * do not allow io_passthru with a read-only descriptor. 3540 */ 3541 return -EBADF; 3542 } 3543 3544 bdev_io = spdk_bdev_get_io(channel); 3545 if (!bdev_io) { 3546 return -ENOMEM; 3547 } 3548 3549 bdev_io->internal.ch = channel; 3550 bdev_io->internal.desc = desc; 3551 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 3552 bdev_io->u.nvme_passthru.cmd = *cmd; 3553 bdev_io->u.nvme_passthru.buf = buf; 3554 bdev_io->u.nvme_passthru.nbytes = nbytes; 3555 bdev_io->u.nvme_passthru.md_buf = NULL; 3556 bdev_io->u.nvme_passthru.md_len = 0; 3557 3558 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3559 3560 spdk_bdev_io_submit(bdev_io); 3561 return 0; 3562 } 3563 3564 int 3565 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3566 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 3567 spdk_bdev_io_completion_cb cb, void *cb_arg) 3568 { 3569 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3570 struct spdk_bdev_io *bdev_io; 3571 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3572 3573 if (!desc->write) { 3574 /* 3575 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3576 * to easily determine if the command is a read or write, but for now just 3577 * do not allow io_passthru with a read-only descriptor. 
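 *
 * For reference, the passthru commands above are built by the caller
 * directly in NVMe wire format; illustrative sketch of an admin
 * identify-controller request through spdk_bdev_nvme_admin_passthru()
 * (CNS value per the NVMe spec, a 4096-byte buffer assumed):
 *
 *   struct spdk_nvme_cmd cmd;
 *
 *   memset(&cmd, 0, sizeof(cmd));
 *   cmd.opc = SPDK_NVME_OPC_IDENTIFY;
 *   cmd.cdw10 = 1;                        (CNS 1 = identify controller)
 *   rc = spdk_bdev_nvme_admin_passthru(desc, io_ch, &cmd, buf, 4096,
 *                                       admin_done_cb, ctx);
 *
 * The bdev layer copies the command and forwards it unmodified; whether the
 * module supports SPDK_BDEV_IO_TYPE_NVME_ADMIN should be checked first with
 * spdk_bdev_io_type_supported().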
3578 */ 3579 return -EBADF; 3580 } 3581 3582 bdev_io = spdk_bdev_get_io(channel); 3583 if (!bdev_io) { 3584 return -ENOMEM; 3585 } 3586 3587 bdev_io->internal.ch = channel; 3588 bdev_io->internal.desc = desc; 3589 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 3590 bdev_io->u.nvme_passthru.cmd = *cmd; 3591 bdev_io->u.nvme_passthru.buf = buf; 3592 bdev_io->u.nvme_passthru.nbytes = nbytes; 3593 bdev_io->u.nvme_passthru.md_buf = md_buf; 3594 bdev_io->u.nvme_passthru.md_len = md_len; 3595 3596 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3597 3598 spdk_bdev_io_submit(bdev_io); 3599 return 0; 3600 } 3601 3602 int 3603 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3604 struct spdk_bdev_io_wait_entry *entry) 3605 { 3606 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3607 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 3608 3609 if (bdev != entry->bdev) { 3610 SPDK_ERRLOG("bdevs do not match\n"); 3611 return -EINVAL; 3612 } 3613 3614 if (mgmt_ch->per_thread_cache_count > 0) { 3615 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 3616 return -EINVAL; 3617 } 3618 3619 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 3620 return 0; 3621 } 3622 3623 static void 3624 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 3625 { 3626 struct spdk_bdev *bdev = bdev_ch->bdev; 3627 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3628 struct spdk_bdev_io *bdev_io; 3629 3630 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3631 /* 3632 * Allow some more I/O to complete before retrying the nomem_io queue. 3633 * Some drivers (such as nvme) cannot immediately take a new I/O in 3634 * the context of a completion, because the resources for the I/O are 3635 * not released until control returns to the bdev poller. Also, we 3636 * may require several small I/O to complete before a larger I/O 3637 * (that requires splitting) can be submitted. 3638 */ 3639 return; 3640 } 3641 3642 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3643 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3644 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3645 bdev_io->internal.ch->io_outstanding++; 3646 shared_resource->io_outstanding++; 3647 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3648 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 3649 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3650 break; 3651 } 3652 } 3653 } 3654 3655 static inline void 3656 _spdk_bdev_io_complete(void *ctx) 3657 { 3658 struct spdk_bdev_io *bdev_io = ctx; 3659 uint64_t tsc, tsc_diff; 3660 3661 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3662 /* 3663 * Send the completion to the thread that originally submitted the I/O, 3664 * which may not be the current thread in the case of QoS. 3665 */ 3666 if (bdev_io->internal.io_submit_ch) { 3667 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3668 bdev_io->internal.io_submit_ch = NULL; 3669 } 3670 3671 /* 3672 * Defer completion to avoid potential infinite recursion if the 3673 * user's completion callback issues a new I/O. 
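 *
 * A typical user completion callback does exactly that - it inspects the
 * status, releases the spdk_bdev_io and frequently submits the next
 * request; minimal sketch (context type and names are illustrative):
 *
 *   static void
 *   read_done_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *   {
 *           struct my_ctx *ctx = cb_arg;
 *
 *           spdk_bdev_free_io(bdev_io);
 *           if (!success) {
 *                   ... fail the request tracked by ctx ...
 *                   return;
 *           }
 *           ... continue, possibly submitting another I/O from here ...
 *   }
 *
 * which is why completion is deferred below whenever it would otherwise run
 * in the submit path.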
3674 */ 3675 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 3676 _spdk_bdev_io_complete, bdev_io); 3677 return; 3678 } 3679 3680 tsc = spdk_get_ticks(); 3681 tsc_diff = tsc - bdev_io->internal.submit_tsc; 3682 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 3683 3684 if (bdev_io->internal.ch->histogram) { 3685 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 3686 } 3687 3688 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3689 switch (bdev_io->type) { 3690 case SPDK_BDEV_IO_TYPE_READ: 3691 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3692 bdev_io->internal.ch->stat.num_read_ops++; 3693 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 3694 break; 3695 case SPDK_BDEV_IO_TYPE_WRITE: 3696 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3697 bdev_io->internal.ch->stat.num_write_ops++; 3698 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 3699 break; 3700 case SPDK_BDEV_IO_TYPE_UNMAP: 3701 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3702 bdev_io->internal.ch->stat.num_unmap_ops++; 3703 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 3704 default: 3705 break; 3706 } 3707 } 3708 3709 #ifdef SPDK_CONFIG_VTUNE 3710 uint64_t now_tsc = spdk_get_ticks(); 3711 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 3712 uint64_t data[5]; 3713 3714 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 3715 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 3716 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 3717 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 3718 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
3719 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 3720 3721 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3722 __itt_metadata_u64, 5, data); 3723 3724 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3725 bdev_io->internal.ch->start_tsc = now_tsc; 3726 } 3727 #endif 3728 3729 assert(bdev_io->internal.cb != NULL); 3730 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 3731 3732 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3733 bdev_io->internal.caller_ctx); 3734 } 3735 3736 static void 3737 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3738 { 3739 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3740 3741 if (bdev_io->u.reset.ch_ref != NULL) { 3742 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3743 bdev_io->u.reset.ch_ref = NULL; 3744 } 3745 3746 _spdk_bdev_io_complete(bdev_io); 3747 } 3748 3749 static void 3750 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3751 { 3752 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3753 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3754 3755 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3756 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3757 _spdk_bdev_channel_start_reset(ch); 3758 } 3759 3760 spdk_for_each_channel_continue(i, 0); 3761 } 3762 3763 void 3764 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3765 { 3766 struct spdk_bdev *bdev = bdev_io->bdev; 3767 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3768 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3769 3770 bdev_io->internal.status = status; 3771 3772 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3773 bool unlock_channels = false; 3774 3775 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3776 SPDK_ERRLOG("NOMEM returned for reset\n"); 3777 } 3778 pthread_mutex_lock(&bdev->internal.mutex); 3779 if (bdev_io == bdev->internal.reset_in_progress) { 3780 bdev->internal.reset_in_progress = NULL; 3781 unlock_channels = true; 3782 } 3783 pthread_mutex_unlock(&bdev->internal.mutex); 3784 3785 if (unlock_channels) { 3786 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3787 bdev_io, _spdk_bdev_reset_complete); 3788 return; 3789 } 3790 } else { 3791 _bdev_io_unset_bounce_buf(bdev_io); 3792 3793 assert(bdev_ch->io_outstanding > 0); 3794 assert(shared_resource->io_outstanding > 0); 3795 bdev_ch->io_outstanding--; 3796 shared_resource->io_outstanding--; 3797 3798 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3799 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3800 /* 3801 * Wait for some of the outstanding I/O to complete before we 3802 * retry any of the nomem_io. Normally we will wait for 3803 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3804 * depth channels we will instead wait for half to complete. 
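			 * For example, if NOMEM_THRESHOLD_COUNT is 8 and 100 I/O are
			 * outstanding, the expression below yields spdk_max(50, 92) == 92,
			 * so retries begin after roughly 8 completions; with only 4 I/O
			 * outstanding it yields spdk_max(2, -4) == 2, i.e. after half of
			 * them complete.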
3805 */ 3806 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3807 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3808 return; 3809 } 3810 3811 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3812 _spdk_bdev_ch_retry_io(bdev_ch); 3813 } 3814 } 3815 3816 _spdk_bdev_io_complete(bdev_io); 3817 } 3818 3819 void 3820 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3821 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3822 { 3823 if (sc == SPDK_SCSI_STATUS_GOOD) { 3824 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3825 } else { 3826 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3827 bdev_io->internal.error.scsi.sc = sc; 3828 bdev_io->internal.error.scsi.sk = sk; 3829 bdev_io->internal.error.scsi.asc = asc; 3830 bdev_io->internal.error.scsi.ascq = ascq; 3831 } 3832 3833 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3834 } 3835 3836 void 3837 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3838 int *sc, int *sk, int *asc, int *ascq) 3839 { 3840 assert(sc != NULL); 3841 assert(sk != NULL); 3842 assert(asc != NULL); 3843 assert(ascq != NULL); 3844 3845 switch (bdev_io->internal.status) { 3846 case SPDK_BDEV_IO_STATUS_SUCCESS: 3847 *sc = SPDK_SCSI_STATUS_GOOD; 3848 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3849 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3850 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3851 break; 3852 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3853 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3854 break; 3855 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3856 *sc = bdev_io->internal.error.scsi.sc; 3857 *sk = bdev_io->internal.error.scsi.sk; 3858 *asc = bdev_io->internal.error.scsi.asc; 3859 *ascq = bdev_io->internal.error.scsi.ascq; 3860 break; 3861 default: 3862 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3863 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3864 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3865 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3866 break; 3867 } 3868 } 3869 3870 void 3871 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3872 { 3873 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3874 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3875 } else { 3876 bdev_io->internal.error.nvme.sct = sct; 3877 bdev_io->internal.error.nvme.sc = sc; 3878 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3879 } 3880 3881 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3882 } 3883 3884 void 3885 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3886 { 3887 assert(sct != NULL); 3888 assert(sc != NULL); 3889 3890 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3891 *sct = bdev_io->internal.error.nvme.sct; 3892 *sc = bdev_io->internal.error.nvme.sc; 3893 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3894 *sct = SPDK_NVME_SCT_GENERIC; 3895 *sc = SPDK_NVME_SC_SUCCESS; 3896 } else { 3897 *sct = SPDK_NVME_SCT_GENERIC; 3898 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3899 } 3900 } 3901 3902 struct spdk_thread * 3903 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3904 { 3905 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3906 } 3907 3908 struct spdk_io_channel * 3909 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 3910 { 3911 return bdev_io->internal.ch->channel; 3912 } 3913 3914 static void 3915 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t 
*limits) 3916 { 3917 uint64_t min_qos_set; 3918 int i; 3919 3920 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3921 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3922 break; 3923 } 3924 } 3925 3926 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3927 SPDK_ERRLOG("Invalid rate limits set.\n"); 3928 return; 3929 } 3930 3931 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3932 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3933 continue; 3934 } 3935 3936 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3937 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3938 } else { 3939 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3940 } 3941 3942 if (limits[i] == 0 || limits[i] % min_qos_set) { 3943 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3944 limits[i], bdev->name, min_qos_set); 3945 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3946 return; 3947 } 3948 } 3949 3950 if (!bdev->internal.qos) { 3951 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3952 if (!bdev->internal.qos) { 3953 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3954 return; 3955 } 3956 } 3957 3958 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3959 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3960 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3961 bdev->name, i, limits[i]); 3962 } 3963 3964 return; 3965 } 3966 3967 static void 3968 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3969 { 3970 struct spdk_conf_section *sp = NULL; 3971 const char *val = NULL; 3972 int i = 0, j = 0; 3973 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3974 bool config_qos = false; 3975 3976 sp = spdk_conf_find_section(NULL, "QoS"); 3977 if (!sp) { 3978 return; 3979 } 3980 3981 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3982 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3983 3984 i = 0; 3985 while (true) { 3986 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3987 if (!val) { 3988 break; 3989 } 3990 3991 if (strcmp(bdev->name, val) != 0) { 3992 i++; 3993 continue; 3994 } 3995 3996 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3997 if (val) { 3998 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3999 limits[j] = strtoull(val, NULL, 10); 4000 } else { 4001 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 4002 } 4003 config_qos = true; 4004 } 4005 4006 break; 4007 } 4008 4009 j++; 4010 } 4011 4012 if (config_qos == true) { 4013 _spdk_bdev_qos_config_limit(bdev, limits); 4014 } 4015 4016 return; 4017 } 4018 4019 static int 4020 spdk_bdev_init(struct spdk_bdev *bdev) 4021 { 4022 char *bdev_name; 4023 4024 assert(bdev->module != NULL); 4025 4026 if (!bdev->name) { 4027 SPDK_ERRLOG("Bdev name is NULL\n"); 4028 return -EINVAL; 4029 } 4030 4031 if (!strlen(bdev->name)) { 4032 SPDK_ERRLOG("Bdev name must not be an empty string\n"); 4033 return -EINVAL; 4034 } 4035 4036 if (spdk_bdev_get_by_name(bdev->name)) { 4037 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 4038 return -EEXIST; 4039 } 4040 4041 /* Users often register their own I/O devices using the bdev name. In 4042 * order to avoid conflicts, prepend bdev_. 
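	 * For example, a bdev named "Malloc0" (an illustrative name) has its
	 * io_device registered below under the internal name "bdev_Malloc0".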
*/ 4043 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 4044 if (!bdev_name) { 4045 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 4046 return -ENOMEM; 4047 } 4048 4049 bdev->internal.status = SPDK_BDEV_STATUS_READY; 4050 bdev->internal.measured_queue_depth = UINT64_MAX; 4051 bdev->internal.claim_module = NULL; 4052 bdev->internal.qd_poller = NULL; 4053 bdev->internal.qos = NULL; 4054 4055 /* If the user didn't specify a uuid, generate one. */ 4056 if (spdk_mem_all_zero(&bdev->uuid, sizeof(bdev->uuid))) { 4057 spdk_uuid_generate(&bdev->uuid); 4058 } 4059 4060 if (spdk_bdev_get_buf_align(bdev) > 1) { 4061 if (bdev->split_on_optimal_io_boundary) { 4062 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 4063 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 4064 } else { 4065 bdev->split_on_optimal_io_boundary = true; 4066 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 4067 } 4068 } 4069 4070 TAILQ_INIT(&bdev->internal.open_descs); 4071 4072 TAILQ_INIT(&bdev->aliases); 4073 4074 bdev->internal.reset_in_progress = NULL; 4075 4076 _spdk_bdev_qos_config(bdev); 4077 4078 spdk_io_device_register(__bdev_to_io_dev(bdev), 4079 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 4080 sizeof(struct spdk_bdev_channel), 4081 bdev_name); 4082 4083 free(bdev_name); 4084 4085 pthread_mutex_init(&bdev->internal.mutex, NULL); 4086 return 0; 4087 } 4088 4089 static void 4090 spdk_bdev_destroy_cb(void *io_device) 4091 { 4092 int rc; 4093 struct spdk_bdev *bdev; 4094 spdk_bdev_unregister_cb cb_fn; 4095 void *cb_arg; 4096 4097 bdev = __bdev_from_io_dev(io_device); 4098 cb_fn = bdev->internal.unregister_cb; 4099 cb_arg = bdev->internal.unregister_ctx; 4100 4101 rc = bdev->fn_table->destruct(bdev->ctxt); 4102 if (rc < 0) { 4103 SPDK_ERRLOG("destruct failed\n"); 4104 } 4105 if (rc <= 0 && cb_fn != NULL) { 4106 cb_fn(cb_arg, rc); 4107 } 4108 } 4109 4110 4111 static void 4112 spdk_bdev_fini(struct spdk_bdev *bdev) 4113 { 4114 pthread_mutex_destroy(&bdev->internal.mutex); 4115 4116 free(bdev->internal.qos); 4117 4118 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 4119 } 4120 4121 static void 4122 spdk_bdev_start(struct spdk_bdev *bdev) 4123 { 4124 struct spdk_bdev_module *module; 4125 uint32_t action; 4126 4127 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 4128 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 4129 4130 /* Examine configuration before initializing I/O */ 4131 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4132 if (module->examine_config) { 4133 action = module->internal.action_in_progress; 4134 module->internal.action_in_progress++; 4135 module->examine_config(bdev); 4136 if (action != module->internal.action_in_progress) { 4137 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 4138 module->name); 4139 } 4140 } 4141 } 4142 4143 if (bdev->internal.claim_module) { 4144 if (bdev->internal.claim_module->examine_disk) { 4145 bdev->internal.claim_module->internal.action_in_progress++; 4146 bdev->internal.claim_module->examine_disk(bdev); 4147 } 4148 return; 4149 } 4150 4151 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4152 if (module->examine_disk) { 4153 module->internal.action_in_progress++; 4154 module->examine_disk(bdev); 4155 } 4156 } 4157 } 4158 4159 int 4160 spdk_bdev_register(struct spdk_bdev *bdev) 4161 { 4162 int rc = spdk_bdev_init(bdev); 4163 4164 if (rc == 0) { 4165 
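		/* Initialization succeeded - add the bdev to the global list and let
		 * registered modules examine it. */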
spdk_bdev_start(bdev); 4166 } 4167 4168 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 4169 return rc; 4170 } 4171 4172 int 4173 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 4174 { 4175 SPDK_ERRLOG("This function is deprecated. Use spdk_bdev_register() instead.\n"); 4176 return spdk_bdev_register(vbdev); 4177 } 4178 4179 void 4180 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 4181 { 4182 if (bdev->internal.unregister_cb != NULL) { 4183 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 4184 } 4185 } 4186 4187 static void 4188 _remove_notify(void *arg) 4189 { 4190 struct spdk_bdev_desc *desc = arg; 4191 4192 desc->remove_scheduled = false; 4193 4194 if (desc->closed) { 4195 free(desc); 4196 } else { 4197 desc->remove_cb(desc->remove_ctx); 4198 } 4199 } 4200 4201 /* Must be called while holding bdev->internal.mutex. 4202 * returns: 0 - bdev removed and ready to be destructed. 4203 * -EBUSY - bdev can't be destructed yet. */ 4204 static int 4205 spdk_bdev_unregister_unsafe(struct spdk_bdev *bdev) 4206 { 4207 struct spdk_bdev_desc *desc, *tmp; 4208 int rc = 0; 4209 4210 /* Notify each descriptor about hotremoval */ 4211 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 4212 rc = -EBUSY; 4213 if (desc->remove_cb) { 4214 /* 4215 * Defer invocation of the remove_cb to a separate message that will 4216 * run later on its thread. This ensures this context unwinds and 4217 * we don't recursively unregister this bdev again if the remove_cb 4218 * immediately closes its descriptor. 4219 */ 4220 if (!desc->remove_scheduled) { 4221 /* Avoid scheduling removal of the same descriptor multiple times. */ 4222 desc->remove_scheduled = true; 4223 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 4224 } 4225 } 4226 } 4227 4228 /* If there are no descriptors, proceed removing the bdev */ 4229 if (rc == 0) { 4230 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 4231 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 4232 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 4233 } 4234 4235 return rc; 4236 } 4237 4238 void 4239 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 4240 { 4241 struct spdk_thread *thread; 4242 int rc; 4243 4244 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 4245 4246 thread = spdk_get_thread(); 4247 if (!thread) { 4248 /* The user called this from a non-SPDK thread. */ 4249 if (cb_fn != NULL) { 4250 cb_fn(cb_arg, -ENOTSUP); 4251 } 4252 return; 4253 } 4254 4255 pthread_mutex_lock(&g_bdev_mgr.mutex); 4256 pthread_mutex_lock(&bdev->internal.mutex); 4257 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 4258 pthread_mutex_unlock(&bdev->internal.mutex); 4259 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4260 if (cb_fn) { 4261 cb_fn(cb_arg, -EBUSY); 4262 } 4263 return; 4264 } 4265 4266 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 4267 bdev->internal.unregister_cb = cb_fn; 4268 bdev->internal.unregister_ctx = cb_arg; 4269 4270 /* Call under lock. 
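	 * spdk_bdev_unregister_unsafe() walks open_descs and may remove the bdev from
	 * the global list, so both bdev->internal.mutex and g_bdev_mgr.mutex must still
	 * be held here.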
*/ 4271 rc = spdk_bdev_unregister_unsafe(bdev); 4272 pthread_mutex_unlock(&bdev->internal.mutex); 4273 pthread_mutex_unlock(&g_bdev_mgr.mutex); 4274 4275 if (rc == 0) { 4276 spdk_bdev_fini(bdev); 4277 } 4278 } 4279 4280 int 4281 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 4282 void *remove_ctx, struct spdk_bdev_desc **_desc) 4283 { 4284 struct spdk_bdev_desc *desc; 4285 struct spdk_thread *thread; 4286 struct set_qos_limit_ctx *ctx; 4287 4288 thread = spdk_get_thread(); 4289 if (!thread) { 4290 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 4291 return -ENOTSUP; 4292 } 4293 4294 desc = calloc(1, sizeof(*desc)); 4295 if (desc == NULL) { 4296 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 4297 return -ENOMEM; 4298 } 4299 4300 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 4301 spdk_get_thread()); 4302 4303 desc->bdev = bdev; 4304 desc->thread = thread; 4305 desc->remove_cb = remove_cb; 4306 desc->remove_ctx = remove_ctx; 4307 desc->write = write; 4308 *_desc = desc; 4309 4310 pthread_mutex_lock(&bdev->internal.mutex); 4311 4312 if (write && bdev->internal.claim_module) { 4313 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 4314 bdev->name, bdev->internal.claim_module->name); 4315 pthread_mutex_unlock(&bdev->internal.mutex); 4316 free(desc); 4317 *_desc = NULL; 4318 return -EPERM; 4319 } 4320 4321 /* Enable QoS */ 4322 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 4323 ctx = calloc(1, sizeof(*ctx)); 4324 if (ctx == NULL) { 4325 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 4326 pthread_mutex_unlock(&bdev->internal.mutex); 4327 free(desc); 4328 *_desc = NULL; 4329 return -ENOMEM; 4330 } 4331 ctx->bdev = bdev; 4332 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4333 _spdk_bdev_enable_qos_msg, ctx, 4334 _spdk_bdev_enable_qos_done); 4335 } 4336 4337 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 4338 4339 pthread_mutex_unlock(&bdev->internal.mutex); 4340 4341 return 0; 4342 } 4343 4344 void 4345 spdk_bdev_close(struct spdk_bdev_desc *desc) 4346 { 4347 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4348 int rc; 4349 4350 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 4351 spdk_get_thread()); 4352 4353 assert(desc->thread == spdk_get_thread()); 4354 4355 pthread_mutex_lock(&bdev->internal.mutex); 4356 4357 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 4358 4359 desc->closed = true; 4360 4361 if (!desc->remove_scheduled) { 4362 free(desc); 4363 } 4364 4365 /* If no more descriptors, kill QoS channel */ 4366 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 4367 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 4368 bdev->name, spdk_get_thread()); 4369 4370 if (spdk_bdev_qos_destroy(bdev)) { 4371 /* There isn't anything we can do to recover here. Just let the 4372 * old QoS poller keep running. The QoS handling won't change 4373 * cores when the user allocates a new channel, but it won't break. */ 4374 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 4375 } 4376 } 4377 4378 spdk_bdev_set_qd_sampling_period(bdev, 0); 4379 4380 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 4381 rc = spdk_bdev_unregister_unsafe(bdev); 4382 pthread_mutex_unlock(&bdev->internal.mutex); 4383 4384 if (rc == 0) { 4385 spdk_bdev_fini(bdev); 4386 } 4387 } else { 4388 pthread_mutex_unlock(&bdev->internal.mutex); 4389 } 4390 } 4391 4392 int 4393 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 4394 struct spdk_bdev_module *module) 4395 { 4396 if (bdev->internal.claim_module != NULL) { 4397 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 4398 bdev->internal.claim_module->name); 4399 return -EPERM; 4400 } 4401 4402 if (desc && !desc->write) { 4403 desc->write = true; 4404 } 4405 4406 bdev->internal.claim_module = module; 4407 return 0; 4408 } 4409 4410 void 4411 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 4412 { 4413 assert(bdev->internal.claim_module != NULL); 4414 bdev->internal.claim_module = NULL; 4415 } 4416 4417 struct spdk_bdev * 4418 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 4419 { 4420 assert(desc != NULL); 4421 return desc->bdev; 4422 } 4423 4424 void 4425 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 4426 { 4427 struct iovec *iovs; 4428 int iovcnt; 4429 4430 if (bdev_io == NULL) { 4431 return; 4432 } 4433 4434 switch (bdev_io->type) { 4435 case SPDK_BDEV_IO_TYPE_READ: 4436 case SPDK_BDEV_IO_TYPE_WRITE: 4437 case SPDK_BDEV_IO_TYPE_ZCOPY: 4438 iovs = bdev_io->u.bdev.iovs; 4439 iovcnt = bdev_io->u.bdev.iovcnt; 4440 break; 4441 default: 4442 iovs = NULL; 4443 iovcnt = 0; 4444 break; 4445 } 4446 4447 if (iovp) { 4448 *iovp = iovs; 4449 } 4450 if (iovcntp) { 4451 *iovcntp = iovcnt; 4452 } 4453 } 4454 4455 void * 4456 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 4457 { 4458 if (bdev_io == NULL) { 4459 return NULL; 4460 } 4461 4462 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 4463 return NULL; 4464 } 4465 4466 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 4467 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 4468 return bdev_io->u.bdev.md_buf; 4469 } 4470 4471 return NULL; 4472 } 4473 4474 void 4475 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 4476 { 4477 4478 if (spdk_bdev_module_list_find(bdev_module->name)) { 4479 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 4480 assert(false); 4481 } 4482 4483 /* 4484 * Modules with examine callbacks must be initialized first, so they are 4485 * ready to handle examine callbacks from later modules that will 4486 * register physical bdevs. 
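	 * That is why modules providing examine_config()/examine_disk() are inserted
	 * at the head of the module list below, while all other modules are appended
	 * at the tail.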
4487 */ 4488 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 4489 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4490 } else { 4491 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4492 } 4493 } 4494 4495 struct spdk_bdev_module * 4496 spdk_bdev_module_list_find(const char *name) 4497 { 4498 struct spdk_bdev_module *bdev_module; 4499 4500 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4501 if (strcmp(name, bdev_module->name) == 0) { 4502 break; 4503 } 4504 } 4505 4506 return bdev_module; 4507 } 4508 4509 static void 4510 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 4511 { 4512 struct spdk_bdev_io *bdev_io = _bdev_io; 4513 uint64_t num_bytes, num_blocks; 4514 void *md_buf = NULL; 4515 int rc; 4516 4517 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 4518 bdev_io->u.bdev.split_remaining_num_blocks, 4519 ZERO_BUFFER_SIZE); 4520 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 4521 4522 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 4523 md_buf = (char *)g_bdev_mgr.zero_buffer + 4524 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 4525 } 4526 4527 rc = _spdk_bdev_write_blocks_with_md(bdev_io->internal.desc, 4528 spdk_io_channel_from_ctx(bdev_io->internal.ch), 4529 g_bdev_mgr.zero_buffer, md_buf, 4530 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 4531 _spdk_bdev_write_zero_buffer_done, bdev_io); 4532 if (rc == 0) { 4533 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 4534 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 4535 } else if (rc == -ENOMEM) { 4536 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 4537 } else { 4538 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4539 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 4540 } 4541 } 4542 4543 static void 4544 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4545 { 4546 struct spdk_bdev_io *parent_io = cb_arg; 4547 4548 spdk_bdev_free_io(bdev_io); 4549 4550 if (!success) { 4551 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4552 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 4553 return; 4554 } 4555 4556 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 4557 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4558 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 4559 return; 4560 } 4561 4562 _spdk_bdev_write_zero_buffer_next(parent_io); 4563 } 4564 4565 static void 4566 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 4567 { 4568 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4569 ctx->bdev->internal.qos_mod_in_progress = false; 4570 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4571 4572 if (ctx->cb_fn) { 4573 ctx->cb_fn(ctx->cb_arg, status); 4574 } 4575 free(ctx); 4576 } 4577 4578 static void 4579 _spdk_bdev_disable_qos_done(void *cb_arg) 4580 { 4581 struct set_qos_limit_ctx *ctx = cb_arg; 4582 struct spdk_bdev *bdev = ctx->bdev; 4583 struct spdk_bdev_io *bdev_io; 4584 struct spdk_bdev_qos *qos; 4585 4586 pthread_mutex_lock(&bdev->internal.mutex); 4587 qos = bdev->internal.qos; 4588 bdev->internal.qos = NULL; 4589 pthread_mutex_unlock(&bdev->internal.mutex); 4590 4591 while (!TAILQ_EMPTY(&qos->queued)) { 4592 /* Send queued I/O back to their original thread for resubmission. 
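		 * The QoS channel was substituted for the submitter's channel at submit
		 * time (internal.io_submit_ch), so it is swapped back below before each
		 * I/O is handed to its original thread via spdk_thread_send_msg().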
*/ 4593 bdev_io = TAILQ_FIRST(&qos->queued); 4594 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 4595 4596 if (bdev_io->internal.io_submit_ch) { 4597 /* 4598 * Channel was changed when sending it to the QoS thread - change it back 4599 * before sending it back to the original thread. 4600 */ 4601 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4602 bdev_io->internal.io_submit_ch = NULL; 4603 } 4604 4605 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4606 _spdk_bdev_io_submit, bdev_io); 4607 } 4608 4609 if (qos->thread != NULL) { 4610 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 4611 spdk_poller_unregister(&qos->poller); 4612 } 4613 4614 free(qos); 4615 4616 _spdk_bdev_set_qos_limit_done(ctx, 0); 4617 } 4618 4619 static void 4620 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 4621 { 4622 void *io_device = spdk_io_channel_iter_get_io_device(i); 4623 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4624 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4625 struct spdk_thread *thread; 4626 4627 pthread_mutex_lock(&bdev->internal.mutex); 4628 thread = bdev->internal.qos->thread; 4629 pthread_mutex_unlock(&bdev->internal.mutex); 4630 4631 if (thread != NULL) { 4632 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 4633 } else { 4634 _spdk_bdev_disable_qos_done(ctx); 4635 } 4636 } 4637 4638 static void 4639 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 4640 { 4641 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4642 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4643 4644 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 4645 4646 spdk_for_each_channel_continue(i, 0); 4647 } 4648 4649 static void 4650 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg) 4651 { 4652 struct set_qos_limit_ctx *ctx = cb_arg; 4653 struct spdk_bdev *bdev = ctx->bdev; 4654 4655 pthread_mutex_lock(&bdev->internal.mutex); 4656 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 4657 pthread_mutex_unlock(&bdev->internal.mutex); 4658 4659 _spdk_bdev_set_qos_limit_done(ctx, 0); 4660 } 4661 4662 static void 4663 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 4664 { 4665 void *io_device = spdk_io_channel_iter_get_io_device(i); 4666 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4667 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4668 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4669 4670 pthread_mutex_lock(&bdev->internal.mutex); 4671 _spdk_bdev_enable_qos(bdev, bdev_ch); 4672 pthread_mutex_unlock(&bdev->internal.mutex); 4673 spdk_for_each_channel_continue(i, 0); 4674 } 4675 4676 static void 4677 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 4678 { 4679 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4680 4681 _spdk_bdev_set_qos_limit_done(ctx, status); 4682 } 4683 4684 static void 4685 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 4686 { 4687 int i; 4688 4689 assert(bdev->internal.qos != NULL); 4690 4691 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4692 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4693 bdev->internal.qos->rate_limits[i].limit = limits[i]; 4694 4695 if (limits[i] == 0) { 4696 bdev->internal.qos->rate_limits[i].limit = 4697 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 4698 } 4699 } 4700 } 4701 } 4702 4703 void 4704 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 4705 void (*cb_fn)(void *cb_arg, int status), 
void *cb_arg) 4706 { 4707 struct set_qos_limit_ctx *ctx; 4708 uint32_t limit_set_complement; 4709 uint64_t min_limit_per_sec; 4710 int i; 4711 bool disable_rate_limit = true; 4712 4713 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4714 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4715 continue; 4716 } 4717 4718 if (limits[i] > 0) { 4719 disable_rate_limit = false; 4720 } 4721 4722 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 4723 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 4724 } else { 4725 /* Change from megabyte to byte rate limit */ 4726 limits[i] = limits[i] * 1024 * 1024; 4727 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 4728 } 4729 4730 limit_set_complement = limits[i] % min_limit_per_sec; 4731 if (limit_set_complement) { 4732 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 4733 limits[i], min_limit_per_sec); 4734 limits[i] += min_limit_per_sec - limit_set_complement; 4735 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 4736 } 4737 } 4738 4739 ctx = calloc(1, sizeof(*ctx)); 4740 if (ctx == NULL) { 4741 cb_fn(cb_arg, -ENOMEM); 4742 return; 4743 } 4744 4745 ctx->cb_fn = cb_fn; 4746 ctx->cb_arg = cb_arg; 4747 ctx->bdev = bdev; 4748 4749 pthread_mutex_lock(&bdev->internal.mutex); 4750 if (bdev->internal.qos_mod_in_progress) { 4751 pthread_mutex_unlock(&bdev->internal.mutex); 4752 free(ctx); 4753 cb_fn(cb_arg, -EAGAIN); 4754 return; 4755 } 4756 bdev->internal.qos_mod_in_progress = true; 4757 4758 if (disable_rate_limit == true && bdev->internal.qos) { 4759 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4760 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 4761 (bdev->internal.qos->rate_limits[i].limit > 0 && 4762 bdev->internal.qos->rate_limits[i].limit != 4763 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 4764 disable_rate_limit = false; 4765 break; 4766 } 4767 } 4768 } 4769 4770 if (disable_rate_limit == false) { 4771 if (bdev->internal.qos == NULL) { 4772 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 4773 if (!bdev->internal.qos) { 4774 pthread_mutex_unlock(&bdev->internal.mutex); 4775 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 4776 free(ctx); 4777 cb_fn(cb_arg, -ENOMEM); 4778 return; 4779 } 4780 } 4781 4782 if (bdev->internal.qos->thread == NULL) { 4783 /* Enabling */ 4784 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4785 4786 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4787 _spdk_bdev_enable_qos_msg, ctx, 4788 _spdk_bdev_enable_qos_done); 4789 } else { 4790 /* Updating */ 4791 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4792 4793 spdk_thread_send_msg(bdev->internal.qos->thread, 4794 _spdk_bdev_update_qos_rate_limit_msg, ctx); 4795 } 4796 } else { 4797 if (bdev->internal.qos != NULL) { 4798 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4799 4800 /* Disabling */ 4801 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4802 _spdk_bdev_disable_qos_msg, ctx, 4803 _spdk_bdev_disable_qos_msg_done); 4804 } else { 4805 pthread_mutex_unlock(&bdev->internal.mutex); 4806 _spdk_bdev_set_qos_limit_done(ctx, 0); 4807 return; 4808 } 4809 } 4810 4811 pthread_mutex_unlock(&bdev->internal.mutex); 4812 } 4813 4814 struct spdk_bdev_histogram_ctx { 4815 spdk_bdev_histogram_status_cb cb_fn; 4816 void *cb_arg; 4817 struct spdk_bdev *bdev; 4818 int status; 4819 }; 4820 4821 static void 4822 _spdk_bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 4823 { 4824 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4825 4826 
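	/* Every channel has dropped its per-channel histogram; clear the in-progress
	 * flag and report the previously recorded status to the caller. */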
pthread_mutex_lock(&ctx->bdev->internal.mutex); 4827 ctx->bdev->internal.histogram_in_progress = false; 4828 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4829 ctx->cb_fn(ctx->cb_arg, ctx->status); 4830 free(ctx); 4831 } 4832 4833 static void 4834 _spdk_bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 4835 { 4836 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4837 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4838 4839 if (ch->histogram != NULL) { 4840 spdk_histogram_data_free(ch->histogram); 4841 ch->histogram = NULL; 4842 } 4843 spdk_for_each_channel_continue(i, 0); 4844 } 4845 4846 static void 4847 _spdk_bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 4848 { 4849 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4850 4851 if (status != 0) { 4852 ctx->status = status; 4853 ctx->bdev->internal.histogram_enabled = false; 4854 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), _spdk_bdev_histogram_disable_channel, ctx, 4855 _spdk_bdev_histogram_disable_channel_cb); 4856 } else { 4857 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4858 ctx->bdev->internal.histogram_in_progress = false; 4859 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4860 ctx->cb_fn(ctx->cb_arg, ctx->status); 4861 free(ctx); 4862 } 4863 } 4864 4865 static void 4866 _spdk_bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 4867 { 4868 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4869 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4870 int status = 0; 4871 4872 if (ch->histogram == NULL) { 4873 ch->histogram = spdk_histogram_data_alloc(); 4874 if (ch->histogram == NULL) { 4875 status = -ENOMEM; 4876 } 4877 } 4878 4879 spdk_for_each_channel_continue(i, status); 4880 } 4881 4882 void 4883 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 4884 void *cb_arg, bool enable) 4885 { 4886 struct spdk_bdev_histogram_ctx *ctx; 4887 4888 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 4889 if (ctx == NULL) { 4890 cb_fn(cb_arg, -ENOMEM); 4891 return; 4892 } 4893 4894 ctx->bdev = bdev; 4895 ctx->status = 0; 4896 ctx->cb_fn = cb_fn; 4897 ctx->cb_arg = cb_arg; 4898 4899 pthread_mutex_lock(&bdev->internal.mutex); 4900 if (bdev->internal.histogram_in_progress) { 4901 pthread_mutex_unlock(&bdev->internal.mutex); 4902 free(ctx); 4903 cb_fn(cb_arg, -EAGAIN); 4904 return; 4905 } 4906 4907 bdev->internal.histogram_in_progress = true; 4908 pthread_mutex_unlock(&bdev->internal.mutex); 4909 4910 bdev->internal.histogram_enabled = enable; 4911 4912 if (enable) { 4913 /* Allocate histogram for each channel */ 4914 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_enable_channel, ctx, 4915 _spdk_bdev_histogram_enable_channel_cb); 4916 } else { 4917 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_disable_channel, ctx, 4918 _spdk_bdev_histogram_disable_channel_cb); 4919 } 4920 } 4921 4922 struct spdk_bdev_histogram_data_ctx { 4923 spdk_bdev_histogram_data_cb cb_fn; 4924 void *cb_arg; 4925 struct spdk_bdev *bdev; 4926 /** merged histogram data from all channels */ 4927 struct spdk_histogram_data *histogram; 4928 }; 4929 4930 static void 4931 _spdk_bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 4932 { 4933 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4934 4935 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 4936 free(ctx); 4937 } 4938 4939 static void 4940 
_spdk_bdev_histogram_get_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	int status = 0;

	if (ch->histogram == NULL) {
		status = -EFAULT;
	} else {
		spdk_histogram_data_merge(ctx->histogram, ch->histogram);
	}

	spdk_for_each_channel_continue(i, status);
}

void
spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram,
			spdk_bdev_histogram_data_cb cb_fn,
			void *cb_arg)
{
	struct spdk_bdev_histogram_data_ctx *ctx;

	ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM, NULL);
		return;
	}

	ctx->bdev = bdev;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	ctx->histogram = histogram;

	spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_get_channel, ctx,
			      _spdk_bdev_histogram_get_channel_cb);
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)

SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
{
	spdk_trace_register_owner(OWNER_BDEV, 'b');
	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
	spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV,
					OBJECT_BDEV_IO, 1, 0, "type: ");
	spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV,
					OBJECT_BDEV_IO, 0, 0, "");
}
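/*
 * Illustrative usage sketch (caller-side, not compiled as part of this file):
 * enabling the per-channel latency histogram for a bdev and later collecting the
 * merged data.  The callback names and the histogram_done/data variables are
 * hypothetical; bdev is assumed to be an already registered struct spdk_bdev *.
 * The entry points and callback signatures are the ones defined above.
 *
 *	static void
 *	histogram_status_cb(void *cb_arg, int status)
 *	{
 *		if (status != 0) {
 *			SPDK_ERRLOG("Failed to enable histogram: %d\n", status);
 *		}
 *	}
 *
 *	static void
 *	histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
 *	{
 *		*(bool *)cb_arg = (status == 0);
 *	}
 *
 *	bool histogram_done = false;
 *	struct spdk_histogram_data *data = spdk_histogram_data_alloc();
 *
 *	spdk_bdev_histogram_enable(bdev, histogram_status_cb, NULL, true);
 *	...
 *	spdk_bdev_histogram_get(bdev, data, histogram_data_cb, &histogram_done);
 */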