/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/notify.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024 - 1)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define BUF_SMALL_POOL_SIZE			8191
#define BUF_LARGE_POOL_SIZE			1023
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		1000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_conf_type[] = {"Limit_IOPS",
				      "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS"
				     };
static const char *qos_rpc_type[] = {"rw_ios_per_sec",
				     "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
				    };

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb g_init_cb_fn = NULL;
static void *g_init_cb_arg = NULL;

static spdk_bdev_fini_cb g_fini_cb_fn = NULL;
static void *g_fini_cb_arg = NULL;
static struct spdk_thread *g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;

	/** Function to check whether to queue the IO. */
	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);

	/** Function to update for the submitted IO. */
	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
};

struct spdk_bdev_qos {
	/** Rate limits, one entry per type in spdk_bdev_qos_rate_limit_type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

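/*
 * Illustrative example (not part of the upstream code): with a rw_mbytes_per_sec
 * limit of 1 MB/s and the default 1000 usec timeslice, each timeslice grants
 * roughly 1048576 * 1000 / 1000000 = ~1048 bytes. If a 4096-byte I/O is
 * submitted while 1048 bytes remain, it is allowed to run and
 * remaining_this_timeslice drops to 1048 - 4096 = -3048; the deficit is repaid
 * by the refills of the following timeslices before further I/O is released.
 */
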
struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t per_thread_cache_count;
	uint32_t bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * queue their I/O awaiting retry here. This makes it possible to retry sending
 * I/O to one bdev after I/O from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel *shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev *bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel *channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted through this channel and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	bdev_io_tailq_t queued_resets;

	uint32_t flags;

	struct spdk_histogram_data *histogram;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t start_tsc;
	uint64_t interval_tsc;
	__itt_string_handle *handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

};

struct spdk_bdev_desc {
	struct spdk_bdev *bdev;
	struct spdk_thread *thread;
	spdk_bdev_remove_cb_t remove_cb;
	void *remove_ctx;
	bool remove_scheduled;
	bool closed;
	bool write;
	TAILQ_ENTRY(spdk_bdev_desc) link;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

struct set_qos_limit_ctx {
	void (*cb_fn)(void *cb_arg, int status);
	void *cb_arg;
	struct spdk_bdev *bdev;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
		void *cb_arg);
static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);

static void _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
static void _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);

static int
_spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
				struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks,
				uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg);
static int
_spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
				 struct iovec *iov, int iovcnt, void *md_buf,
				 uint64_t offset_blocks, uint64_t num_blocks,
				 spdk_bdev_io_completion_cb cb, void *cb_arg);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization. A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

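/*
 * Illustration (not part of the upstream code): with the default
 * bdev_io_cache_size of 256 and, say, 4 threads at initialization time, the
 * check in spdk_bdev_set_opts() above requires
 * bdev_io_pool_size >= 256 * (4 + 1) = 1280, which the default pool size of
 * 65535 easily satisfies. The thread count used here is only an example.
 */
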
struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	if (bdev_io->u.bdev.iovs == NULL) {
		bdev_io->u.bdev.iovs = &bdev_io->iov;
		bdev_io->u.bdev.iovcnt = 1;
	}

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

void
spdk_bdev_io_set_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	assert((len / spdk_bdev_get_md_size(bdev_io->bdev)) >= bdev_io->u.bdev.num_blocks);
	bdev_io->u.bdev.md_buf = md_buf;
}

static bool
_is_buf_allocated(const struct iovec *iovs)
{
	if (iovs == NULL) {
		return false;
	}

	return iovs[0].iov_base != NULL;
}

static bool
_are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
{
	int i;
	uintptr_t iov_base;

	if (spdk_likely(alignment == 1)) {
		return true;
	}

	for (i = 0; i < iovcnt; i++) {
		iov_base = (uintptr_t)iovs[i].iov_base;
		if ((iov_base & (alignment - 1)) != 0) {
			return false;
		}
	}

	return true;
}

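/*
 * Worked example (not in the upstream source): the mask test above relies on
 * the alignment being a power of two. For alignment = 512 (0x200), a base
 * address of 0x200200 gives 0x200200 & 0x1FF = 0, so the iovec is aligned;
 * a base of 0x200208 gives 0x208 & 0x1FF = 0x8, so the request must go
 * through the bounce-buffer path set up below.
 */
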
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is the write path, copy data from the original buffer to the bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
_bdev_io_set_bounce_md_buf(struct spdk_bdev_io *bdev_io, void *md_buf, size_t len)
{
	/* save original md_buf */
	bdev_io->internal.orig_md_buf = bdev_io->u.bdev.md_buf;
	/* set bounce md_buf */
	bdev_io->u.bdev.md_buf = md_buf;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		memcpy(md_buf, bdev_io->internal.orig_md_buf, len);
	}
}

static void
_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	bool buf_allocated;
	uint64_t md_len, alignment;
	void *aligned_buf;

	alignment = spdk_bdev_get_buf_align(bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
	aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

	if (buf_allocated) {
		_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
	} else {
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
	}

	if (spdk_bdev_is_md_separate(bdev)) {
		aligned_buf = (char *)aligned_buf + len;
		md_len = bdev_io->u.bdev.num_blocks * bdev->md_len;

		assert(((uintptr_t)aligned_buf & (alignment - 1)) == 0);

		if (bdev_io->u.bdev.md_buf != NULL) {
			_bdev_io_set_bounce_md_buf(bdev_io, aligned_buf, md_len);
		} else {
			spdk_bdev_io_set_md_buf(bdev_io, aligned_buf, md_len);
		}
	}

	bdev_io->internal.buf = buf;
	bdev_io->internal.get_buf_cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
}

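/*
 * Illustration (not part of the upstream code): _bdev_io_set_buf() rounds the
 * raw pool buffer up to the bdev's required alignment and, for bdevs with
 * separate metadata, places the metadata region immediately after the data
 * region. For example, with buf = 0x1010, alignment = 0x200 and len = 0x1000,
 * aligned_buf = (0x1010 + 0x1FF) & ~0x1FF = 0x1200 and the metadata (if any)
 * starts at 0x2200. This is why the pools are sized with
 * SPDK_BDEV_BUF_SIZE_WITH_MD() plus SPDK_BDEV_POOL_ALIGNMENT of headroom.
 */
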
static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len, md_len, alignment;
	void *buf;

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;
	alignment = spdk_bdev_get_buf_align(bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		_bdev_io_set_buf(tmp, buf, tmp->internal.buf_len);
	}
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	if (spdk_likely(bdev_io->internal.orig_iovcnt == 0)) {
		assert(bdev_io->internal.orig_md_buf == NULL);
		return;
	}

	/* if this is the read path, copy data from the bounce buffer to the original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs,
				  bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base,
				  bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bounce buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;

	/* do the same for metadata buffer */
	if (spdk_unlikely(bdev_io->internal.orig_md_buf != NULL)) {
		assert(spdk_bdev_is_md_separate(bdev_io->bdev));

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
		    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			memcpy(bdev_io->internal.orig_md_buf, bdev_io->u.bdev.md_buf,
			       bdev_io->u.bdev.num_blocks * spdk_bdev_get_md_size(bdev_io->bdev));
		}

		bdev_io->u.bdev.md_buf = bdev_io->internal.orig_md_buf;
		bdev_io->internal.orig_md_buf = NULL;
	}

	spdk_bdev_io_put_buf(bdev_io);
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment, md_len;
	void *buf;

	assert(cb != NULL);

	alignment = spdk_bdev_get_buf_align(bdev);
	md_len = spdk_bdev_is_md_separate(bdev) ? bdev_io->u.bdev.num_blocks * bdev->md_len : 0;

	if (_is_buf_allocated(bdev_io->u.bdev.iovs) &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, true);
		return;
	}

	if (len + alignment + md_len > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
			    len + alignment);
		cb(spdk_bdev_io_get_io_channel(bdev_io), bdev_io, false);
		return;
	}

	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;
	bdev_io->internal.get_buf_cb = cb;

	if (len + alignment + md_len <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);
	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		_bdev_io_set_buf(bdev_io, buf, len);
	}
}

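/*
 * Sketch of the intended call pattern (not part of the upstream code): a bdev
 * module that needs a data buffer for a READ typically calls
 * spdk_bdev_io_get_buf() from its submit_request() path and continues in the
 * callback, e.g.:
 *
 *	static void
 *	example_read_get_buf_cb(struct spdk_io_channel *ch,
 *				struct spdk_bdev_io *bdev_io, bool success)
 *	{
 *		if (!success) {
 *			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
 *			return;
 *		}
 *		// bdev_io->u.bdev.iovs now points at a suitably aligned buffer
 *	}
 *
 *	spdk_bdev_io_get_buf(bdev_io, example_read_get_buf_cb,
 *			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *
 * example_read_get_buf_cb is a hypothetical name used only for illustration.
 */
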
static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

static void
spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_qos_limit");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_options");
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		spdk_bdev_qos_config_json(bdev, w);
	}

	spdk_json_write_array_end(w);
}

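/*
 * Note (not part of the upstream code): the writer calls above produce JSON
 * roughly of this shape, where "Malloc0" and the limit value are hypothetical
 * examples:
 *
 *	[
 *	  { "method": "set_bdev_options",
 *	    "params": { "bdev_io_pool_size": 65535, "bdev_io_cache_size": 256 } },
 *	  ...per-module and per-bdev configuration objects...,
 *	  { "method": "set_bdev_qos_limit",
 *	    "params": { "name": "Malloc0", "rw_ios_per_sec": 10000 } }
 *	]
 */
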
static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

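/*
 * Sketch (not part of the upstream code): a bdev module that needs
 * asynchronous startup sets .async_init = true in its struct spdk_bdev_module.
 * spdk_bdev_modules_init() below then marks it as having an action in
 * progress, and the module signals completion later, e.g.:
 *
 *	static int
 *	example_module_init(void)
 *	{
 *		// kick off asynchronous probing here and return immediately
 *		return 0;
 *	}
 *
 *	// ...once the asynchronous work finishes:
 *	spdk_bdev_module_init_done(&example_if);
 *
 * example_module_init and example_if are hypothetical names; the completion
 * call is what lets spdk_bdev_module_action_complete() finish subsystem init.
 */
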
static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}

static void
spdk_bdev_init_failed(void *cb_arg)
{
	spdk_bdev_init_complete(-1);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
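	/*
	 * Illustration (not part of the upstream code): with
	 * BUF_SMALL_POOL_SIZE = 8191 and, for example, 4 threads, each per-thread
	 * mempool cache holds 8191 / (2 * 4) = 1023 buffers, so the caches can pin
	 * at most 4 * 1023 = 4092 buffers - roughly half of the pool - leaving the
	 * rest available to any thread. The thread count is only an example.
	 */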
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = spdk_bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
		return;
	}

	spdk_bdev_module_action_complete();
}

static void
spdk_bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    g_bdev_opts.bdev_io_pool_size);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	spdk_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
}

static void
spdk_bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in the reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim the underlying bdev properly, we may face the
	 * case of a bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister the last bdev on the list then.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static bool
_spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
_spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return true;
	default:
		return false;
	}
}

static bool
_spdk_bdev_is_read_io(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* Bit 1 (0x2) set for read operation */
		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
			return true;
		} else {
			return false;
		}
	case SPDK_BDEV_IO_TYPE_READ:
		return true;
	default:
		return false;
	}
}

static uint64_t
_spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	default:
		return 0;
	}
}

static bool
_spdk_bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
		return true;
	} else {
		return false;
	}
}

static bool
_spdk_bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == false) {
		return false;
	}

	return _spdk_bdev_qos_rw_queue_io(limit, io);
}

static bool
_spdk_bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == true) {
		return false;
	}

	return _spdk_bdev_qos_rw_queue_io(limit, io);
}

static void
_spdk_bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice--;
}

static void
_spdk_bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(io);
}

static void
_spdk_bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == false) {
		return;
	}

	return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
}

static void
_spdk_bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == true) {
		return;
	}

	return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
}

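/*
 * Worked example (not in the upstream source): for a READ of 8 blocks on a
 * bdev with 512-byte blocks, _spdk_bdev_get_io_size_in_byte() returns
 * 8 * 512 = 4096. Submitting it decrements the RW IOPS limit's
 * remaining_this_timeslice by 1, and the RW and read bytes-per-second limits
 * by 4096 each; the write bytes-per-second limit is left untouched.
 */
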
static void
_spdk_bdev_qos_set_ops(struct spdk_bdev_qos *qos)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].queue_io = NULL;
			qos->rate_limits[i].update_quota = NULL;
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_iops_update_quota;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_r_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_r_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_w_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_w_bps_update_quota;
			break;
		default:
			break;
		}
	}
}

static inline void
_spdk_bdev_io_do_submit(struct spdk_bdev_channel *bdev_ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
		bdev_ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev_io->internal.in_submit_request = true;
		bdev->fn_table->submit_request(ch, bdev_io);
		bdev_io->internal.in_submit_request = false;
	} else {
		TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
	}
}

static int
_spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL;
	int i, submitted_ios = 0;

	TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
		if (_spdk_bdev_qos_io_to_limit(bdev_io) == true) {
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].queue_io) {
					continue;
				}

				if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
								 bdev_io) == true) {
					return submitted_ios;
				}
			}
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].update_quota) {
					continue;
				}

				qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
			}
		}

		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		_spdk_bdev_io_do_submit(ch, bdev_io);
		submitted_ios++;
	}

	return submitted_ios;
}

static void
_spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

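/*
 * Sketch of the public retry pattern wrapped by the helper above (not part of
 * the upstream code): when a submission call such as spdk_bdev_read_blocks()
 * returns -ENOMEM, the caller can park itself on the channel's io_wait_queue
 * and retry from the callback, e.g.:
 *
 *	// 'entry' must stay valid until the callback fires, so it normally
 *	// lives in the caller's per-I/O context rather than on the stack.
 *	entry->bdev = bdev;
 *	entry->cb_fn = example_retry_read;	// hypothetical callback
 *	entry->cb_arg = ctx;
 *	spdk_bdev_queue_io_wait(bdev, io_ch, entry);
 *
 * The callback fires once a bdev_io is returned to this thread's cache in
 * spdk_bdev_free_io() above.
 */
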
static bool
_spdk_bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O.  Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}

static bool
_spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}

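/*
 * Worked example (not in the upstream source): with optimal_io_boundary = 8,
 * a READ at offset_blocks = 6 for num_blocks = 4 spans blocks 6..9, so
 * start_stripe = 6 >> 3 = 0 and end_stripe = 9 >> 3 = 1. The stripes differ,
 * so the I/O is split at block 8: one child for blocks 6-7 and one for blocks
 * 8-9, as computed by _to_next_boundary() below (8 - (6 % 8) = 2 blocks to the
 * first boundary).
 */
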
static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_spdk_bdev_io_split(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
	struct iovec *parent_iov, *iov;
	uint64_t parent_iov_offset, iov_len;
	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
	void *md_buf = NULL;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
	parent_iovcnt = bdev_io->u.bdev.iovcnt;

	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
		if (parent_iov_offset < parent_iov->iov_len) {
			break;
		}
		parent_iov_offset -= parent_iov->iov_len;
	}

	child_iovcnt = 0;
	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
		to_next_boundary = spdk_min(remaining, to_next_boundary);
		to_next_boundary_bytes = to_next_boundary * blocklen;
		iov = &bdev_io->child_iov[child_iovcnt];
		iovcnt = 0;

		if (bdev_io->u.bdev.md_buf) {
			assert((parent_iov_offset % blocklen) > 0);
			md_buf = (char *)bdev_io->u.bdev.md_buf + (parent_iov_offset / blocklen) *
				 spdk_bdev_get_md_size(bdev_io->bdev);
		}

		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
			to_next_boundary_bytes -= iov_len;

			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;

			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
				parent_iov_offset += iov_len;
			} else {
				parent_iovpos++;
				parent_iov_offset = 0;
			}
			child_iovcnt++;
			iovcnt++;
		}

		if (to_next_boundary_bytes > 0) {
			/* We had to stop this child I/O early because we ran out of
			 * child_iov space.  Make sure the iovs collected are valid and
			 * then adjust to_next_boundary before starting the child I/O.
			 */
			if ((to_next_boundary_bytes % blocklen) != 0) {
				SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n",
					    to_next_boundary_bytes, blocklen);
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
				return;
			}
			to_next_boundary -= to_next_boundary_bytes / blocklen;
		}

		bdev_io->u.bdev.split_outstanding++;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			rc = _spdk_bdev_readv_blocks_with_md(bdev_io->internal.desc,
							     spdk_io_channel_from_ctx(bdev_io->internal.ch),
							     iov, iovcnt, md_buf, current_offset,
							     to_next_boundary,
							     _spdk_bdev_io_split_done, bdev_io);
		} else {
			rc = _spdk_bdev_writev_blocks_with_md(bdev_io->internal.desc,
							      spdk_io_channel_from_ctx(bdev_io->internal.ch),
							      iov, iovcnt, md_buf, current_offset,
							      to_next_boundary,
							      _spdk_bdev_io_split_done, bdev_io);
		}

		if (rc == 0) {
			current_offset += to_next_boundary;
			remaining -= to_next_boundary;
			bdev_io->u.bdev.split_current_offset_blocks = current_offset;
			bdev_io->u.bdev.split_remaining_num_blocks = remaining;
		} else {
			bdev_io->u.bdev.split_outstanding--;
			if (rc == -ENOMEM) {
				if (bdev_io->u.bdev.split_outstanding == 0) {
					/* No I/O is outstanding. Hence we should wait here. */
					_spdk_bdev_queue_io_wait_with_cb(bdev_io,
									 _spdk_bdev_io_split);
				}
			} else {
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
			}

			return;
		}
	}
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
	}
	parent_io->u.bdev.split_outstanding--;
	if (parent_io->u.bdev.split_outstanding != 0) {
		return;
	}

	/*
	 * Parent I/O finishes when all blocks are consumed.
	 */
	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
				       parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process.  This function will complete the parent I/O if the
	 * splitting is done.
	 */
	_spdk_bdev_io_split(parent_io);
}

static void
_spdk_bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
			       bool success);

static void
spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	assert(_spdk_bdev_io_type_can_split(bdev_io->type));

	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
	bdev_io->u.bdev.split_outstanding = 0;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;

	if (_is_buf_allocated(bdev_io->u.bdev.iovs)) {
		_spdk_bdev_io_split(bdev_io);
	} else {
		assert(bdev_io->type == SPDK_BDEV_IO_TYPE_READ);
		spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
	}
}

static void
_spdk_bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
			       bool success)
{
	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	spdk_bdev_io_split(ch, bdev_io);
}

/* Explicitly mark this inline, since it's used as a function pointer and otherwise won't
 * be inlined, at least on some compilers.
 */
static inline void
_spdk_bdev_io_submit(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
	uint64_t tsc;

	tsc = spdk_get_ticks();
	bdev_io->internal.submit_tsc = tsc;
	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);

	if (spdk_likely(bdev_ch->flags == 0)) {
		_spdk_bdev_io_do_submit(bdev_ch, bdev_io);
		return;
	}

	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	bdev_io->internal.in_submit_request = true;
	if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
		bdev_ch->io_outstanding--;
		shared_resource->io_outstanding--;
		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
		_spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos);
	} else {
		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_thread *thread = spdk_bdev_io_get_thread(bdev_io);

	assert(thread != NULL);
	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
		spdk_bdev_io_split(NULL, bdev_io);
		return;
	}

	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
			_spdk_bdev_io_submit(bdev_io);
		} else {
bdev_io); 1836 } 1837 } else { 1838 _spdk_bdev_io_submit(bdev_io); 1839 } 1840 } 1841 1842 static void 1843 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1844 { 1845 struct spdk_bdev *bdev = bdev_io->bdev; 1846 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1847 struct spdk_io_channel *ch = bdev_ch->channel; 1848 1849 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1850 1851 bdev_io->internal.in_submit_request = true; 1852 bdev->fn_table->submit_request(ch, bdev_io); 1853 bdev_io->internal.in_submit_request = false; 1854 } 1855 1856 static void 1857 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1858 struct spdk_bdev *bdev, void *cb_arg, 1859 spdk_bdev_io_completion_cb cb) 1860 { 1861 bdev_io->bdev = bdev; 1862 bdev_io->internal.caller_ctx = cb_arg; 1863 bdev_io->internal.cb = cb; 1864 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1865 bdev_io->internal.in_submit_request = false; 1866 bdev_io->internal.buf = NULL; 1867 bdev_io->internal.io_submit_ch = NULL; 1868 bdev_io->internal.orig_iovs = NULL; 1869 bdev_io->internal.orig_iovcnt = 0; 1870 bdev_io->internal.orig_md_buf = NULL; 1871 } 1872 1873 static bool 1874 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1875 { 1876 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1877 } 1878 1879 bool 1880 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1881 { 1882 bool supported; 1883 1884 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1885 1886 if (!supported) { 1887 switch (io_type) { 1888 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1889 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1890 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1891 break; 1892 case SPDK_BDEV_IO_TYPE_ZCOPY: 1893 /* Zero copy can be emulated with regular read and write */ 1894 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) && 1895 _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1896 break; 1897 default: 1898 break; 1899 } 1900 } 1901 1902 return supported; 1903 } 1904 1905 int 1906 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1907 { 1908 if (bdev->fn_table->dump_info_json) { 1909 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1910 } 1911 1912 return 0; 1913 } 1914 1915 static void 1916 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1917 { 1918 uint32_t max_per_timeslice = 0; 1919 int i; 1920 1921 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1922 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1923 qos->rate_limits[i].max_per_timeslice = 0; 1924 continue; 1925 } 1926 1927 max_per_timeslice = qos->rate_limits[i].limit * 1928 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1929 1930 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1931 qos->rate_limits[i].min_per_timeslice); 1932 1933 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1934 } 1935 1936 _spdk_bdev_qos_set_ops(qos); 1937 } 1938 1939 static int 1940 spdk_bdev_channel_poll_qos(void *arg) 1941 { 1942 struct spdk_bdev_qos *qos = arg; 1943 uint64_t now = spdk_get_ticks(); 1944 int i; 1945 1946 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1947 /* We received our callback earlier than expected - return 1948 * immediately and wait to do accounting until at least one 1949 * timeslice has actually expired. 
This should never happen 1950 * with a well-behaved timer implementation. 1951 */ 1952 return 0; 1953 } 1954 1955 /* Reset for next round of rate limiting */ 1956 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1957 /* We may have allowed the IOs or bytes to slightly overrun in the last 1958 * timeslice. remaining_this_timeslice is signed, so if it's negative 1959 * here, we'll account for the overrun so that the next timeslice will 1960 * be appropriately reduced. 1961 */ 1962 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1963 qos->rate_limits[i].remaining_this_timeslice = 0; 1964 } 1965 } 1966 1967 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1968 qos->last_timeslice += qos->timeslice_size; 1969 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1970 qos->rate_limits[i].remaining_this_timeslice += 1971 qos->rate_limits[i].max_per_timeslice; 1972 } 1973 } 1974 1975 return _spdk_bdev_qos_io_submit(qos->ch, qos); 1976 } 1977 1978 static void 1979 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1980 { 1981 struct spdk_bdev_shared_resource *shared_resource; 1982 1983 spdk_put_io_channel(ch->channel); 1984 1985 shared_resource = ch->shared_resource; 1986 1987 assert(ch->io_outstanding == 0); 1988 assert(shared_resource->ref > 0); 1989 shared_resource->ref--; 1990 if (shared_resource->ref == 0) { 1991 assert(shared_resource->io_outstanding == 0); 1992 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1993 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1994 free(shared_resource); 1995 } 1996 } 1997 1998 /* Caller must hold bdev->internal.mutex. */ 1999 static void 2000 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 2001 { 2002 struct spdk_bdev_qos *qos = bdev->internal.qos; 2003 int i; 2004 2005 /* Rate limiting on this bdev enabled */ 2006 if (qos) { 2007 if (qos->ch == NULL) { 2008 struct spdk_io_channel *io_ch; 2009 2010 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 2011 bdev->name, spdk_get_thread()); 2012 2013 /* No qos channel has been selected, so set one up */ 2014 2015 /* Take another reference to ch */ 2016 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2017 assert(io_ch != NULL); 2018 qos->ch = ch; 2019 2020 qos->thread = spdk_io_channel_get_thread(io_ch); 2021 2022 TAILQ_INIT(&qos->queued); 2023 2024 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2025 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 2026 qos->rate_limits[i].min_per_timeslice = 2027 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 2028 } else { 2029 qos->rate_limits[i].min_per_timeslice = 2030 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 2031 } 2032 2033 if (qos->rate_limits[i].limit == 0) { 2034 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 2035 } 2036 } 2037 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 2038 qos->timeslice_size = 2039 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 2040 qos->last_timeslice = spdk_get_ticks(); 2041 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 2042 qos, 2043 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 2044 } 2045 2046 ch->flags |= BDEV_CH_QOS_ENABLED; 2047 } 2048 } 2049 2050 static int 2051 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 2052 { 2053 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 2054 struct spdk_bdev_channel *ch = ctx_buf; 2055 struct spdk_io_channel *mgmt_io_ch; 2056 struct spdk_bdev_mgmt_channel 
*mgmt_ch; 2057 struct spdk_bdev_shared_resource *shared_resource; 2058 2059 ch->bdev = bdev; 2060 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 2061 if (!ch->channel) { 2062 return -1; 2063 } 2064 2065 assert(ch->histogram == NULL); 2066 if (bdev->internal.histogram_enabled) { 2067 ch->histogram = spdk_histogram_data_alloc(); 2068 if (ch->histogram == NULL) { 2069 SPDK_ERRLOG("Could not allocate histogram\n"); 2070 } 2071 } 2072 2073 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 2074 if (!mgmt_io_ch) { 2075 spdk_put_io_channel(ch->channel); 2076 return -1; 2077 } 2078 2079 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 2080 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 2081 if (shared_resource->shared_ch == ch->channel) { 2082 spdk_put_io_channel(mgmt_io_ch); 2083 shared_resource->ref++; 2084 break; 2085 } 2086 } 2087 2088 if (shared_resource == NULL) { 2089 shared_resource = calloc(1, sizeof(*shared_resource)); 2090 if (shared_resource == NULL) { 2091 spdk_put_io_channel(ch->channel); 2092 spdk_put_io_channel(mgmt_io_ch); 2093 return -1; 2094 } 2095 2096 shared_resource->mgmt_ch = mgmt_ch; 2097 shared_resource->io_outstanding = 0; 2098 TAILQ_INIT(&shared_resource->nomem_io); 2099 shared_resource->nomem_threshold = 0; 2100 shared_resource->shared_ch = ch->channel; 2101 shared_resource->ref = 1; 2102 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2103 } 2104 2105 memset(&ch->stat, 0, sizeof(ch->stat)); 2106 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2107 ch->io_outstanding = 0; 2108 TAILQ_INIT(&ch->queued_resets); 2109 ch->flags = 0; 2110 ch->shared_resource = shared_resource; 2111 2112 #ifdef SPDK_CONFIG_VTUNE 2113 { 2114 char *name; 2115 __itt_init_ittlib(NULL, 0); 2116 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2117 if (!name) { 2118 _spdk_bdev_channel_destroy_resource(ch); 2119 return -1; 2120 } 2121 ch->handle = __itt_string_handle_create(name); 2122 free(name); 2123 ch->start_tsc = spdk_get_ticks(); 2124 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2125 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 2126 } 2127 #endif 2128 2129 pthread_mutex_lock(&bdev->internal.mutex); 2130 _spdk_bdev_enable_qos(bdev, ch); 2131 pthread_mutex_unlock(&bdev->internal.mutex); 2132 2133 return 0; 2134 } 2135 2136 /* 2137 * Abort I/O that are waiting on a data buffer. These types of I/O are 2138 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2139 */ 2140 static void 2141 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2142 { 2143 bdev_io_stailq_t tmp; 2144 struct spdk_bdev_io *bdev_io; 2145 2146 STAILQ_INIT(&tmp); 2147 2148 while (!STAILQ_EMPTY(queue)) { 2149 bdev_io = STAILQ_FIRST(queue); 2150 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2151 if (bdev_io->internal.ch == ch) { 2152 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2153 } else { 2154 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2155 } 2156 } 2157 2158 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2159 } 2160 2161 /* 2162 * Abort I/O that are queued waiting for submission. These types of I/O are 2163 * linked using the spdk_bdev_io link TAILQ_ENTRY. 
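 * Callers use this for the per-channel queued_resets list, the shared nomem_io
 * queue, and the QoS queue that gets swapped out while a reset freezes the
 * channel; each aborted I/O is completed with SPDK_BDEV_IO_STATUS_FAILED.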
2164 */ 2165 static void 2166 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2167 { 2168 struct spdk_bdev_io *bdev_io, *tmp; 2169 2170 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2171 if (bdev_io->internal.ch == ch) { 2172 TAILQ_REMOVE(queue, bdev_io, internal.link); 2173 /* 2174 * spdk_bdev_io_complete() assumes that the completed I/O had 2175 * been submitted to the bdev module. Since in this case it 2176 * hadn't, bump io_outstanding to account for the decrement 2177 * that spdk_bdev_io_complete() will do. 2178 */ 2179 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2180 ch->io_outstanding++; 2181 ch->shared_resource->io_outstanding++; 2182 } 2183 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2184 } 2185 } 2186 } 2187 2188 static void 2189 spdk_bdev_qos_channel_destroy(void *cb_arg) 2190 { 2191 struct spdk_bdev_qos *qos = cb_arg; 2192 2193 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2194 spdk_poller_unregister(&qos->poller); 2195 2196 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2197 2198 free(qos); 2199 } 2200 2201 static int 2202 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 2203 { 2204 int i; 2205 2206 /* 2207 * Cleanly shutting down the QoS poller is tricky, because 2208 * during the asynchronous operation the user could open 2209 * a new descriptor and create a new channel, spawning 2210 * a new QoS poller. 2211 * 2212 * The strategy is to create a new QoS structure here and swap it 2213 * in. The shutdown path then continues to refer to the old one 2214 * until it completes and then releases it. 2215 */ 2216 struct spdk_bdev_qos *new_qos, *old_qos; 2217 2218 old_qos = bdev->internal.qos; 2219 2220 new_qos = calloc(1, sizeof(*new_qos)); 2221 if (!new_qos) { 2222 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2223 return -ENOMEM; 2224 } 2225 2226 /* Copy the old QoS data into the newly allocated structure */ 2227 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2228 2229 /* Zero out the key parts of the QoS structure */ 2230 new_qos->ch = NULL; 2231 new_qos->thread = NULL; 2232 new_qos->poller = NULL; 2233 TAILQ_INIT(&new_qos->queued); 2234 /* 2235 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2236 * It will be used later for the new QoS structure. 2237 */ 2238 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2239 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2240 new_qos->rate_limits[i].min_per_timeslice = 0; 2241 new_qos->rate_limits[i].max_per_timeslice = 0; 2242 } 2243 2244 bdev->internal.qos = new_qos; 2245 2246 if (old_qos->thread == NULL) { 2247 free(old_qos); 2248 } else { 2249 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 2250 old_qos); 2251 } 2252 2253 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2254 * been destroyed yet. The destruction path will end up waiting for the final 2255 * channel to be put before it releases resources. 
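	 * (spdk_bdev_qos_channel_destroy(), run on the old QoS thread, unregisters the
	 * poller, drops the QoS channel reference and frees the old structure.)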
	 */

	return 0;
}

static void
_spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
{
	total->bytes_read += add->bytes_read;
	total->num_read_ops += add->num_read_ops;
	total->bytes_written += add->bytes_written;
	total->num_write_ops += add->num_write_ops;
	total->bytes_unmapped += add->bytes_unmapped;
	total->num_unmap_ops += add->num_unmap_ops;
	total->read_latency_ticks += add->read_latency_ticks;
	total->write_latency_ticks += add->write_latency_ticks;
	total->unmap_latency_ticks += add->unmap_latency_ticks;
}

static void
spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel *ch = ctx_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
		      spdk_get_thread());

	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
	pthread_mutex_lock(&ch->bdev->internal.mutex);
	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
	pthread_mutex_unlock(&ch->bdev->internal.mutex);

	mgmt_ch = shared_resource->mgmt_ch;

	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);

	if (ch->histogram) {
		spdk_histogram_data_free(ch->histogram);
	}

	_spdk_bdev_channel_destroy_resource(ch);
}

int
spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	if (alias == NULL) {
		SPDK_ERRLOG("Empty alias passed\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(alias)) {
		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
		return -EEXIST;
	}

	tmp = calloc(1, sizeof(*tmp));
	if (tmp == NULL) {
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	tmp->alias = strdup(alias);
	if (tmp->alias == NULL) {
		free(tmp);
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);

	return 0;
}

int
spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (strcmp(alias, tmp->alias) == 0) {
			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
			free(tmp->alias);
			free(tmp);
			return 0;
		}
	}

	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);

	return -ENOENT;
}

void
spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *p, *tmp;

	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
		TAILQ_REMOVE(&bdev->aliases, p, tailq);
		free(p->alias);
		free(p);
	}
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(__bdev_to_io_dev(spdk_bdev_desc_get_bdev(desc)));
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2381 { 2382 return bdev->product_name; 2383 } 2384 2385 const struct spdk_bdev_aliases_list * 2386 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2387 { 2388 return &bdev->aliases; 2389 } 2390 2391 uint32_t 2392 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2393 { 2394 return bdev->blocklen; 2395 } 2396 2397 uint64_t 2398 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2399 { 2400 return bdev->blockcnt; 2401 } 2402 2403 const char * 2404 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2405 { 2406 return qos_rpc_type[type]; 2407 } 2408 2409 void 2410 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2411 { 2412 int i; 2413 2414 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2415 2416 pthread_mutex_lock(&bdev->internal.mutex); 2417 if (bdev->internal.qos) { 2418 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2419 if (bdev->internal.qos->rate_limits[i].limit != 2420 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2421 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2422 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2423 /* Change from Byte to Megabyte which is user visible. */ 2424 limits[i] = limits[i] / 1024 / 1024; 2425 } 2426 } 2427 } 2428 } 2429 pthread_mutex_unlock(&bdev->internal.mutex); 2430 } 2431 2432 size_t 2433 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2434 { 2435 return 1 << bdev->required_alignment; 2436 } 2437 2438 uint32_t 2439 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2440 { 2441 return bdev->optimal_io_boundary; 2442 } 2443 2444 bool 2445 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2446 { 2447 return bdev->write_cache; 2448 } 2449 2450 const struct spdk_uuid * 2451 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2452 { 2453 return &bdev->uuid; 2454 } 2455 2456 uint32_t 2457 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 2458 { 2459 return bdev->md_len; 2460 } 2461 2462 bool 2463 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 2464 { 2465 return (bdev->md_len != 0) && bdev->md_interleave; 2466 } 2467 2468 bool 2469 spdk_bdev_is_md_separate(const struct spdk_bdev *bdev) 2470 { 2471 return (bdev->md_len != 0) && !bdev->md_interleave; 2472 } 2473 2474 uint32_t 2475 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 2476 { 2477 if (spdk_bdev_is_md_interleaved(bdev)) { 2478 return bdev->blocklen - bdev->md_len; 2479 } else { 2480 return bdev->blocklen; 2481 } 2482 } 2483 2484 static uint32_t 2485 _bdev_get_block_size_with_md(const struct spdk_bdev *bdev) 2486 { 2487 if (!spdk_bdev_is_md_interleaved(bdev)) { 2488 return bdev->blocklen + bdev->md_len; 2489 } else { 2490 return bdev->blocklen; 2491 } 2492 } 2493 2494 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 2495 { 2496 if (bdev->md_len != 0) { 2497 return bdev->dif_type; 2498 } else { 2499 return SPDK_DIF_DISABLE; 2500 } 2501 } 2502 2503 bool 2504 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 2505 { 2506 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 2507 return bdev->dif_is_head_of_md; 2508 } else { 2509 return false; 2510 } 2511 } 2512 2513 bool 2514 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 2515 enum spdk_dif_check_type check_type) 2516 { 2517 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 2518 return false; 2519 } 2520 2521 switch (check_type) { 2522 case SPDK_DIF_CHECK_TYPE_REFTAG: 2523 return (bdev->dif_check_flags & 
SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 2524 case SPDK_DIF_CHECK_TYPE_APPTAG: 2525 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 2526 case SPDK_DIF_CHECK_TYPE_GUARD: 2527 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 2528 default: 2529 return false; 2530 } 2531 } 2532 2533 uint64_t 2534 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2535 { 2536 return bdev->internal.measured_queue_depth; 2537 } 2538 2539 uint64_t 2540 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2541 { 2542 return bdev->internal.period; 2543 } 2544 2545 uint64_t 2546 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2547 { 2548 return bdev->internal.weighted_io_time; 2549 } 2550 2551 uint64_t 2552 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2553 { 2554 return bdev->internal.io_time; 2555 } 2556 2557 static void 2558 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2559 { 2560 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2561 2562 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2563 2564 if (bdev->internal.measured_queue_depth) { 2565 bdev->internal.io_time += bdev->internal.period; 2566 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2567 } 2568 } 2569 2570 static void 2571 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2572 { 2573 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2574 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2575 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2576 2577 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2578 spdk_for_each_channel_continue(i, 0); 2579 } 2580 2581 static int 2582 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2583 { 2584 struct spdk_bdev *bdev = ctx; 2585 bdev->internal.temporary_queue_depth = 0; 2586 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2587 _calculate_measured_qd_cpl); 2588 return 0; 2589 } 2590 2591 void 2592 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2593 { 2594 bdev->internal.period = period; 2595 2596 if (bdev->internal.qd_poller != NULL) { 2597 spdk_poller_unregister(&bdev->internal.qd_poller); 2598 bdev->internal.measured_queue_depth = UINT64_MAX; 2599 } 2600 2601 if (period != 0) { 2602 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2603 period); 2604 } 2605 } 2606 2607 int 2608 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2609 { 2610 int ret; 2611 2612 pthread_mutex_lock(&bdev->internal.mutex); 2613 2614 /* bdev has open descriptors */ 2615 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2616 bdev->blockcnt > size) { 2617 ret = -EBUSY; 2618 } else { 2619 bdev->blockcnt = size; 2620 ret = 0; 2621 } 2622 2623 pthread_mutex_unlock(&bdev->internal.mutex); 2624 2625 return ret; 2626 } 2627 2628 /* 2629 * Convert I/O offset and length from bytes to blocks. 2630 * 2631 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 2632 */ 2633 static uint64_t 2634 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2635 uint64_t num_bytes, uint64_t *num_blocks) 2636 { 2637 uint32_t block_size = bdev->blocklen; 2638 uint8_t shift_cnt; 2639 2640 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
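	 * For example, with a 512-byte block size the shift count is 9, so offset_bytes
	 * of 4096 yields offset_blocks of 8 and 8 << 9 reconstructs 4096 exactly; any
	 * leftover bytes in either value make the OR'd result below non-zero, which the
	 * callers treat as an invalid request.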
*/ 2641 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 2642 shift_cnt = spdk_u32log2(block_size); 2643 *offset_blocks = offset_bytes >> shift_cnt; 2644 *num_blocks = num_bytes >> shift_cnt; 2645 return (offset_bytes - (*offset_blocks << shift_cnt)) | 2646 (num_bytes - (*num_blocks << shift_cnt)); 2647 } else { 2648 *offset_blocks = offset_bytes / block_size; 2649 *num_blocks = num_bytes / block_size; 2650 return (offset_bytes % block_size) | (num_bytes % block_size); 2651 } 2652 } 2653 2654 static bool 2655 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2656 { 2657 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2658 * has been an overflow and hence the offset has been wrapped around */ 2659 if (offset_blocks + num_blocks < offset_blocks) { 2660 return false; 2661 } 2662 2663 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2664 if (offset_blocks + num_blocks > bdev->blockcnt) { 2665 return false; 2666 } 2667 2668 return true; 2669 } 2670 2671 static bool 2672 _bdev_io_check_md_buf(const struct iovec *iovs, const void *md_buf) 2673 { 2674 return _is_buf_allocated(iovs) == (md_buf != NULL); 2675 } 2676 2677 static int 2678 _spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf, 2679 void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 2680 spdk_bdev_io_completion_cb cb, void *cb_arg) 2681 { 2682 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2683 struct spdk_bdev_io *bdev_io; 2684 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2685 2686 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2687 return -EINVAL; 2688 } 2689 2690 bdev_io = spdk_bdev_get_io(channel); 2691 if (!bdev_io) { 2692 return -ENOMEM; 2693 } 2694 2695 bdev_io->internal.ch = channel; 2696 bdev_io->internal.desc = desc; 2697 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2698 bdev_io->u.bdev.iovs = &bdev_io->iov; 2699 bdev_io->u.bdev.iovs[0].iov_base = buf; 2700 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2701 bdev_io->u.bdev.iovcnt = 1; 2702 bdev_io->u.bdev.md_buf = md_buf; 2703 bdev_io->u.bdev.num_blocks = num_blocks; 2704 bdev_io->u.bdev.offset_blocks = offset_blocks; 2705 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2706 2707 spdk_bdev_io_submit(bdev_io); 2708 return 0; 2709 } 2710 2711 int 2712 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2713 void *buf, uint64_t offset, uint64_t nbytes, 2714 spdk_bdev_io_completion_cb cb, void *cb_arg) 2715 { 2716 uint64_t offset_blocks, num_blocks; 2717 2718 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2719 nbytes, &num_blocks) != 0) { 2720 return -EINVAL; 2721 } 2722 2723 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2724 } 2725 2726 int 2727 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2728 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2729 spdk_bdev_io_completion_cb cb, void *cb_arg) 2730 { 2731 return _spdk_bdev_read_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 2732 cb, cb_arg); 2733 } 2734 2735 int 2736 spdk_bdev_read_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2737 void *buf, void *md_buf, int64_t offset_blocks, uint64_t num_blocks, 2738 spdk_bdev_io_completion_cb cb, void *cb_arg) 2739 { 2740 struct iovec iov = { 2741 .iov_base = buf, 2742 }; 2743 2744 if 
(!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 2745 return -EINVAL; 2746 } 2747 2748 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 2749 return -EINVAL; 2750 } 2751 2752 return _spdk_bdev_read_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 2753 cb, cb_arg); 2754 } 2755 2756 int 2757 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2758 struct iovec *iov, int iovcnt, 2759 uint64_t offset, uint64_t nbytes, 2760 spdk_bdev_io_completion_cb cb, void *cb_arg) 2761 { 2762 uint64_t offset_blocks, num_blocks; 2763 2764 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2765 nbytes, &num_blocks) != 0) { 2766 return -EINVAL; 2767 } 2768 2769 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2770 } 2771 2772 static int 2773 _spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2774 struct iovec *iov, int iovcnt, void *md_buf, uint64_t offset_blocks, 2775 uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg) 2776 { 2777 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2778 struct spdk_bdev_io *bdev_io; 2779 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2780 2781 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2782 return -EINVAL; 2783 } 2784 2785 bdev_io = spdk_bdev_get_io(channel); 2786 if (!bdev_io) { 2787 return -ENOMEM; 2788 } 2789 2790 bdev_io->internal.ch = channel; 2791 bdev_io->internal.desc = desc; 2792 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2793 bdev_io->u.bdev.iovs = iov; 2794 bdev_io->u.bdev.iovcnt = iovcnt; 2795 bdev_io->u.bdev.md_buf = md_buf; 2796 bdev_io->u.bdev.num_blocks = num_blocks; 2797 bdev_io->u.bdev.offset_blocks = offset_blocks; 2798 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2799 2800 spdk_bdev_io_submit(bdev_io); 2801 return 0; 2802 } 2803 2804 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2805 struct iovec *iov, int iovcnt, 2806 uint64_t offset_blocks, uint64_t num_blocks, 2807 spdk_bdev_io_completion_cb cb, void *cb_arg) 2808 { 2809 return _spdk_bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 2810 num_blocks, cb, cb_arg); 2811 } 2812 2813 int 2814 spdk_bdev_readv_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2815 struct iovec *iov, int iovcnt, void *md_buf, 2816 uint64_t offset_blocks, uint64_t num_blocks, 2817 spdk_bdev_io_completion_cb cb, void *cb_arg) 2818 { 2819 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 2820 return -EINVAL; 2821 } 2822 2823 if (!_bdev_io_check_md_buf(iov, md_buf)) { 2824 return -EINVAL; 2825 } 2826 2827 return _spdk_bdev_readv_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 2828 num_blocks, cb, cb_arg); 2829 } 2830 2831 static int 2832 _spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2833 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 2834 spdk_bdev_io_completion_cb cb, void *cb_arg) 2835 { 2836 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2837 struct spdk_bdev_io *bdev_io; 2838 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2839 2840 if (!desc->write) { 2841 return -EBADF; 2842 } 2843 2844 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2845 return -EINVAL; 2846 } 2847 2848 bdev_io = spdk_bdev_get_io(channel); 2849 if (!bdev_io) { 2850 return -ENOMEM; 2851 } 2852 2853 bdev_io->internal.ch = channel; 
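	/* The request describes a single contiguous buffer, so the iovec embedded in
	 * the bdev_io is used below rather than allocating a separate iovec array.
	 */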
2854 bdev_io->internal.desc = desc; 2855 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2856 bdev_io->u.bdev.iovs = &bdev_io->iov; 2857 bdev_io->u.bdev.iovs[0].iov_base = buf; 2858 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2859 bdev_io->u.bdev.iovcnt = 1; 2860 bdev_io->u.bdev.md_buf = md_buf; 2861 bdev_io->u.bdev.num_blocks = num_blocks; 2862 bdev_io->u.bdev.offset_blocks = offset_blocks; 2863 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2864 2865 spdk_bdev_io_submit(bdev_io); 2866 return 0; 2867 } 2868 2869 int 2870 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2871 void *buf, uint64_t offset, uint64_t nbytes, 2872 spdk_bdev_io_completion_cb cb, void *cb_arg) 2873 { 2874 uint64_t offset_blocks, num_blocks; 2875 2876 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2877 nbytes, &num_blocks) != 0) { 2878 return -EINVAL; 2879 } 2880 2881 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2882 } 2883 2884 int 2885 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2886 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2887 spdk_bdev_io_completion_cb cb, void *cb_arg) 2888 { 2889 return _spdk_bdev_write_blocks_with_md(desc, ch, buf, NULL, offset_blocks, num_blocks, 2890 cb, cb_arg); 2891 } 2892 2893 int 2894 spdk_bdev_write_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2895 void *buf, void *md_buf, uint64_t offset_blocks, uint64_t num_blocks, 2896 spdk_bdev_io_completion_cb cb, void *cb_arg) 2897 { 2898 struct iovec iov = { 2899 .iov_base = buf, 2900 }; 2901 2902 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 2903 return -EINVAL; 2904 } 2905 2906 if (!_bdev_io_check_md_buf(&iov, md_buf)) { 2907 return -EINVAL; 2908 } 2909 2910 return _spdk_bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks, num_blocks, 2911 cb, cb_arg); 2912 } 2913 2914 static int 2915 _spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2916 struct iovec *iov, int iovcnt, void *md_buf, 2917 uint64_t offset_blocks, uint64_t num_blocks, 2918 spdk_bdev_io_completion_cb cb, void *cb_arg) 2919 { 2920 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 2921 struct spdk_bdev_io *bdev_io; 2922 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2923 2924 if (!desc->write) { 2925 return -EBADF; 2926 } 2927 2928 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2929 return -EINVAL; 2930 } 2931 2932 bdev_io = spdk_bdev_get_io(channel); 2933 if (!bdev_io) { 2934 return -ENOMEM; 2935 } 2936 2937 bdev_io->internal.ch = channel; 2938 bdev_io->internal.desc = desc; 2939 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2940 bdev_io->u.bdev.iovs = iov; 2941 bdev_io->u.bdev.iovcnt = iovcnt; 2942 bdev_io->u.bdev.md_buf = md_buf; 2943 bdev_io->u.bdev.num_blocks = num_blocks; 2944 bdev_io->u.bdev.offset_blocks = offset_blocks; 2945 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2946 2947 spdk_bdev_io_submit(bdev_io); 2948 return 0; 2949 } 2950 2951 int 2952 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2953 struct iovec *iov, int iovcnt, 2954 uint64_t offset, uint64_t len, 2955 spdk_bdev_io_completion_cb cb, void *cb_arg) 2956 { 2957 uint64_t offset_blocks, num_blocks; 2958 2959 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 2960 len, &num_blocks) != 0) { 2961 return -EINVAL; 2962 } 2963 2964 return 
spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2965 } 2966 2967 int 2968 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2969 struct iovec *iov, int iovcnt, 2970 uint64_t offset_blocks, uint64_t num_blocks, 2971 spdk_bdev_io_completion_cb cb, void *cb_arg) 2972 { 2973 return _spdk_bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, NULL, offset_blocks, 2974 num_blocks, cb, cb_arg); 2975 } 2976 2977 int 2978 spdk_bdev_writev_blocks_with_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2979 struct iovec *iov, int iovcnt, void *md_buf, 2980 uint64_t offset_blocks, uint64_t num_blocks, 2981 spdk_bdev_io_completion_cb cb, void *cb_arg) 2982 { 2983 if (!spdk_bdev_is_md_separate(spdk_bdev_desc_get_bdev(desc))) { 2984 return -EINVAL; 2985 } 2986 2987 if (!_bdev_io_check_md_buf(iov, md_buf)) { 2988 return -EINVAL; 2989 } 2990 2991 return _spdk_bdev_writev_blocks_with_md(desc, ch, iov, iovcnt, md_buf, offset_blocks, 2992 num_blocks, cb, cb_arg); 2993 } 2994 2995 static void 2996 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 2997 { 2998 if (!success) { 2999 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3000 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 3001 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3002 return; 3003 } 3004 3005 if (bdev_io->u.bdev.zcopy.populate) { 3006 /* Read the real data into the buffer */ 3007 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 3008 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3009 spdk_bdev_io_submit(bdev_io); 3010 return; 3011 } 3012 3013 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3014 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3015 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 3016 } 3017 3018 int 3019 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3020 uint64_t offset_blocks, uint64_t num_blocks, 3021 bool populate, 3022 spdk_bdev_io_completion_cb cb, void *cb_arg) 3023 { 3024 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3025 struct spdk_bdev_io *bdev_io; 3026 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3027 3028 if (!desc->write) { 3029 return -EBADF; 3030 } 3031 3032 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3033 return -EINVAL; 3034 } 3035 3036 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3037 return -ENOTSUP; 3038 } 3039 3040 bdev_io = spdk_bdev_get_io(channel); 3041 if (!bdev_io) { 3042 return -ENOMEM; 3043 } 3044 3045 bdev_io->internal.ch = channel; 3046 bdev_io->internal.desc = desc; 3047 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3048 bdev_io->u.bdev.num_blocks = num_blocks; 3049 bdev_io->u.bdev.offset_blocks = offset_blocks; 3050 bdev_io->u.bdev.iovs = NULL; 3051 bdev_io->u.bdev.iovcnt = 0; 3052 bdev_io->u.bdev.zcopy.populate = populate ? 
1 : 0; 3053 bdev_io->u.bdev.zcopy.commit = 0; 3054 bdev_io->u.bdev.zcopy.start = 1; 3055 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3056 3057 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3058 spdk_bdev_io_submit(bdev_io); 3059 } else { 3060 /* Emulate zcopy by allocating a buffer */ 3061 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 3062 bdev_io->u.bdev.num_blocks * bdev->blocklen); 3063 } 3064 3065 return 0; 3066 } 3067 3068 int 3069 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 3070 spdk_bdev_io_completion_cb cb, void *cb_arg) 3071 { 3072 struct spdk_bdev *bdev = bdev_io->bdev; 3073 3074 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 3075 /* This can happen if the zcopy was emulated in start */ 3076 if (bdev_io->u.bdev.zcopy.start != 1) { 3077 return -EINVAL; 3078 } 3079 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 3080 } 3081 3082 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 3083 return -EINVAL; 3084 } 3085 3086 bdev_io->u.bdev.zcopy.commit = commit ? 1 : 0; 3087 bdev_io->u.bdev.zcopy.start = 0; 3088 bdev_io->internal.caller_ctx = cb_arg; 3089 bdev_io->internal.cb = cb; 3090 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3091 3092 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 3093 spdk_bdev_io_submit(bdev_io); 3094 return 0; 3095 } 3096 3097 if (!bdev_io->u.bdev.zcopy.commit) { 3098 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 3099 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3100 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 3101 return 0; 3102 } 3103 3104 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 3105 spdk_bdev_io_submit(bdev_io); 3106 3107 return 0; 3108 } 3109 3110 int 3111 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3112 uint64_t offset, uint64_t len, 3113 spdk_bdev_io_completion_cb cb, void *cb_arg) 3114 { 3115 uint64_t offset_blocks, num_blocks; 3116 3117 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3118 len, &num_blocks) != 0) { 3119 return -EINVAL; 3120 } 3121 3122 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3123 } 3124 3125 int 3126 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3127 uint64_t offset_blocks, uint64_t num_blocks, 3128 spdk_bdev_io_completion_cb cb, void *cb_arg) 3129 { 3130 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3131 struct spdk_bdev_io *bdev_io; 3132 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3133 3134 if (!desc->write) { 3135 return -EBADF; 3136 } 3137 3138 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3139 return -EINVAL; 3140 } 3141 3142 if (!_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES) && 3143 !_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 3144 return -ENOTSUP; 3145 } 3146 3147 bdev_io = spdk_bdev_get_io(channel); 3148 3149 if (!bdev_io) { 3150 return -ENOMEM; 3151 } 3152 3153 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 3154 bdev_io->internal.ch = channel; 3155 bdev_io->internal.desc = desc; 3156 bdev_io->u.bdev.offset_blocks = offset_blocks; 3157 bdev_io->u.bdev.num_blocks = num_blocks; 3158 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3159 3160 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 3161 spdk_bdev_io_submit(bdev_io); 3162 return 0; 3163 } 3164 3165 assert(_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)); 
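	/* WRITE_ZEROES is not supported natively, so emulate it below by chaining
	 * regular writes of the preallocated zero buffer; the split_* fields track
	 * how many blocks are still left to cover.
	 */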
3166 assert(_bdev_get_block_size_with_md(bdev) <= ZERO_BUFFER_SIZE); 3167 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 3168 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 3169 _spdk_bdev_write_zero_buffer_next(bdev_io); 3170 3171 return 0; 3172 } 3173 3174 int 3175 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3176 uint64_t offset, uint64_t nbytes, 3177 spdk_bdev_io_completion_cb cb, void *cb_arg) 3178 { 3179 uint64_t offset_blocks, num_blocks; 3180 3181 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3182 nbytes, &num_blocks) != 0) { 3183 return -EINVAL; 3184 } 3185 3186 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3187 } 3188 3189 int 3190 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3191 uint64_t offset_blocks, uint64_t num_blocks, 3192 spdk_bdev_io_completion_cb cb, void *cb_arg) 3193 { 3194 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3195 struct spdk_bdev_io *bdev_io; 3196 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3197 3198 if (!desc->write) { 3199 return -EBADF; 3200 } 3201 3202 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3203 return -EINVAL; 3204 } 3205 3206 if (num_blocks == 0) { 3207 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 3208 return -EINVAL; 3209 } 3210 3211 bdev_io = spdk_bdev_get_io(channel); 3212 if (!bdev_io) { 3213 return -ENOMEM; 3214 } 3215 3216 bdev_io->internal.ch = channel; 3217 bdev_io->internal.desc = desc; 3218 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 3219 3220 bdev_io->u.bdev.iovs = &bdev_io->iov; 3221 bdev_io->u.bdev.iovs[0].iov_base = NULL; 3222 bdev_io->u.bdev.iovs[0].iov_len = 0; 3223 bdev_io->u.bdev.iovcnt = 1; 3224 3225 bdev_io->u.bdev.offset_blocks = offset_blocks; 3226 bdev_io->u.bdev.num_blocks = num_blocks; 3227 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3228 3229 spdk_bdev_io_submit(bdev_io); 3230 return 0; 3231 } 3232 3233 int 3234 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3235 uint64_t offset, uint64_t length, 3236 spdk_bdev_io_completion_cb cb, void *cb_arg) 3237 { 3238 uint64_t offset_blocks, num_blocks; 3239 3240 if (spdk_bdev_bytes_to_blocks(spdk_bdev_desc_get_bdev(desc), offset, &offset_blocks, 3241 length, &num_blocks) != 0) { 3242 return -EINVAL; 3243 } 3244 3245 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3246 } 3247 3248 int 3249 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3250 uint64_t offset_blocks, uint64_t num_blocks, 3251 spdk_bdev_io_completion_cb cb, void *cb_arg) 3252 { 3253 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3254 struct spdk_bdev_io *bdev_io; 3255 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3256 3257 if (!desc->write) { 3258 return -EBADF; 3259 } 3260 3261 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3262 return -EINVAL; 3263 } 3264 3265 bdev_io = spdk_bdev_get_io(channel); 3266 if (!bdev_io) { 3267 return -ENOMEM; 3268 } 3269 3270 bdev_io->internal.ch = channel; 3271 bdev_io->internal.desc = desc; 3272 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 3273 bdev_io->u.bdev.iovs = NULL; 3274 bdev_io->u.bdev.iovcnt = 0; 3275 bdev_io->u.bdev.offset_blocks = offset_blocks; 3276 bdev_io->u.bdev.num_blocks = num_blocks; 3277 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3278 3279 spdk_bdev_io_submit(bdev_io); 3280 return 0; 3281 } 3282 3283 static void 3284 
_spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 3285 { 3286 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 3287 struct spdk_bdev_io *bdev_io; 3288 3289 bdev_io = TAILQ_FIRST(&ch->queued_resets); 3290 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 3291 spdk_bdev_io_submit_reset(bdev_io); 3292 } 3293 3294 static void 3295 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 3296 { 3297 struct spdk_io_channel *ch; 3298 struct spdk_bdev_channel *channel; 3299 struct spdk_bdev_mgmt_channel *mgmt_channel; 3300 struct spdk_bdev_shared_resource *shared_resource; 3301 bdev_io_tailq_t tmp_queued; 3302 3303 TAILQ_INIT(&tmp_queued); 3304 3305 ch = spdk_io_channel_iter_get_channel(i); 3306 channel = spdk_io_channel_get_ctx(ch); 3307 shared_resource = channel->shared_resource; 3308 mgmt_channel = shared_resource->mgmt_ch; 3309 3310 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 3311 3312 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 3313 /* The QoS object is always valid and readable while 3314 * the channel flag is set, so the lock here should not 3315 * be necessary. We're not in the fast path though, so 3316 * just take it anyway. */ 3317 pthread_mutex_lock(&channel->bdev->internal.mutex); 3318 if (channel->bdev->internal.qos->ch == channel) { 3319 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 3320 } 3321 pthread_mutex_unlock(&channel->bdev->internal.mutex); 3322 } 3323 3324 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 3325 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 3326 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 3327 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 3328 3329 spdk_for_each_channel_continue(i, 0); 3330 } 3331 3332 static void 3333 _spdk_bdev_start_reset(void *ctx) 3334 { 3335 struct spdk_bdev_channel *ch = ctx; 3336 3337 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 3338 ch, _spdk_bdev_reset_dev); 3339 } 3340 3341 static void 3342 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 3343 { 3344 struct spdk_bdev *bdev = ch->bdev; 3345 3346 assert(!TAILQ_EMPTY(&ch->queued_resets)); 3347 3348 pthread_mutex_lock(&bdev->internal.mutex); 3349 if (bdev->internal.reset_in_progress == NULL) { 3350 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 3351 /* 3352 * Take a channel reference for the target bdev for the life of this 3353 * reset. This guards against the channel getting destroyed while 3354 * spdk_for_each_channel() calls related to this reset IO are in 3355 * progress. We will release the reference when this reset is 3356 * completed. 
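		 * (The reference is dropped again in _spdk_bdev_reset_complete() once
		 * every channel has been unfrozen.)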
3357 */ 3358 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 3359 _spdk_bdev_start_reset(ch); 3360 } 3361 pthread_mutex_unlock(&bdev->internal.mutex); 3362 } 3363 3364 int 3365 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3366 spdk_bdev_io_completion_cb cb, void *cb_arg) 3367 { 3368 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3369 struct spdk_bdev_io *bdev_io; 3370 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3371 3372 bdev_io = spdk_bdev_get_io(channel); 3373 if (!bdev_io) { 3374 return -ENOMEM; 3375 } 3376 3377 bdev_io->internal.ch = channel; 3378 bdev_io->internal.desc = desc; 3379 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 3380 bdev_io->u.reset.ch_ref = NULL; 3381 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3382 3383 pthread_mutex_lock(&bdev->internal.mutex); 3384 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 3385 pthread_mutex_unlock(&bdev->internal.mutex); 3386 3387 _spdk_bdev_channel_start_reset(channel); 3388 3389 return 0; 3390 } 3391 3392 void 3393 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3394 struct spdk_bdev_io_stat *stat) 3395 { 3396 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3397 3398 *stat = channel->stat; 3399 } 3400 3401 static void 3402 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 3403 { 3404 void *io_device = spdk_io_channel_iter_get_io_device(i); 3405 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3406 3407 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 3408 bdev_iostat_ctx->cb_arg, 0); 3409 free(bdev_iostat_ctx); 3410 } 3411 3412 static void 3413 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 3414 { 3415 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3416 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3417 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3418 3419 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 3420 spdk_for_each_channel_continue(i, 0); 3421 } 3422 3423 void 3424 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 3425 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 3426 { 3427 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 3428 3429 assert(bdev != NULL); 3430 assert(stat != NULL); 3431 assert(cb != NULL); 3432 3433 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 3434 if (bdev_iostat_ctx == NULL) { 3435 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 3436 cb(bdev, stat, cb_arg, -ENOMEM); 3437 return; 3438 } 3439 3440 bdev_iostat_ctx->stat = stat; 3441 bdev_iostat_ctx->cb = cb; 3442 bdev_iostat_ctx->cb_arg = cb_arg; 3443 3444 /* Start with the statistics from previously deleted channels. */ 3445 pthread_mutex_lock(&bdev->internal.mutex); 3446 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 3447 pthread_mutex_unlock(&bdev->internal.mutex); 3448 3449 /* Then iterate and add the statistics from each existing channel. 
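	 * spdk_for_each_channel() runs _spdk_bdev_get_each_channel_stat() on each
	 * channel's thread and calls _spdk_bdev_get_device_stat_done() after the last
	 * one, which is where the caller's cb finally fires.  A minimal caller sketch
	 * (names here are illustrative only, not part of this file):
	 *
	 *   static void my_stat_done(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
	 *                            void *cb_arg, int rc)
	 *   {
	 *           if (rc == 0) {
	 *                   printf("%" PRIu64 " reads\n", stat->num_read_ops);
	 *           }
	 *           free(stat);
	 *   }
	 *
	 *   struct spdk_bdev_io_stat *stat = calloc(1, sizeof(*stat));
	 *   spdk_bdev_get_device_stat(bdev, stat, my_stat_done, NULL);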
*/ 3450 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3451 _spdk_bdev_get_each_channel_stat, 3452 bdev_iostat_ctx, 3453 _spdk_bdev_get_device_stat_done); 3454 } 3455 3456 int 3457 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3458 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3459 spdk_bdev_io_completion_cb cb, void *cb_arg) 3460 { 3461 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3462 struct spdk_bdev_io *bdev_io; 3463 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3464 3465 if (!desc->write) { 3466 return -EBADF; 3467 } 3468 3469 bdev_io = spdk_bdev_get_io(channel); 3470 if (!bdev_io) { 3471 return -ENOMEM; 3472 } 3473 3474 bdev_io->internal.ch = channel; 3475 bdev_io->internal.desc = desc; 3476 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 3477 bdev_io->u.nvme_passthru.cmd = *cmd; 3478 bdev_io->u.nvme_passthru.buf = buf; 3479 bdev_io->u.nvme_passthru.nbytes = nbytes; 3480 bdev_io->u.nvme_passthru.md_buf = NULL; 3481 bdev_io->u.nvme_passthru.md_len = 0; 3482 3483 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3484 3485 spdk_bdev_io_submit(bdev_io); 3486 return 0; 3487 } 3488 3489 int 3490 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3491 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3492 spdk_bdev_io_completion_cb cb, void *cb_arg) 3493 { 3494 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3495 struct spdk_bdev_io *bdev_io; 3496 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3497 3498 if (!desc->write) { 3499 /* 3500 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3501 * to easily determine if the command is a read or write, but for now just 3502 * do not allow io_passthru with a read-only descriptor. 3503 */ 3504 return -EBADF; 3505 } 3506 3507 bdev_io = spdk_bdev_get_io(channel); 3508 if (!bdev_io) { 3509 return -ENOMEM; 3510 } 3511 3512 bdev_io->internal.ch = channel; 3513 bdev_io->internal.desc = desc; 3514 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 3515 bdev_io->u.nvme_passthru.cmd = *cmd; 3516 bdev_io->u.nvme_passthru.buf = buf; 3517 bdev_io->u.nvme_passthru.nbytes = nbytes; 3518 bdev_io->u.nvme_passthru.md_buf = NULL; 3519 bdev_io->u.nvme_passthru.md_len = 0; 3520 3521 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3522 3523 spdk_bdev_io_submit(bdev_io); 3524 return 0; 3525 } 3526 3527 int 3528 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3529 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 3530 spdk_bdev_io_completion_cb cb, void *cb_arg) 3531 { 3532 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 3533 struct spdk_bdev_io *bdev_io; 3534 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3535 3536 if (!desc->write) { 3537 /* 3538 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3539 * to easily determine if the command is a read or write, but for now just 3540 * do not allow io_passthru with a read-only descriptor. 
3541 */ 3542 return -EBADF; 3543 } 3544 3545 bdev_io = spdk_bdev_get_io(channel); 3546 if (!bdev_io) { 3547 return -ENOMEM; 3548 } 3549 3550 bdev_io->internal.ch = channel; 3551 bdev_io->internal.desc = desc; 3552 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 3553 bdev_io->u.nvme_passthru.cmd = *cmd; 3554 bdev_io->u.nvme_passthru.buf = buf; 3555 bdev_io->u.nvme_passthru.nbytes = nbytes; 3556 bdev_io->u.nvme_passthru.md_buf = md_buf; 3557 bdev_io->u.nvme_passthru.md_len = md_len; 3558 3559 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3560 3561 spdk_bdev_io_submit(bdev_io); 3562 return 0; 3563 } 3564 3565 int 3566 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3567 struct spdk_bdev_io_wait_entry *entry) 3568 { 3569 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3570 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 3571 3572 if (bdev != entry->bdev) { 3573 SPDK_ERRLOG("bdevs do not match\n"); 3574 return -EINVAL; 3575 } 3576 3577 if (mgmt_ch->per_thread_cache_count > 0) { 3578 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 3579 return -EINVAL; 3580 } 3581 3582 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 3583 return 0; 3584 } 3585 3586 static void 3587 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 3588 { 3589 struct spdk_bdev *bdev = bdev_ch->bdev; 3590 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3591 struct spdk_bdev_io *bdev_io; 3592 3593 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3594 /* 3595 * Allow some more I/O to complete before retrying the nomem_io queue. 3596 * Some drivers (such as nvme) cannot immediately take a new I/O in 3597 * the context of a completion, because the resources for the I/O are 3598 * not released until control returns to the bdev poller. Also, we 3599 * may require several small I/O to complete before a larger I/O 3600 * (that requires splitting) can be submitted. 3601 */ 3602 return; 3603 } 3604 3605 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3606 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3607 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3608 bdev_io->internal.ch->io_outstanding++; 3609 shared_resource->io_outstanding++; 3610 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3611 bdev->fn_table->submit_request(spdk_bdev_io_get_io_channel(bdev_io), bdev_io); 3612 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3613 break; 3614 } 3615 } 3616 } 3617 3618 static inline void 3619 _spdk_bdev_io_complete(void *ctx) 3620 { 3621 struct spdk_bdev_io *bdev_io = ctx; 3622 uint64_t tsc, tsc_diff; 3623 3624 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3625 /* 3626 * Send the completion to the thread that originally submitted the I/O, 3627 * which may not be the current thread in the case of QoS. 3628 */ 3629 if (bdev_io->internal.io_submit_ch) { 3630 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3631 bdev_io->internal.io_submit_ch = NULL; 3632 } 3633 3634 /* 3635 * Defer completion to avoid potential infinite recursion if the 3636 * user's completion callback issues a new I/O. 
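		 * By the time the message runs on that thread, in_submit_request is false
		 * and io_submit_ch has been cleared, so the re-invocation below falls
		 * through to the normal completion path.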
		 */
		spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io),
				     _spdk_bdev_io_complete, bdev_io);
		return;
	}

	tsc = spdk_get_ticks();
	tsc_diff = tsc - bdev_io->internal.submit_tsc;
	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0);

	if (bdev_io->internal.ch->histogram) {
		spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff);
	}

	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		switch (bdev_io->type) {
		case SPDK_BDEV_IO_TYPE_READ:
			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
			bdev_io->internal.ch->stat.num_read_ops++;
			bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff;
			break;
		case SPDK_BDEV_IO_TYPE_WRITE:
			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
			bdev_io->internal.ch->stat.num_write_ops++;
			bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff;
			break;
		case SPDK_BDEV_IO_TYPE_UNMAP:
			bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
			bdev_io->internal.ch->stat.num_unmap_ops++;
			bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff;
			break;
		default:
			break;
		}
	}

#ifdef SPDK_CONFIG_VTUNE
	uint64_t now_tsc = spdk_get_ticks();
	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
		uint64_t data[5];

		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
3682 bdev_io->bdev->fn_table->get_spin_time(spdk_bdev_io_get_io_channel(bdev_io)) : 0; 3683 3684 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3685 __itt_metadata_u64, 5, data); 3686 3687 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3688 bdev_io->internal.ch->start_tsc = now_tsc; 3689 } 3690 #endif 3691 3692 assert(bdev_io->internal.cb != NULL); 3693 assert(spdk_get_thread() == spdk_bdev_io_get_thread(bdev_io)); 3694 3695 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3696 bdev_io->internal.caller_ctx); 3697 } 3698 3699 static void 3700 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3701 { 3702 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3703 3704 if (bdev_io->u.reset.ch_ref != NULL) { 3705 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3706 bdev_io->u.reset.ch_ref = NULL; 3707 } 3708 3709 _spdk_bdev_io_complete(bdev_io); 3710 } 3711 3712 static void 3713 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3714 { 3715 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3716 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3717 3718 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3719 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3720 _spdk_bdev_channel_start_reset(ch); 3721 } 3722 3723 spdk_for_each_channel_continue(i, 0); 3724 } 3725 3726 void 3727 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3728 { 3729 struct spdk_bdev *bdev = bdev_io->bdev; 3730 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3731 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3732 3733 bdev_io->internal.status = status; 3734 3735 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3736 bool unlock_channels = false; 3737 3738 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3739 SPDK_ERRLOG("NOMEM returned for reset\n"); 3740 } 3741 pthread_mutex_lock(&bdev->internal.mutex); 3742 if (bdev_io == bdev->internal.reset_in_progress) { 3743 bdev->internal.reset_in_progress = NULL; 3744 unlock_channels = true; 3745 } 3746 pthread_mutex_unlock(&bdev->internal.mutex); 3747 3748 if (unlock_channels) { 3749 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3750 bdev_io, _spdk_bdev_reset_complete); 3751 return; 3752 } 3753 } else { 3754 _bdev_io_unset_bounce_buf(bdev_io); 3755 3756 assert(bdev_ch->io_outstanding > 0); 3757 assert(shared_resource->io_outstanding > 0); 3758 bdev_ch->io_outstanding--; 3759 shared_resource->io_outstanding--; 3760 3761 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3762 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3763 /* 3764 * Wait for some of the outstanding I/O to complete before we 3765 * retry any of the nomem_io. Normally we will wait for 3766 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3767 * depth channels we will instead wait for half to complete. 
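 * In other words, nomem_threshold below is set to max(outstanding / 2,
 * outstanding - NOMEM_THRESHOLD_COUNT), and _spdk_bdev_ch_retry_io() only
 * starts draining the nomem_io queue once io_outstanding has dropped to or
 * below that threshold.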
3768 */ 3769 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3770 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3771 return; 3772 } 3773 3774 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3775 _spdk_bdev_ch_retry_io(bdev_ch); 3776 } 3777 } 3778 3779 _spdk_bdev_io_complete(bdev_io); 3780 } 3781 3782 void 3783 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3784 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3785 { 3786 if (sc == SPDK_SCSI_STATUS_GOOD) { 3787 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3788 } else { 3789 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3790 bdev_io->internal.error.scsi.sc = sc; 3791 bdev_io->internal.error.scsi.sk = sk; 3792 bdev_io->internal.error.scsi.asc = asc; 3793 bdev_io->internal.error.scsi.ascq = ascq; 3794 } 3795 3796 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3797 } 3798 3799 void 3800 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3801 int *sc, int *sk, int *asc, int *ascq) 3802 { 3803 assert(sc != NULL); 3804 assert(sk != NULL); 3805 assert(asc != NULL); 3806 assert(ascq != NULL); 3807 3808 switch (bdev_io->internal.status) { 3809 case SPDK_BDEV_IO_STATUS_SUCCESS: 3810 *sc = SPDK_SCSI_STATUS_GOOD; 3811 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3812 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3813 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3814 break; 3815 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3816 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3817 break; 3818 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3819 *sc = bdev_io->internal.error.scsi.sc; 3820 *sk = bdev_io->internal.error.scsi.sk; 3821 *asc = bdev_io->internal.error.scsi.asc; 3822 *ascq = bdev_io->internal.error.scsi.ascq; 3823 break; 3824 default: 3825 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3826 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3827 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3828 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3829 break; 3830 } 3831 } 3832 3833 void 3834 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3835 { 3836 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3837 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3838 } else { 3839 bdev_io->internal.error.nvme.sct = sct; 3840 bdev_io->internal.error.nvme.sc = sc; 3841 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3842 } 3843 3844 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3845 } 3846 3847 void 3848 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3849 { 3850 assert(sct != NULL); 3851 assert(sc != NULL); 3852 3853 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3854 *sct = bdev_io->internal.error.nvme.sct; 3855 *sc = bdev_io->internal.error.nvme.sc; 3856 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3857 *sct = SPDK_NVME_SCT_GENERIC; 3858 *sc = SPDK_NVME_SC_SUCCESS; 3859 } else { 3860 *sct = SPDK_NVME_SCT_GENERIC; 3861 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3862 } 3863 } 3864 3865 struct spdk_thread * 3866 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3867 { 3868 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3869 } 3870 3871 struct spdk_io_channel * 3872 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 3873 { 3874 return bdev_io->internal.ch->channel; 3875 } 3876 3877 static void 3878 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t 
*limits) 3879 { 3880 uint64_t min_qos_set; 3881 int i; 3882 3883 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3884 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3885 break; 3886 } 3887 } 3888 3889 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3890 SPDK_ERRLOG("Invalid rate limits set.\n"); 3891 return; 3892 } 3893 3894 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3895 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3896 continue; 3897 } 3898 3899 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3900 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3901 } else { 3902 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3903 } 3904 3905 if (limits[i] == 0 || limits[i] % min_qos_set) { 3906 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3907 limits[i], bdev->name, min_qos_set); 3908 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3909 return; 3910 } 3911 } 3912 3913 if (!bdev->internal.qos) { 3914 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3915 if (!bdev->internal.qos) { 3916 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3917 return; 3918 } 3919 } 3920 3921 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3922 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3923 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3924 bdev->name, i, limits[i]); 3925 } 3926 3927 return; 3928 } 3929 3930 static void 3931 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3932 { 3933 struct spdk_conf_section *sp = NULL; 3934 const char *val = NULL; 3935 int i = 0, j = 0; 3936 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3937 bool config_qos = false; 3938 3939 sp = spdk_conf_find_section(NULL, "QoS"); 3940 if (!sp) { 3941 return; 3942 } 3943 3944 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3945 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3946 3947 i = 0; 3948 while (true) { 3949 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3950 if (!val) { 3951 break; 3952 } 3953 3954 if (strcmp(bdev->name, val) != 0) { 3955 i++; 3956 continue; 3957 } 3958 3959 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3960 if (val) { 3961 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3962 limits[j] = strtoull(val, NULL, 10); 3963 } else { 3964 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3965 } 3966 config_qos = true; 3967 } 3968 3969 break; 3970 } 3971 3972 j++; 3973 } 3974 3975 if (config_qos == true) { 3976 _spdk_bdev_qos_config_limit(bdev, limits); 3977 } 3978 3979 return; 3980 } 3981 3982 static int 3983 spdk_bdev_init(struct spdk_bdev *bdev) 3984 { 3985 char *bdev_name; 3986 3987 assert(bdev->module != NULL); 3988 3989 if (!bdev->name) { 3990 SPDK_ERRLOG("Bdev name is NULL\n"); 3991 return -EINVAL; 3992 } 3993 3994 if (spdk_bdev_get_by_name(bdev->name)) { 3995 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3996 return -EEXIST; 3997 } 3998 3999 /* Users often register their own I/O devices using the bdev name. In 4000 * order to avoid conflicts, prepend bdev_. 
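 * For example, a bdev named "Malloc0" is registered as the io_device
 * "bdev_Malloc0". The temporary name is only needed for the registration
 * call and is freed immediately after spdk_io_device_register() below.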
*/ 4001 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 4002 if (!bdev_name) { 4003 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 4004 return -ENOMEM; 4005 } 4006 4007 bdev->internal.status = SPDK_BDEV_STATUS_READY; 4008 bdev->internal.measured_queue_depth = UINT64_MAX; 4009 bdev->internal.claim_module = NULL; 4010 bdev->internal.qd_poller = NULL; 4011 bdev->internal.qos = NULL; 4012 4013 if (spdk_bdev_get_buf_align(bdev) > 1) { 4014 if (bdev->split_on_optimal_io_boundary) { 4015 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 4016 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 4017 } else { 4018 bdev->split_on_optimal_io_boundary = true; 4019 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 4020 } 4021 } 4022 4023 TAILQ_INIT(&bdev->internal.open_descs); 4024 4025 TAILQ_INIT(&bdev->aliases); 4026 4027 bdev->internal.reset_in_progress = NULL; 4028 4029 _spdk_bdev_qos_config(bdev); 4030 4031 spdk_io_device_register(__bdev_to_io_dev(bdev), 4032 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 4033 sizeof(struct spdk_bdev_channel), 4034 bdev_name); 4035 4036 free(bdev_name); 4037 4038 pthread_mutex_init(&bdev->internal.mutex, NULL); 4039 return 0; 4040 } 4041 4042 static void 4043 spdk_bdev_destroy_cb(void *io_device) 4044 { 4045 int rc; 4046 struct spdk_bdev *bdev; 4047 spdk_bdev_unregister_cb cb_fn; 4048 void *cb_arg; 4049 4050 bdev = __bdev_from_io_dev(io_device); 4051 cb_fn = bdev->internal.unregister_cb; 4052 cb_arg = bdev->internal.unregister_ctx; 4053 4054 rc = bdev->fn_table->destruct(bdev->ctxt); 4055 if (rc < 0) { 4056 SPDK_ERRLOG("destruct failed\n"); 4057 } 4058 if (rc <= 0 && cb_fn != NULL) { 4059 cb_fn(cb_arg, rc); 4060 } 4061 } 4062 4063 4064 static void 4065 spdk_bdev_fini(struct spdk_bdev *bdev) 4066 { 4067 pthread_mutex_destroy(&bdev->internal.mutex); 4068 4069 free(bdev->internal.qos); 4070 4071 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 4072 } 4073 4074 static void 4075 spdk_bdev_start(struct spdk_bdev *bdev) 4076 { 4077 struct spdk_bdev_module *module; 4078 uint32_t action; 4079 4080 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 4081 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 4082 4083 /* Examine configuration before initializing I/O */ 4084 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4085 if (module->examine_config) { 4086 action = module->internal.action_in_progress; 4087 module->internal.action_in_progress++; 4088 module->examine_config(bdev); 4089 if (action != module->internal.action_in_progress) { 4090 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 4091 module->name); 4092 } 4093 } 4094 } 4095 4096 if (bdev->internal.claim_module) { 4097 if (bdev->internal.claim_module->examine_disk) { 4098 bdev->internal.claim_module->internal.action_in_progress++; 4099 bdev->internal.claim_module->examine_disk(bdev); 4100 } 4101 return; 4102 } 4103 4104 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4105 if (module->examine_disk) { 4106 module->internal.action_in_progress++; 4107 module->examine_disk(bdev); 4108 } 4109 } 4110 } 4111 4112 int 4113 spdk_bdev_register(struct spdk_bdev *bdev) 4114 { 4115 int rc = spdk_bdev_init(bdev); 4116 4117 if (rc == 0) { 4118 spdk_bdev_start(bdev); 4119 } 4120 4121 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 4122 return rc; 4123 } 4124 4125 int 4126 spdk_vbdev_register(struct spdk_bdev *vbdev, 
struct spdk_bdev **base_bdevs, int base_bdev_count) 4127 { 4128 SPDK_ERRLOG("This function is deprecated. Use spdk_bdev_register() instead.\n"); 4129 return spdk_bdev_register(vbdev); 4130 } 4131 4132 void 4133 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 4134 { 4135 if (bdev->internal.unregister_cb != NULL) { 4136 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 4137 } 4138 } 4139 4140 static void 4141 _remove_notify(void *arg) 4142 { 4143 struct spdk_bdev_desc *desc = arg; 4144 4145 desc->remove_scheduled = false; 4146 4147 if (desc->closed) { 4148 free(desc); 4149 } else { 4150 desc->remove_cb(desc->remove_ctx); 4151 } 4152 } 4153 4154 /* Must be called while holding bdev->internal.mutex. 4155 * returns: 0 - bdev removed and ready to be destructed. 4156 * -EBUSY - bdev can't be destructed yet. */ 4157 static int 4158 spdk_bdev_unregister_unsafe(struct spdk_bdev *bdev) 4159 { 4160 struct spdk_bdev_desc *desc, *tmp; 4161 int rc = 0; 4162 4163 /* Notify each descriptor about hotremoval */ 4164 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 4165 rc = -EBUSY; 4166 if (desc->remove_cb) { 4167 /* 4168 * Defer invocation of the remove_cb to a separate message that will 4169 * run later on its thread. This ensures this context unwinds and 4170 * we don't recursively unregister this bdev again if the remove_cb 4171 * immediately closes its descriptor. 4172 */ 4173 if (!desc->remove_scheduled) { 4174 /* Avoid scheduling removal of the same descriptor multiple times. */ 4175 desc->remove_scheduled = true; 4176 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 4177 } 4178 } 4179 } 4180 4181 /* If there are no descriptors, proceed removing the bdev */ 4182 if (rc == 0) { 4183 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 4184 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 4185 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 4186 } 4187 4188 return rc; 4189 } 4190 4191 void 4192 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 4193 { 4194 struct spdk_thread *thread; 4195 int rc; 4196 4197 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 4198 4199 thread = spdk_get_thread(); 4200 if (!thread) { 4201 /* The user called this from a non-SPDK thread. */ 4202 if (cb_fn != NULL) { 4203 cb_fn(cb_arg, -ENOTSUP); 4204 } 4205 return; 4206 } 4207 4208 pthread_mutex_lock(&bdev->internal.mutex); 4209 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 4210 pthread_mutex_unlock(&bdev->internal.mutex); 4211 if (cb_fn) { 4212 cb_fn(cb_arg, -EBUSY); 4213 } 4214 return; 4215 } 4216 4217 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 4218 bdev->internal.unregister_cb = cb_fn; 4219 bdev->internal.unregister_ctx = cb_arg; 4220 4221 /* Call under lock. 
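 * spdk_bdev_unregister_unsafe() schedules deferred remove callbacks for the
 * open descriptors and returns -EBUSY while any descriptor is still open; it
 * returns 0 only once the bdev has been removed from the global list, in
 * which case it is finalized with spdk_bdev_fini() below.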
*/ 4222 rc = spdk_bdev_unregister_unsafe(bdev); 4223 pthread_mutex_unlock(&bdev->internal.mutex); 4224 4225 if (rc == 0) { 4226 spdk_bdev_fini(bdev); 4227 } 4228 } 4229 4230 int 4231 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 4232 void *remove_ctx, struct spdk_bdev_desc **_desc) 4233 { 4234 struct spdk_bdev_desc *desc; 4235 struct spdk_thread *thread; 4236 struct set_qos_limit_ctx *ctx; 4237 4238 thread = spdk_get_thread(); 4239 if (!thread) { 4240 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 4241 return -ENOTSUP; 4242 } 4243 4244 desc = calloc(1, sizeof(*desc)); 4245 if (desc == NULL) { 4246 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 4247 return -ENOMEM; 4248 } 4249 4250 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 4251 spdk_get_thread()); 4252 4253 desc->bdev = bdev; 4254 desc->thread = thread; 4255 desc->remove_cb = remove_cb; 4256 desc->remove_ctx = remove_ctx; 4257 desc->write = write; 4258 *_desc = desc; 4259 4260 pthread_mutex_lock(&bdev->internal.mutex); 4261 4262 if (write && bdev->internal.claim_module) { 4263 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 4264 bdev->name, bdev->internal.claim_module->name); 4265 pthread_mutex_unlock(&bdev->internal.mutex); 4266 free(desc); 4267 *_desc = NULL; 4268 return -EPERM; 4269 } 4270 4271 /* Enable QoS */ 4272 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 4273 ctx = calloc(1, sizeof(*ctx)); 4274 if (ctx == NULL) { 4275 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 4276 pthread_mutex_unlock(&bdev->internal.mutex); 4277 free(desc); 4278 *_desc = NULL; 4279 return -ENOMEM; 4280 } 4281 ctx->bdev = bdev; 4282 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4283 _spdk_bdev_enable_qos_msg, ctx, 4284 _spdk_bdev_enable_qos_done); 4285 } 4286 4287 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 4288 4289 pthread_mutex_unlock(&bdev->internal.mutex); 4290 4291 return 0; 4292 } 4293 4294 void 4295 spdk_bdev_close(struct spdk_bdev_desc *desc) 4296 { 4297 struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(desc); 4298 int rc; 4299 4300 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 4301 spdk_get_thread()); 4302 4303 if (desc->thread != spdk_get_thread()) { 4304 SPDK_ERRLOG("Descriptor %p for bdev %s closed on wrong thread (%p, expected %p)\n", 4305 desc, bdev->name, spdk_get_thread(), desc->thread); 4306 } 4307 4308 pthread_mutex_lock(&bdev->internal.mutex); 4309 4310 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 4311 4312 desc->closed = true; 4313 4314 if (!desc->remove_scheduled) { 4315 free(desc); 4316 } 4317 4318 /* If no more descriptors, kill QoS channel */ 4319 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 4320 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 4321 bdev->name, spdk_get_thread()); 4322 4323 if (spdk_bdev_qos_destroy(bdev)) { 4324 /* There isn't anything we can do to recover here. Just let the 4325 * old QoS poller keep running. The QoS handling won't change 4326 * cores when the user allocates a new channel, but it won't break. */ 4327 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 4328 } 4329 } 4330 4331 spdk_bdev_set_qd_sampling_period(bdev, 0); 4332 4333 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 4334 rc = spdk_bdev_unregister_unsafe(bdev); 4335 pthread_mutex_unlock(&bdev->internal.mutex); 4336 4337 if (rc == 0) { 4338 spdk_bdev_fini(bdev); 4339 } 4340 } else { 4341 pthread_mutex_unlock(&bdev->internal.mutex); 4342 } 4343 } 4344 4345 int 4346 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 4347 struct spdk_bdev_module *module) 4348 { 4349 if (bdev->internal.claim_module != NULL) { 4350 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 4351 bdev->internal.claim_module->name); 4352 return -EPERM; 4353 } 4354 4355 if (desc && !desc->write) { 4356 desc->write = true; 4357 } 4358 4359 bdev->internal.claim_module = module; 4360 return 0; 4361 } 4362 4363 void 4364 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 4365 { 4366 assert(bdev->internal.claim_module != NULL); 4367 bdev->internal.claim_module = NULL; 4368 } 4369 4370 struct spdk_bdev * 4371 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 4372 { 4373 return desc->bdev; 4374 } 4375 4376 void 4377 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 4378 { 4379 struct iovec *iovs; 4380 int iovcnt; 4381 4382 if (bdev_io == NULL) { 4383 return; 4384 } 4385 4386 switch (bdev_io->type) { 4387 case SPDK_BDEV_IO_TYPE_READ: 4388 case SPDK_BDEV_IO_TYPE_WRITE: 4389 case SPDK_BDEV_IO_TYPE_ZCOPY: 4390 iovs = bdev_io->u.bdev.iovs; 4391 iovcnt = bdev_io->u.bdev.iovcnt; 4392 break; 4393 default: 4394 iovs = NULL; 4395 iovcnt = 0; 4396 break; 4397 } 4398 4399 if (iovp) { 4400 *iovp = iovs; 4401 } 4402 if (iovcntp) { 4403 *iovcntp = iovcnt; 4404 } 4405 } 4406 4407 void * 4408 spdk_bdev_io_get_md_buf(struct spdk_bdev_io *bdev_io) 4409 { 4410 if (bdev_io == NULL) { 4411 return NULL; 4412 } 4413 4414 if (!spdk_bdev_is_md_separate(bdev_io->bdev)) { 4415 return NULL; 4416 } 4417 4418 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || 4419 bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) { 4420 return bdev_io->u.bdev.md_buf; 4421 } 4422 4423 return NULL; 4424 } 4425 4426 void 4427 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 4428 { 4429 4430 if (spdk_bdev_module_list_find(bdev_module->name)) { 4431 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 4432 assert(false); 4433 } 4434 4435 /* 4436 * Modules with examine callbacks must be initialized first, so they are 4437 * ready to handle examine callbacks from later modules that will 4438 * register physical bdevs. 
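 * To that end, modules that provide examine_config() or examine_disk()
 * callbacks are inserted at the head of the module list and all other
 * modules at the tail, as done below.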
4439 */ 4440 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 4441 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4442 } else { 4443 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4444 } 4445 } 4446 4447 struct spdk_bdev_module * 4448 spdk_bdev_module_list_find(const char *name) 4449 { 4450 struct spdk_bdev_module *bdev_module; 4451 4452 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4453 if (strcmp(name, bdev_module->name) == 0) { 4454 break; 4455 } 4456 } 4457 4458 return bdev_module; 4459 } 4460 4461 static void 4462 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 4463 { 4464 struct spdk_bdev_io *bdev_io = _bdev_io; 4465 uint64_t num_bytes, num_blocks; 4466 void *md_buf = NULL; 4467 int rc; 4468 4469 num_bytes = spdk_min(_bdev_get_block_size_with_md(bdev_io->bdev) * 4470 bdev_io->u.bdev.split_remaining_num_blocks, 4471 ZERO_BUFFER_SIZE); 4472 num_blocks = num_bytes / _bdev_get_block_size_with_md(bdev_io->bdev); 4473 4474 if (spdk_bdev_is_md_separate(bdev_io->bdev)) { 4475 md_buf = (char *)g_bdev_mgr.zero_buffer + 4476 spdk_bdev_get_block_size(bdev_io->bdev) * num_blocks; 4477 } 4478 4479 rc = _spdk_bdev_write_blocks_with_md(bdev_io->internal.desc, 4480 spdk_io_channel_from_ctx(bdev_io->internal.ch), 4481 g_bdev_mgr.zero_buffer, md_buf, 4482 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 4483 _spdk_bdev_write_zero_buffer_done, bdev_io); 4484 if (rc == 0) { 4485 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 4486 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 4487 } else if (rc == -ENOMEM) { 4488 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 4489 } else { 4490 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4491 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 4492 } 4493 } 4494 4495 static void 4496 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4497 { 4498 struct spdk_bdev_io *parent_io = cb_arg; 4499 4500 spdk_bdev_free_io(bdev_io); 4501 4502 if (!success) { 4503 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4504 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 4505 return; 4506 } 4507 4508 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 4509 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4510 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 4511 return; 4512 } 4513 4514 _spdk_bdev_write_zero_buffer_next(parent_io); 4515 } 4516 4517 static void 4518 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 4519 { 4520 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4521 ctx->bdev->internal.qos_mod_in_progress = false; 4522 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4523 4524 if (ctx->cb_fn) { 4525 ctx->cb_fn(ctx->cb_arg, status); 4526 } 4527 free(ctx); 4528 } 4529 4530 static void 4531 _spdk_bdev_disable_qos_done(void *cb_arg) 4532 { 4533 struct set_qos_limit_ctx *ctx = cb_arg; 4534 struct spdk_bdev *bdev = ctx->bdev; 4535 struct spdk_bdev_io *bdev_io; 4536 struct spdk_bdev_qos *qos; 4537 4538 pthread_mutex_lock(&bdev->internal.mutex); 4539 qos = bdev->internal.qos; 4540 bdev->internal.qos = NULL; 4541 pthread_mutex_unlock(&bdev->internal.mutex); 4542 4543 while (!TAILQ_EMPTY(&qos->queued)) { 4544 /* Send queued I/O back to their original thread for resubmission. 
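 * Each queued bdev_io still records the channel it was originally submitted
 * on in internal.io_submit_ch, so that channel is restored before the I/O is
 * handed back to its owning thread via _spdk_bdev_io_submit().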
*/ 4545 bdev_io = TAILQ_FIRST(&qos->queued); 4546 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 4547 4548 if (bdev_io->internal.io_submit_ch) { 4549 /* 4550 * Channel was changed when sending it to the QoS thread - change it back 4551 * before sending it back to the original thread. 4552 */ 4553 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4554 bdev_io->internal.io_submit_ch = NULL; 4555 } 4556 4557 spdk_thread_send_msg(spdk_bdev_io_get_thread(bdev_io), 4558 _spdk_bdev_io_submit, bdev_io); 4559 } 4560 4561 if (qos->thread != NULL) { 4562 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 4563 spdk_poller_unregister(&qos->poller); 4564 } 4565 4566 free(qos); 4567 4568 _spdk_bdev_set_qos_limit_done(ctx, 0); 4569 } 4570 4571 static void 4572 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 4573 { 4574 void *io_device = spdk_io_channel_iter_get_io_device(i); 4575 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4576 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4577 struct spdk_thread *thread; 4578 4579 pthread_mutex_lock(&bdev->internal.mutex); 4580 thread = bdev->internal.qos->thread; 4581 pthread_mutex_unlock(&bdev->internal.mutex); 4582 4583 if (thread != NULL) { 4584 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 4585 } else { 4586 _spdk_bdev_disable_qos_done(ctx); 4587 } 4588 } 4589 4590 static void 4591 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 4592 { 4593 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4594 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4595 4596 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 4597 4598 spdk_for_each_channel_continue(i, 0); 4599 } 4600 4601 static void 4602 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg) 4603 { 4604 struct set_qos_limit_ctx *ctx = cb_arg; 4605 struct spdk_bdev *bdev = ctx->bdev; 4606 4607 pthread_mutex_lock(&bdev->internal.mutex); 4608 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 4609 pthread_mutex_unlock(&bdev->internal.mutex); 4610 4611 _spdk_bdev_set_qos_limit_done(ctx, 0); 4612 } 4613 4614 static void 4615 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 4616 { 4617 void *io_device = spdk_io_channel_iter_get_io_device(i); 4618 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4619 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4620 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4621 4622 pthread_mutex_lock(&bdev->internal.mutex); 4623 _spdk_bdev_enable_qos(bdev, bdev_ch); 4624 pthread_mutex_unlock(&bdev->internal.mutex); 4625 spdk_for_each_channel_continue(i, 0); 4626 } 4627 4628 static void 4629 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 4630 { 4631 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4632 4633 _spdk_bdev_set_qos_limit_done(ctx, status); 4634 } 4635 4636 static void 4637 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 4638 { 4639 int i; 4640 4641 assert(bdev->internal.qos != NULL); 4642 4643 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4644 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4645 bdev->internal.qos->rate_limits[i].limit = limits[i]; 4646 4647 if (limits[i] == 0) { 4648 bdev->internal.qos->rate_limits[i].limit = 4649 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 4650 } 4651 } 4652 } 4653 } 4654 4655 void 4656 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 4657 void (*cb_fn)(void *cb_arg, int status), 
void *cb_arg) 4658 { 4659 struct set_qos_limit_ctx *ctx; 4660 uint32_t limit_set_complement; 4661 uint64_t min_limit_per_sec; 4662 int i; 4663 bool disable_rate_limit = true; 4664 4665 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4666 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4667 continue; 4668 } 4669 4670 if (limits[i] > 0) { 4671 disable_rate_limit = false; 4672 } 4673 4674 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 4675 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 4676 } else { 4677 /* Change from megabyte to byte rate limit */ 4678 limits[i] = limits[i] * 1024 * 1024; 4679 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 4680 } 4681 4682 limit_set_complement = limits[i] % min_limit_per_sec; 4683 if (limit_set_complement) { 4684 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 4685 limits[i], min_limit_per_sec); 4686 limits[i] += min_limit_per_sec - limit_set_complement; 4687 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 4688 } 4689 } 4690 4691 ctx = calloc(1, sizeof(*ctx)); 4692 if (ctx == NULL) { 4693 cb_fn(cb_arg, -ENOMEM); 4694 return; 4695 } 4696 4697 ctx->cb_fn = cb_fn; 4698 ctx->cb_arg = cb_arg; 4699 ctx->bdev = bdev; 4700 4701 pthread_mutex_lock(&bdev->internal.mutex); 4702 if (bdev->internal.qos_mod_in_progress) { 4703 pthread_mutex_unlock(&bdev->internal.mutex); 4704 free(ctx); 4705 cb_fn(cb_arg, -EAGAIN); 4706 return; 4707 } 4708 bdev->internal.qos_mod_in_progress = true; 4709 4710 if (disable_rate_limit == true && bdev->internal.qos) { 4711 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4712 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 4713 (bdev->internal.qos->rate_limits[i].limit > 0 && 4714 bdev->internal.qos->rate_limits[i].limit != 4715 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 4716 disable_rate_limit = false; 4717 break; 4718 } 4719 } 4720 } 4721 4722 if (disable_rate_limit == false) { 4723 if (bdev->internal.qos == NULL) { 4724 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 4725 if (!bdev->internal.qos) { 4726 pthread_mutex_unlock(&bdev->internal.mutex); 4727 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 4728 free(ctx); 4729 cb_fn(cb_arg, -ENOMEM); 4730 return; 4731 } 4732 } 4733 4734 if (bdev->internal.qos->thread == NULL) { 4735 /* Enabling */ 4736 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4737 4738 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4739 _spdk_bdev_enable_qos_msg, ctx, 4740 _spdk_bdev_enable_qos_done); 4741 } else { 4742 /* Updating */ 4743 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4744 4745 spdk_thread_send_msg(bdev->internal.qos->thread, 4746 _spdk_bdev_update_qos_rate_limit_msg, ctx); 4747 } 4748 } else { 4749 if (bdev->internal.qos != NULL) { 4750 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4751 4752 /* Disabling */ 4753 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4754 _spdk_bdev_disable_qos_msg, ctx, 4755 _spdk_bdev_disable_qos_msg_done); 4756 } else { 4757 pthread_mutex_unlock(&bdev->internal.mutex); 4758 _spdk_bdev_set_qos_limit_done(ctx, 0); 4759 return; 4760 } 4761 } 4762 4763 pthread_mutex_unlock(&bdev->internal.mutex); 4764 } 4765 4766 struct spdk_bdev_histogram_ctx { 4767 spdk_bdev_histogram_status_cb cb_fn; 4768 void *cb_arg; 4769 struct spdk_bdev *bdev; 4770 int status; 4771 }; 4772 4773 static void 4774 _spdk_bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 4775 { 4776 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4777 4778 
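	/*
	 * All channels have freed their per-channel histograms at this point;
	 * clear the in-progress flag under the bdev mutex, report the saved
	 * status to the caller, and release the context.
	 */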
pthread_mutex_lock(&ctx->bdev->internal.mutex); 4779 ctx->bdev->internal.histogram_in_progress = false; 4780 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4781 ctx->cb_fn(ctx->cb_arg, ctx->status); 4782 free(ctx); 4783 } 4784 4785 static void 4786 _spdk_bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 4787 { 4788 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4789 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4790 4791 if (ch->histogram != NULL) { 4792 spdk_histogram_data_free(ch->histogram); 4793 ch->histogram = NULL; 4794 } 4795 spdk_for_each_channel_continue(i, 0); 4796 } 4797 4798 static void 4799 _spdk_bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 4800 { 4801 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4802 4803 if (status != 0) { 4804 ctx->status = status; 4805 ctx->bdev->internal.histogram_enabled = false; 4806 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), _spdk_bdev_histogram_disable_channel, ctx, 4807 _spdk_bdev_histogram_disable_channel_cb); 4808 } else { 4809 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4810 ctx->bdev->internal.histogram_in_progress = false; 4811 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4812 ctx->cb_fn(ctx->cb_arg, ctx->status); 4813 free(ctx); 4814 } 4815 } 4816 4817 static void 4818 _spdk_bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 4819 { 4820 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4821 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4822 int status = 0; 4823 4824 if (ch->histogram == NULL) { 4825 ch->histogram = spdk_histogram_data_alloc(); 4826 if (ch->histogram == NULL) { 4827 status = -ENOMEM; 4828 } 4829 } 4830 4831 spdk_for_each_channel_continue(i, status); 4832 } 4833 4834 void 4835 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 4836 void *cb_arg, bool enable) 4837 { 4838 struct spdk_bdev_histogram_ctx *ctx; 4839 4840 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 4841 if (ctx == NULL) { 4842 cb_fn(cb_arg, -ENOMEM); 4843 return; 4844 } 4845 4846 ctx->bdev = bdev; 4847 ctx->status = 0; 4848 ctx->cb_fn = cb_fn; 4849 ctx->cb_arg = cb_arg; 4850 4851 pthread_mutex_lock(&bdev->internal.mutex); 4852 if (bdev->internal.histogram_in_progress) { 4853 pthread_mutex_unlock(&bdev->internal.mutex); 4854 free(ctx); 4855 cb_fn(cb_arg, -EAGAIN); 4856 return; 4857 } 4858 4859 bdev->internal.histogram_in_progress = true; 4860 pthread_mutex_unlock(&bdev->internal.mutex); 4861 4862 bdev->internal.histogram_enabled = enable; 4863 4864 if (enable) { 4865 /* Allocate histogram for each channel */ 4866 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_enable_channel, ctx, 4867 _spdk_bdev_histogram_enable_channel_cb); 4868 } else { 4869 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_disable_channel, ctx, 4870 _spdk_bdev_histogram_disable_channel_cb); 4871 } 4872 } 4873 4874 struct spdk_bdev_histogram_data_ctx { 4875 spdk_bdev_histogram_data_cb cb_fn; 4876 void *cb_arg; 4877 struct spdk_bdev *bdev; 4878 /** merged histogram data from all channels */ 4879 struct spdk_histogram_data *histogram; 4880 }; 4881 4882 static void 4883 _spdk_bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 4884 { 4885 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4886 4887 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 4888 free(ctx); 4889 } 4890 4891 static void 4892 
_spdk_bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 4893 { 4894 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4895 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4896 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4897 int status = 0; 4898 4899 if (ch->histogram == NULL) { 4900 status = -EFAULT; 4901 } else { 4902 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 4903 } 4904 4905 spdk_for_each_channel_continue(i, status); 4906 } 4907 4908 void 4909 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 4910 spdk_bdev_histogram_data_cb cb_fn, 4911 void *cb_arg) 4912 { 4913 struct spdk_bdev_histogram_data_ctx *ctx; 4914 4915 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 4916 if (ctx == NULL) { 4917 cb_fn(cb_arg, -ENOMEM, NULL); 4918 return; 4919 } 4920 4921 ctx->bdev = bdev; 4922 ctx->cb_fn = cb_fn; 4923 ctx->cb_arg = cb_arg; 4924 4925 ctx->histogram = histogram; 4926 4927 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_get_channel, ctx, 4928 _spdk_bdev_histogram_get_channel_cb); 4929 } 4930 4931 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 4932 4933 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 4934 { 4935 spdk_trace_register_owner(OWNER_BDEV, 'b'); 4936 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 4937 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 4938 OBJECT_BDEV_IO, 1, 0, "type: "); 4939 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 4940 OBJECT_BDEV_IO, 0, 0, ""); 4941 } 4942
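/*
 * Illustrative usage sketch (not part of the library): a caller that receives
 * -ENOMEM from a submission API such as spdk_bdev_read_blocks() can register
 * an spdk_bdev_io_wait_entry via spdk_bdev_queue_io_wait() to be notified when
 * a bdev_io becomes available again, mirroring what
 * _spdk_bdev_write_zero_buffer_next() does internally on -ENOMEM. The
 * retry_read()/read_done() callbacks and the local variables are hypothetical
 * caller-side names.
 *
 *	static struct spdk_bdev_io_wait_entry entry;
 *
 *	rc = spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks,
 *				   read_done, ctx);
 *	if (rc == -ENOMEM) {
 *		entry.bdev = spdk_bdev_desc_get_bdev(desc);
 *		entry.cb_fn = retry_read;
 *		entry.cb_arg = ctx;
 *		spdk_bdev_queue_io_wait(entry.bdev, ch, &entry);
 *	}
 */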