1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/notify.h" 48 #include "spdk/util.h" 49 #include "spdk/trace.h" 50 51 #include "spdk/bdev_module.h" 52 #include "spdk_internal/log.h" 53 #include "spdk/string.h" 54 55 #ifdef SPDK_CONFIG_VTUNE 56 #include "ittnotify.h" 57 #include "ittnotify_types.h" 58 int __itt_init_ittlib(const char *, __itt_group_id); 59 #endif 60 61 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 62 #define SPDK_BDEV_IO_CACHE_SIZE 256 63 #define BUF_SMALL_POOL_SIZE 8192 64 #define BUF_LARGE_POOL_SIZE 1024 65 #define NOMEM_THRESHOLD_COUNT 8 66 #define ZERO_BUFFER_SIZE 0x100000 67 68 #define OWNER_BDEV 0x2 69 70 #define OBJECT_BDEV_IO 0x2 71 72 #define TRACE_GROUP_BDEV 0x3 73 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 74 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 75 76 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 77 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 78 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 79 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 80 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 * 1024 * 1024) 81 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 82 83 #define SPDK_BDEV_POOL_ALIGNMENT 512 84 85 static const char *qos_conf_type[] = {"Limit_IOPS", 86 "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS" 87 }; 88 static const char *qos_rpc_type[] = {"rw_ios_per_sec", 89 "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec" 90 }; 91 92 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 93 94 struct spdk_bdev_mgr { 95 struct spdk_mempool *bdev_io_pool; 96 97 struct spdk_mempool *buf_small_pool; 98 struct spdk_mempool *buf_large_pool; 99 100 
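	/*
	 * The member that follows is a single zero-filled DMA buffer of
	 * ZERO_BUFFER_SIZE bytes, allocated in spdk_bdev_initialize() with
	 * spdk_zmalloc() and shared by all bdevs. It appears to back the
	 * write_zeroes emulation path (_spdk_bdev_write_zero_buffer_next/_done)
	 * for bdevs whose module does not support WRITE_ZEROES natively.
	 */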
void *zero_buffer; 101 102 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 103 104 struct spdk_bdev_list bdevs; 105 106 bool init_complete; 107 bool module_init_complete; 108 109 #ifdef SPDK_CONFIG_VTUNE 110 __itt_domain *domain; 111 #endif 112 }; 113 114 static struct spdk_bdev_mgr g_bdev_mgr = { 115 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 116 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 117 .init_complete = false, 118 .module_init_complete = false, 119 }; 120 121 static struct spdk_bdev_opts g_bdev_opts = { 122 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 123 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 124 }; 125 126 static spdk_bdev_init_cb g_init_cb_fn = NULL; 127 static void *g_init_cb_arg = NULL; 128 129 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 130 static void *g_fini_cb_arg = NULL; 131 static struct spdk_thread *g_fini_thread = NULL; 132 133 struct spdk_bdev_qos_limit { 134 /** IOs or bytes allowed per second (i.e., 1s). */ 135 uint64_t limit; 136 137 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 138 * For remaining bytes, allowed to run negative if an I/O is submitted when 139 * some bytes are remaining, but the I/O is bigger than that amount. The 140 * excess will be deducted from the next timeslice. 141 */ 142 int64_t remaining_this_timeslice; 143 144 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 145 uint32_t min_per_timeslice; 146 147 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 148 uint32_t max_per_timeslice; 149 150 /** Function to check whether to queue the IO. */ 151 bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 152 153 /** Function to update for the submitted IO. */ 154 void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io); 155 }; 156 157 struct spdk_bdev_qos { 158 /** Types of structure of rate limits. */ 159 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 160 161 /** The channel that all I/O are funneled through. */ 162 struct spdk_bdev_channel *ch; 163 164 /** The thread on which the poller is running. */ 165 struct spdk_thread *thread; 166 167 /** Queue of I/O waiting to be issued. */ 168 bdev_io_tailq_t queued; 169 170 /** Size of a timeslice in tsc ticks. */ 171 uint64_t timeslice_size; 172 173 /** Timestamp of start of last timeslice. */ 174 uint64_t last_timeslice; 175 176 /** Poller that processes queued I/O commands each time slice. */ 177 struct spdk_poller *poller; 178 }; 179 180 struct spdk_bdev_mgmt_channel { 181 bdev_io_stailq_t need_buf_small; 182 bdev_io_stailq_t need_buf_large; 183 184 /* 185 * Each thread keeps a cache of bdev_io - this allows 186 * bdev threads which are *not* DPDK threads to still 187 * benefit from a per-thread bdev_io cache. Without 188 * this, non-DPDK threads fetching from the mempool 189 * incur a cmpxchg on get and put. 190 */ 191 bdev_io_stailq_t per_thread_cache; 192 uint32_t per_thread_cache_count; 193 uint32_t bdev_io_cache_size; 194 195 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 196 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 197 }; 198 199 /* 200 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 201 * will queue here their IO that awaits retry. It makes it possible to retry sending 202 * IO to one bdev after IO from other bdev completes. 
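 * For example (illustrative), if several bdevs are created on top of the same
 * NVMe controller (one io_device), a NOMEM completion on one of them parks the
 * I/O on the shared nomem_io queue below, and it can be retried once
 * io_outstanding for the shared channel drops back to nomem_threshold.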
203 */ 204 struct spdk_bdev_shared_resource { 205 /* The bdev management channel */ 206 struct spdk_bdev_mgmt_channel *mgmt_ch; 207 208 /* 209 * Count of I/O submitted to bdev module and waiting for completion. 210 * Incremented before submit_request() is called on an spdk_bdev_io. 211 */ 212 uint64_t io_outstanding; 213 214 /* 215 * Queue of IO awaiting retry because of a previous NOMEM status returned 216 * on this channel. 217 */ 218 bdev_io_tailq_t nomem_io; 219 220 /* 221 * Threshold which io_outstanding must drop to before retrying nomem_io. 222 */ 223 uint64_t nomem_threshold; 224 225 /* I/O channel allocated by a bdev module */ 226 struct spdk_io_channel *shared_ch; 227 228 /* Refcount of bdev channels using this resource */ 229 uint32_t ref; 230 231 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 232 }; 233 234 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 235 #define BDEV_CH_QOS_ENABLED (1 << 1) 236 237 struct spdk_bdev_channel { 238 struct spdk_bdev *bdev; 239 240 /* The channel for the underlying device */ 241 struct spdk_io_channel *channel; 242 243 /* Per io_device per thread data */ 244 struct spdk_bdev_shared_resource *shared_resource; 245 246 struct spdk_bdev_io_stat stat; 247 248 /* 249 * Count of I/O submitted through this channel and waiting for completion. 250 * Incremented before submit_request() is called on an spdk_bdev_io. 251 */ 252 uint64_t io_outstanding; 253 254 bdev_io_tailq_t queued_resets; 255 256 uint32_t flags; 257 258 struct spdk_histogram_data *histogram; 259 260 #ifdef SPDK_CONFIG_VTUNE 261 uint64_t start_tsc; 262 uint64_t interval_tsc; 263 __itt_string_handle *handle; 264 struct spdk_bdev_io_stat prev_stat; 265 #endif 266 267 }; 268 269 struct spdk_bdev_desc { 270 struct spdk_bdev *bdev; 271 struct spdk_thread *thread; 272 spdk_bdev_remove_cb_t remove_cb; 273 void *remove_ctx; 274 bool remove_scheduled; 275 bool closed; 276 bool write; 277 TAILQ_ENTRY(spdk_bdev_desc) link; 278 }; 279 280 struct spdk_bdev_iostat_ctx { 281 struct spdk_bdev_io_stat *stat; 282 spdk_bdev_get_device_stat_cb cb; 283 void *cb_arg; 284 }; 285 286 struct set_qos_limit_ctx { 287 void (*cb_fn)(void *cb_arg, int status); 288 void *cb_arg; 289 struct spdk_bdev *bdev; 290 }; 291 292 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 293 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 294 295 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 296 void *cb_arg); 297 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 298 299 static void _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i); 300 static void _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status); 301 302 void 303 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 304 { 305 *opts = g_bdev_opts; 306 } 307 308 int 309 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 310 { 311 uint32_t min_pool_size; 312 313 /* 314 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 315 * initialization. A second mgmt_ch will be created on the same thread when the application starts 316 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
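 * For example, with the default bdev_io_cache_size of 256 and four existing
 * threads, bdev_io_pool_size must be at least 256 * (4 + 1) = 1280.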
317 */ 318 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 319 if (opts->bdev_io_pool_size < min_pool_size) { 320 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 321 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 322 spdk_thread_get_count()); 323 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 324 return -1; 325 } 326 327 g_bdev_opts = *opts; 328 return 0; 329 } 330 331 struct spdk_bdev * 332 spdk_bdev_first(void) 333 { 334 struct spdk_bdev *bdev; 335 336 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 337 if (bdev) { 338 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 339 } 340 341 return bdev; 342 } 343 344 struct spdk_bdev * 345 spdk_bdev_next(struct spdk_bdev *prev) 346 { 347 struct spdk_bdev *bdev; 348 349 bdev = TAILQ_NEXT(prev, internal.link); 350 if (bdev) { 351 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 352 } 353 354 return bdev; 355 } 356 357 static struct spdk_bdev * 358 _bdev_next_leaf(struct spdk_bdev *bdev) 359 { 360 while (bdev != NULL) { 361 if (bdev->internal.claim_module == NULL) { 362 return bdev; 363 } else { 364 bdev = TAILQ_NEXT(bdev, internal.link); 365 } 366 } 367 368 return bdev; 369 } 370 371 struct spdk_bdev * 372 spdk_bdev_first_leaf(void) 373 { 374 struct spdk_bdev *bdev; 375 376 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 377 378 if (bdev) { 379 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 380 } 381 382 return bdev; 383 } 384 385 struct spdk_bdev * 386 spdk_bdev_next_leaf(struct spdk_bdev *prev) 387 { 388 struct spdk_bdev *bdev; 389 390 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 391 392 if (bdev) { 393 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 394 } 395 396 return bdev; 397 } 398 399 struct spdk_bdev * 400 spdk_bdev_get_by_name(const char *bdev_name) 401 { 402 struct spdk_bdev_alias *tmp; 403 struct spdk_bdev *bdev = spdk_bdev_first(); 404 405 while (bdev != NULL) { 406 if (strcmp(bdev_name, bdev->name) == 0) { 407 return bdev; 408 } 409 410 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 411 if (strcmp(bdev_name, tmp->alias) == 0) { 412 return bdev; 413 } 414 } 415 416 bdev = spdk_bdev_next(bdev); 417 } 418 419 return NULL; 420 } 421 422 void 423 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 424 { 425 struct iovec *iovs; 426 427 iovs = bdev_io->u.bdev.iovs; 428 429 assert(iovs != NULL); 430 assert(bdev_io->u.bdev.iovcnt >= 1); 431 432 iovs[0].iov_base = buf; 433 iovs[0].iov_len = len; 434 } 435 436 static bool 437 _is_buf_allocated(struct iovec *iovs) 438 { 439 return iovs[0].iov_base != NULL; 440 } 441 442 static bool 443 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 444 { 445 int i; 446 uintptr_t iov_base; 447 448 if (spdk_likely(alignment == 1)) { 449 return true; 450 } 451 452 for (i = 0; i < iovcnt; i++) { 453 iov_base = (uintptr_t)iovs[i].iov_base; 454 if ((iov_base & (alignment - 1)) != 0) { 455 return false; 456 } 457 } 458 459 return true; 460 } 461 462 static void 463 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 464 { 465 int i; 466 size_t len; 467 468 for (i = 0; i < iovcnt; i++) { 469 len = spdk_min(iovs[i].iov_len, buf_len); 470 memcpy(buf, iovs[i].iov_base, len); 471 buf += len; 472 buf_len -= len; 473 } 474 } 475 476 static void 477 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, 
void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is write path, copy data from original buffer to bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf, *aligned_buf;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len;
	uint64_t alignment;
	bool buf_allocated;

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);

		alignment = spdk_bdev_get_buf_align(tmp->bdev);
		buf_allocated = _is_buf_allocated(tmp->u.bdev.iovs);

		aligned_buf = (void *)(((uintptr_t)buf +
					(alignment - 1)) & ~(alignment - 1));
		if (buf_allocated) {
			_bdev_io_set_bounce_buf(tmp, aligned_buf, tmp->internal.buf_len);
		} else {
			spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
		}

		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		tmp->internal.buf = buf;
		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp, true);
	}
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base, bdev_io->internal.bounce_iov.iov_len);
	}
	/* restore the original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable the bounce buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;
	/* return bounce buffer to the pool */
	spdk_bdev_io_put_buf(bdev_io);
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	void *buf, *aligned_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment;
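	/*
	 * The buffers handed out here come from the small/large mempools created in
	 * spdk_bdev_initialize(). Each element is over-sized by SPDK_BDEV_POOL_ALIGNMENT
	 * bytes so the raw pointer can be rounded up to the bdev's required alignment:
	 *
	 *   aligned_buf = (buf + (alignment - 1)) & ~(alignment - 1)
	 */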
585 bool buf_allocated; 586 587 assert(cb != NULL); 588 assert(bdev_io->u.bdev.iovs != NULL); 589 590 alignment = spdk_bdev_get_buf_align(bdev_io->bdev); 591 buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs); 592 593 if (buf_allocated && 594 _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) { 595 /* Buffer already present and aligned */ 596 cb(bdev_io->internal.ch->channel, bdev_io, true); 597 return; 598 } 599 600 if (len + alignment > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 601 SPDK_BDEV_POOL_ALIGNMENT) { 602 SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n", 603 len + alignment); 604 cb(bdev_io->internal.ch->channel, bdev_io, false); 605 return; 606 } 607 608 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 609 610 bdev_io->internal.buf_len = len; 611 bdev_io->internal.get_buf_cb = cb; 612 613 if (len + alignment <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 614 SPDK_BDEV_POOL_ALIGNMENT) { 615 pool = g_bdev_mgr.buf_small_pool; 616 stailq = &mgmt_ch->need_buf_small; 617 } else { 618 pool = g_bdev_mgr.buf_large_pool; 619 stailq = &mgmt_ch->need_buf_large; 620 } 621 622 buf = spdk_mempool_get(pool); 623 624 if (!buf) { 625 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 626 } else { 627 aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1)); 628 629 if (buf_allocated) { 630 _bdev_io_set_bounce_buf(bdev_io, aligned_buf, len); 631 } else { 632 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 633 } 634 bdev_io->internal.buf = buf; 635 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io, true); 636 } 637 } 638 639 static int 640 spdk_bdev_module_get_max_ctx_size(void) 641 { 642 struct spdk_bdev_module *bdev_module; 643 int max_bdev_module_size = 0; 644 645 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 646 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 647 max_bdev_module_size = bdev_module->get_ctx_size(); 648 } 649 } 650 651 return max_bdev_module_size; 652 } 653 654 void 655 spdk_bdev_config_text(FILE *fp) 656 { 657 struct spdk_bdev_module *bdev_module; 658 659 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 660 if (bdev_module->config_text) { 661 bdev_module->config_text(fp); 662 } 663 } 664 } 665 666 static void 667 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 668 { 669 int i; 670 struct spdk_bdev_qos *qos = bdev->internal.qos; 671 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 672 673 if (!qos) { 674 return; 675 } 676 677 spdk_bdev_get_qos_rate_limits(bdev, limits); 678 679 spdk_json_write_object_begin(w); 680 spdk_json_write_named_string(w, "method", "set_bdev_qos_limit"); 681 682 spdk_json_write_named_object_begin(w, "params"); 683 spdk_json_write_named_string(w, "name", bdev->name); 684 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 685 if (limits[i] > 0) { 686 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 687 } 688 } 689 spdk_json_write_object_end(w); 690 691 spdk_json_write_object_end(w); 692 } 693 694 void 695 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 696 { 697 struct spdk_bdev_module *bdev_module; 698 struct spdk_bdev *bdev; 699 700 assert(w != NULL); 701 702 spdk_json_write_array_begin(w); 703 704 spdk_json_write_object_begin(w); 705 spdk_json_write_named_string(w, "method", "set_bdev_options"); 706 spdk_json_write_named_object_begin(w, "params"); 707 
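	/*
	 * The object written here looks like, for example:
	 *   { "method": "set_bdev_options",
	 *     "params": { "bdev_io_pool_size": 65536, "bdev_io_cache_size": 256 } }
	 * (the values shown are the defaults from g_bdev_opts).
	 */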
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		spdk_bdev_qos_config_json(bdev, w);
	}

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
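	 * (A module that starts asynchronous init or examine work is expected to
	 * bump its internal.action_in_progress counter and later call
	 * spdk_bdev_module_init_done() or spdk_bdev_module_examine_done(), both of
	 * which decrement it via spdk_bdev_module_action_done() below.)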
825 */ 826 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 827 if (m->internal.action_in_progress > 0) { 828 return; 829 } 830 } 831 832 /* 833 * Modules already finished initialization - now that all 834 * the bdev modules have finished their asynchronous I/O 835 * processing, the entire bdev layer can be marked as complete. 836 */ 837 spdk_bdev_init_complete(0); 838 } 839 840 static void 841 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 842 { 843 assert(module->internal.action_in_progress > 0); 844 module->internal.action_in_progress--; 845 spdk_bdev_module_action_complete(); 846 } 847 848 void 849 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 850 { 851 spdk_bdev_module_action_done(module); 852 } 853 854 void 855 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 856 { 857 spdk_bdev_module_action_done(module); 858 } 859 860 /** The last initialized bdev module */ 861 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 862 863 static int 864 spdk_bdev_modules_init(void) 865 { 866 struct spdk_bdev_module *module; 867 int rc = 0; 868 869 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 870 g_resume_bdev_module = module; 871 rc = module->module_init(); 872 if (rc != 0) { 873 return rc; 874 } 875 } 876 877 g_resume_bdev_module = NULL; 878 return 0; 879 } 880 881 static void 882 spdk_bdev_init_failed(void *cb_arg) 883 { 884 spdk_bdev_init_complete(-1); 885 } 886 887 void 888 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 889 { 890 struct spdk_conf_section *sp; 891 struct spdk_bdev_opts bdev_opts; 892 int32_t bdev_io_pool_size, bdev_io_cache_size; 893 int cache_size; 894 int rc = 0; 895 char mempool_name[32]; 896 897 assert(cb_fn != NULL); 898 899 sp = spdk_conf_find_section(NULL, "Bdev"); 900 if (sp != NULL) { 901 spdk_bdev_get_opts(&bdev_opts); 902 903 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 904 if (bdev_io_pool_size >= 0) { 905 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 906 } 907 908 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 909 if (bdev_io_cache_size >= 0) { 910 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 911 } 912 913 if (spdk_bdev_set_opts(&bdev_opts)) { 914 spdk_bdev_init_complete(-1); 915 return; 916 } 917 918 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 919 } 920 921 g_init_cb_fn = cb_fn; 922 g_init_cb_arg = cb_arg; 923 924 spdk_notify_type_register("bdev_register"); 925 spdk_notify_type_register("bdev_unregister"); 926 927 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 928 929 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 930 g_bdev_opts.bdev_io_pool_size, 931 sizeof(struct spdk_bdev_io) + 932 spdk_bdev_module_get_max_ctx_size(), 933 0, 934 SPDK_ENV_SOCKET_ID_ANY); 935 936 if (g_bdev_mgr.bdev_io_pool == NULL) { 937 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 938 spdk_bdev_init_complete(-1); 939 return; 940 } 941 942 /** 943 * Ensure no more than half of the total buffers end up local caches, by 944 * using spdk_thread_get_count() to determine how many local caches we need 945 * to account for. 
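 * For example, with BUF_SMALL_POOL_SIZE = 8192 and four threads, each per-thread
 * cache is capped at 8192 / (2 * 4) = 1024 buffers, so at least half of the pool
 * always remains in the shared ring.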
946 */ 947 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 948 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 949 950 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 951 BUF_SMALL_POOL_SIZE, 952 SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) + 953 SPDK_BDEV_POOL_ALIGNMENT, 954 cache_size, 955 SPDK_ENV_SOCKET_ID_ANY); 956 if (!g_bdev_mgr.buf_small_pool) { 957 SPDK_ERRLOG("create rbuf small pool failed\n"); 958 spdk_bdev_init_complete(-1); 959 return; 960 } 961 962 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 963 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 964 965 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 966 BUF_LARGE_POOL_SIZE, 967 SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) + 968 SPDK_BDEV_POOL_ALIGNMENT, 969 cache_size, 970 SPDK_ENV_SOCKET_ID_ANY); 971 if (!g_bdev_mgr.buf_large_pool) { 972 SPDK_ERRLOG("create rbuf large pool failed\n"); 973 spdk_bdev_init_complete(-1); 974 return; 975 } 976 977 g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 978 NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 979 if (!g_bdev_mgr.zero_buffer) { 980 SPDK_ERRLOG("create bdev zero buffer failed\n"); 981 spdk_bdev_init_complete(-1); 982 return; 983 } 984 985 #ifdef SPDK_CONFIG_VTUNE 986 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 987 #endif 988 989 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 990 spdk_bdev_mgmt_channel_destroy, 991 sizeof(struct spdk_bdev_mgmt_channel), 992 "bdev_mgr"); 993 994 rc = spdk_bdev_modules_init(); 995 g_bdev_mgr.module_init_complete = true; 996 if (rc != 0) { 997 SPDK_ERRLOG("bdev modules init failed\n"); 998 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 999 return; 1000 } 1001 1002 spdk_bdev_module_action_complete(); 1003 } 1004 1005 static void 1006 spdk_bdev_mgr_unregister_cb(void *io_device) 1007 { 1008 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 1009 1010 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 1011 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 1012 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 1013 g_bdev_opts.bdev_io_pool_size); 1014 } 1015 1016 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 1017 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 1018 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 1019 BUF_SMALL_POOL_SIZE); 1020 assert(false); 1021 } 1022 1023 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 1024 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 1025 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 1026 BUF_LARGE_POOL_SIZE); 1027 assert(false); 1028 } 1029 1030 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 1031 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 1032 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 1033 spdk_free(g_bdev_mgr.zero_buffer); 1034 1035 cb_fn(g_fini_cb_arg); 1036 g_fini_cb_fn = NULL; 1037 g_fini_cb_arg = NULL; 1038 g_bdev_mgr.init_complete = false; 1039 g_bdev_mgr.module_init_complete = false; 1040 } 1041 1042 static void 1043 spdk_bdev_module_finish_iter(void *arg) 1044 { 1045 struct spdk_bdev_module *bdev_module; 1046 1047 /* Start iterating from the last touched module */ 1048 if (!g_resume_bdev_module) { 1049 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 1050 } else { 1051 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 1052 
						 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and
		 * continuing with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some
		 * context (like bdev part free) that will use this bdev (or private bdev driver
		 * ctx data) after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim its underlying bdev properly, we may face the
	 * case of a bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister the last bdev on the list then.
1145 */ 1146 for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 1147 bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) { 1148 SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name); 1149 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 1150 return; 1151 } 1152 } 1153 1154 void 1155 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 1156 { 1157 struct spdk_bdev_module *m; 1158 1159 assert(cb_fn != NULL); 1160 1161 g_fini_thread = spdk_get_thread(); 1162 1163 g_fini_cb_fn = cb_fn; 1164 g_fini_cb_arg = cb_arg; 1165 1166 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 1167 if (m->fini_start) { 1168 m->fini_start(); 1169 } 1170 } 1171 1172 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0); 1173 } 1174 1175 static struct spdk_bdev_io * 1176 spdk_bdev_get_io(struct spdk_bdev_channel *channel) 1177 { 1178 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 1179 struct spdk_bdev_io *bdev_io; 1180 1181 if (ch->per_thread_cache_count > 0) { 1182 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 1183 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 1184 ch->per_thread_cache_count--; 1185 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 1186 /* 1187 * Don't try to look for bdev_ios in the global pool if there are 1188 * waiters on bdev_ios - we don't want this caller to jump the line. 1189 */ 1190 bdev_io = NULL; 1191 } else { 1192 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1193 } 1194 1195 return bdev_io; 1196 } 1197 1198 void 1199 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1200 { 1201 struct spdk_bdev_mgmt_channel *ch; 1202 1203 assert(bdev_io != NULL); 1204 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1205 1206 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1207 1208 if (bdev_io->internal.buf != NULL) { 1209 spdk_bdev_io_put_buf(bdev_io); 1210 } 1211 1212 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1213 ch->per_thread_cache_count++; 1214 STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link); 1215 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1216 struct spdk_bdev_io_wait_entry *entry; 1217 1218 entry = TAILQ_FIRST(&ch->io_wait_queue); 1219 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1220 entry->cb_fn(entry->cb_arg); 1221 } 1222 } else { 1223 /* We should never have a full cache with entries on the io wait queue. 
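 * Waiters are only queued (see spdk_bdev_get_io()) when the per-thread cache is
 * empty and the global pool is exhausted, and the loop above drains the wait
 * queue whenever a bdev_io is returned to a cache that is not yet full.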
*/ 1224 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1225 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1226 } 1227 } 1228 1229 static bool 1230 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1231 { 1232 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1233 1234 switch (limit) { 1235 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1236 return true; 1237 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1238 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1239 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1240 return false; 1241 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1242 default: 1243 return false; 1244 } 1245 } 1246 1247 static bool 1248 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1249 { 1250 switch (bdev_io->type) { 1251 case SPDK_BDEV_IO_TYPE_NVME_IO: 1252 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1253 case SPDK_BDEV_IO_TYPE_READ: 1254 case SPDK_BDEV_IO_TYPE_WRITE: 1255 return true; 1256 default: 1257 return false; 1258 } 1259 } 1260 1261 static bool 1262 _spdk_bdev_is_read_io(struct spdk_bdev_io *bdev_io) 1263 { 1264 switch (bdev_io->type) { 1265 case SPDK_BDEV_IO_TYPE_NVME_IO: 1266 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1267 /* Bit 1 (0x2) set for read operation */ 1268 if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) { 1269 return true; 1270 } else { 1271 return false; 1272 } 1273 case SPDK_BDEV_IO_TYPE_READ: 1274 return true; 1275 default: 1276 return false; 1277 } 1278 } 1279 1280 static uint64_t 1281 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1282 { 1283 struct spdk_bdev *bdev = bdev_io->bdev; 1284 1285 switch (bdev_io->type) { 1286 case SPDK_BDEV_IO_TYPE_NVME_IO: 1287 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1288 return bdev_io->u.nvme_passthru.nbytes; 1289 case SPDK_BDEV_IO_TYPE_READ: 1290 case SPDK_BDEV_IO_TYPE_WRITE: 1291 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1292 default: 1293 return 0; 1294 } 1295 } 1296 1297 static bool 1298 _spdk_bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1299 { 1300 if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) { 1301 return true; 1302 } else { 1303 return false; 1304 } 1305 } 1306 1307 static bool 1308 _spdk_bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1309 { 1310 if (_spdk_bdev_is_read_io(io) == false) { 1311 return false; 1312 } 1313 1314 return _spdk_bdev_qos_rw_queue_io(limit, io); 1315 } 1316 1317 static bool 1318 _spdk_bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1319 { 1320 if (_spdk_bdev_is_read_io(io) == true) { 1321 return false; 1322 } 1323 1324 return _spdk_bdev_qos_rw_queue_io(limit, io); 1325 } 1326 1327 static void 1328 _spdk_bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1329 { 1330 limit->remaining_this_timeslice--; 1331 } 1332 1333 static void 1334 _spdk_bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1335 { 1336 limit->remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(io); 1337 } 1338 1339 static void 1340 _spdk_bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1341 { 1342 if (_spdk_bdev_is_read_io(io) == false) { 1343 return; 1344 } 1345 1346 return _spdk_bdev_qos_rw_bps_update_quota(limit, io); 1347 } 1348 1349 static void 1350 _spdk_bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io) 1351 { 1352 if (_spdk_bdev_is_read_io(io) == true) { 1353 return; 1354 } 1355 1356 return 
_spdk_bdev_qos_rw_bps_update_quota(limit, io); 1357 } 1358 1359 static void 1360 _spdk_bdev_qos_set_ops(struct spdk_bdev_qos *qos) 1361 { 1362 int i; 1363 1364 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1365 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1366 qos->rate_limits[i].queue_io = NULL; 1367 qos->rate_limits[i].update_quota = NULL; 1368 continue; 1369 } 1370 1371 switch (i) { 1372 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1373 qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io; 1374 qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_iops_update_quota; 1375 break; 1376 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1377 qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io; 1378 qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_bps_update_quota; 1379 break; 1380 case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT: 1381 qos->rate_limits[i].queue_io = _spdk_bdev_qos_r_queue_io; 1382 qos->rate_limits[i].update_quota = _spdk_bdev_qos_r_bps_update_quota; 1383 break; 1384 case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT: 1385 qos->rate_limits[i].queue_io = _spdk_bdev_qos_w_queue_io; 1386 qos->rate_limits[i].update_quota = _spdk_bdev_qos_w_bps_update_quota; 1387 break; 1388 default: 1389 break; 1390 } 1391 } 1392 } 1393 1394 static int 1395 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1396 { 1397 struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL; 1398 struct spdk_bdev *bdev = ch->bdev; 1399 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1400 int i, submitted_ios = 0; 1401 1402 TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) { 1403 if (_spdk_bdev_qos_io_to_limit(bdev_io) == true) { 1404 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1405 if (!qos->rate_limits[i].queue_io) { 1406 continue; 1407 } 1408 1409 if (qos->rate_limits[i].queue_io(&qos->rate_limits[i], 1410 bdev_io) == true) { 1411 return submitted_ios; 1412 } 1413 } 1414 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1415 if (!qos->rate_limits[i].update_quota) { 1416 continue; 1417 } 1418 1419 qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io); 1420 } 1421 } 1422 1423 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1424 ch->io_outstanding++; 1425 shared_resource->io_outstanding++; 1426 bdev_io->internal.in_submit_request = true; 1427 bdev->fn_table->submit_request(ch->channel, bdev_io); 1428 bdev_io->internal.in_submit_request = false; 1429 submitted_ios++; 1430 } 1431 1432 return submitted_ios; 1433 } 1434 1435 static void 1436 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1437 { 1438 int rc; 1439 1440 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1441 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1442 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1443 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1444 &bdev_io->internal.waitq_entry); 1445 if (rc != 0) { 1446 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1447 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1448 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1449 } 1450 } 1451 1452 static bool 1453 _spdk_bdev_io_type_can_split(uint8_t type) 1454 { 1455 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1456 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1457 1458 /* Only split READ and WRITE I/O. 
Theoretically other types of I/O like 1459 * UNMAP could be split, but these types of I/O are typically much larger 1460 * in size (sometimes the size of the entire block device), and the bdev 1461 * module can more efficiently split these types of I/O. Plus those types 1462 * of I/O do not have a payload, which makes the splitting process simpler. 1463 */ 1464 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1465 return true; 1466 } else { 1467 return false; 1468 } 1469 } 1470 1471 static bool 1472 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1473 { 1474 uint64_t start_stripe, end_stripe; 1475 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1476 1477 if (io_boundary == 0) { 1478 return false; 1479 } 1480 1481 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1482 return false; 1483 } 1484 1485 start_stripe = bdev_io->u.bdev.offset_blocks; 1486 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1487 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1488 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1489 start_stripe >>= spdk_u32log2(io_boundary); 1490 end_stripe >>= spdk_u32log2(io_boundary); 1491 } else { 1492 start_stripe /= io_boundary; 1493 end_stripe /= io_boundary; 1494 } 1495 return (start_stripe != end_stripe); 1496 } 1497 1498 static uint32_t 1499 _to_next_boundary(uint64_t offset, uint32_t boundary) 1500 { 1501 return (boundary - (offset % boundary)); 1502 } 1503 1504 static void 1505 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1506 1507 static void 1508 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1509 { 1510 struct spdk_bdev_io *bdev_io = _bdev_io; 1511 uint64_t current_offset, remaining; 1512 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1513 struct iovec *parent_iov, *iov; 1514 uint64_t parent_iov_offset, iov_len; 1515 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1516 int rc; 1517 1518 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1519 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1520 blocklen = bdev_io->bdev->blocklen; 1521 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1522 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1523 1524 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1525 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1526 if (parent_iov_offset < parent_iov->iov_len) { 1527 break; 1528 } 1529 parent_iov_offset -= parent_iov->iov_len; 1530 } 1531 1532 child_iovcnt = 0; 1533 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1534 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1535 to_next_boundary = spdk_min(remaining, to_next_boundary); 1536 to_next_boundary_bytes = to_next_boundary * blocklen; 1537 iov = &bdev_io->child_iov[child_iovcnt]; 1538 iovcnt = 0; 1539 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1540 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1541 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1542 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1543 to_next_boundary_bytes -= iov_len; 1544 1545 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1546 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1547 1548 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1549 parent_iov_offset += iov_len; 1550 } else { 
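				/* The current parent iov has been fully consumed;
				 * advance to the next parent iov for the next child
				 * iov entry. */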
1551 parent_iovpos++; 1552 parent_iov_offset = 0; 1553 } 1554 child_iovcnt++; 1555 iovcnt++; 1556 } 1557 1558 if (to_next_boundary_bytes > 0) { 1559 /* We had to stop this child I/O early because we ran out of 1560 * child_iov space. Make sure the iovs collected are valid and 1561 * then adjust to_next_boundary before starting the child I/O. 1562 */ 1563 if ((to_next_boundary_bytes % blocklen) != 0) { 1564 SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n", 1565 to_next_boundary_bytes, blocklen); 1566 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1567 if (bdev_io->u.bdev.split_outstanding == 0) { 1568 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1569 } 1570 return; 1571 } 1572 to_next_boundary -= to_next_boundary_bytes / blocklen; 1573 } 1574 1575 bdev_io->u.bdev.split_outstanding++; 1576 1577 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1578 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1579 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1580 iov, iovcnt, current_offset, to_next_boundary, 1581 _spdk_bdev_io_split_done, bdev_io); 1582 } else { 1583 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1584 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1585 iov, iovcnt, current_offset, to_next_boundary, 1586 _spdk_bdev_io_split_done, bdev_io); 1587 } 1588 1589 if (rc == 0) { 1590 current_offset += to_next_boundary; 1591 remaining -= to_next_boundary; 1592 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1593 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1594 } else { 1595 bdev_io->u.bdev.split_outstanding--; 1596 if (rc == -ENOMEM) { 1597 if (bdev_io->u.bdev.split_outstanding == 0) { 1598 /* No I/O is outstanding. Hence we should wait here. */ 1599 _spdk_bdev_queue_io_wait_with_cb(bdev_io, 1600 _spdk_bdev_io_split_with_payload); 1601 } 1602 } else { 1603 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1604 if (bdev_io->u.bdev.split_outstanding == 0) { 1605 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1606 } 1607 } 1608 1609 return; 1610 } 1611 } 1612 } 1613 1614 static void 1615 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1616 { 1617 struct spdk_bdev_io *parent_io = cb_arg; 1618 1619 spdk_bdev_free_io(bdev_io); 1620 1621 if (!success) { 1622 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1623 } 1624 parent_io->u.bdev.split_outstanding--; 1625 if (parent_io->u.bdev.split_outstanding != 0) { 1626 return; 1627 } 1628 1629 /* 1630 * Parent I/O finishes when all blocks are consumed or there is any failure of 1631 * child I/O and no outstanding child I/O. 1632 */ 1633 if (parent_io->u.bdev.split_remaining_num_blocks == 0 || 1634 parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) { 1635 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1636 parent_io->internal.caller_ctx); 1637 return; 1638 } 1639 1640 /* 1641 * Continue with the splitting process. This function will complete the parent I/O if the 1642 * splitting is done. 
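 * (Each round of _spdk_bdev_io_split_with_payload() issues child I/O covering up
 * to BDEV_IO_NUM_CHILD_IOV iovecs; split_current_offset_blocks and
 * split_remaining_num_blocks record how far the parent I/O has progressed
 * between rounds.)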
1643 */ 1644 _spdk_bdev_io_split_with_payload(parent_io); 1645 } 1646 1647 static void 1648 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1649 { 1650 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1651 1652 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1653 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1654 bdev_io->u.bdev.split_outstanding = 0; 1655 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1656 1657 _spdk_bdev_io_split_with_payload(bdev_io); 1658 } 1659 1660 static void 1661 _spdk_bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, 1662 bool success) 1663 { 1664 if (!success) { 1665 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1666 return; 1667 } 1668 1669 _spdk_bdev_io_split(ch, bdev_io); 1670 } 1671 1672 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't 1673 * be inlined, at least on some compilers. 1674 */ 1675 static inline void 1676 _spdk_bdev_io_submit(void *ctx) 1677 { 1678 struct spdk_bdev_io *bdev_io = ctx; 1679 struct spdk_bdev *bdev = bdev_io->bdev; 1680 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1681 struct spdk_io_channel *ch = bdev_ch->channel; 1682 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1683 uint64_t tsc; 1684 1685 tsc = spdk_get_ticks(); 1686 bdev_io->internal.submit_tsc = tsc; 1687 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1688 bdev_ch->io_outstanding++; 1689 shared_resource->io_outstanding++; 1690 bdev_io->internal.in_submit_request = true; 1691 if (spdk_likely(bdev_ch->flags == 0)) { 1692 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1693 bdev->fn_table->submit_request(ch, bdev_io); 1694 } else { 1695 bdev_ch->io_outstanding--; 1696 shared_resource->io_outstanding--; 1697 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1698 } 1699 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1700 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1701 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1702 bdev_ch->io_outstanding--; 1703 shared_resource->io_outstanding--; 1704 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1705 _spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 1706 } else { 1707 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1708 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1709 } 1710 bdev_io->internal.in_submit_request = false; 1711 } 1712 1713 static void 1714 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1715 { 1716 struct spdk_bdev *bdev = bdev_io->bdev; 1717 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1718 1719 assert(thread != NULL); 1720 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1721 1722 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1723 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1724 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split_get_buf_cb, 1725 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1726 } else { 1727 _spdk_bdev_io_split(NULL, bdev_io); 1728 } 1729 return; 1730 } 1731 1732 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1733 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1734 _spdk_bdev_io_submit(bdev_io); 1735 } else { 1736 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 
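			/* Save the submitting thread's channel so it can be restored
			 * when the I/O completes, then funnel the I/O through the
			 * bdev's single QoS channel/thread (qos->ch / qos->thread). */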
1737 bdev_io->internal.ch = bdev->internal.qos->ch; 1738 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1739 } 1740 } else { 1741 _spdk_bdev_io_submit(bdev_io); 1742 } 1743 } 1744 1745 static void 1746 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1747 { 1748 struct spdk_bdev *bdev = bdev_io->bdev; 1749 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1750 struct spdk_io_channel *ch = bdev_ch->channel; 1751 1752 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1753 1754 bdev_io->internal.in_submit_request = true; 1755 bdev->fn_table->submit_request(ch, bdev_io); 1756 bdev_io->internal.in_submit_request = false; 1757 } 1758 1759 static void 1760 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1761 struct spdk_bdev *bdev, void *cb_arg, 1762 spdk_bdev_io_completion_cb cb) 1763 { 1764 bdev_io->bdev = bdev; 1765 bdev_io->internal.caller_ctx = cb_arg; 1766 bdev_io->internal.cb = cb; 1767 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1768 bdev_io->internal.in_submit_request = false; 1769 bdev_io->internal.buf = NULL; 1770 bdev_io->internal.io_submit_ch = NULL; 1771 bdev_io->internal.orig_iovs = NULL; 1772 bdev_io->internal.orig_iovcnt = 0; 1773 } 1774 1775 static bool 1776 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1777 { 1778 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1779 } 1780 1781 bool 1782 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1783 { 1784 bool supported; 1785 1786 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1787 1788 if (!supported) { 1789 switch (io_type) { 1790 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1791 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1792 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1793 break; 1794 default: 1795 break; 1796 } 1797 } 1798 1799 return supported; 1800 } 1801 1802 int 1803 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1804 { 1805 if (bdev->fn_table->dump_info_json) { 1806 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1807 } 1808 1809 return 0; 1810 } 1811 1812 static void 1813 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1814 { 1815 uint32_t max_per_timeslice = 0; 1816 int i; 1817 1818 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1819 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1820 qos->rate_limits[i].max_per_timeslice = 0; 1821 continue; 1822 } 1823 1824 max_per_timeslice = qos->rate_limits[i].limit * 1825 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1826 1827 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1828 qos->rate_limits[i].min_per_timeslice); 1829 1830 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1831 } 1832 1833 _spdk_bdev_qos_set_ops(qos); 1834 } 1835 1836 static int 1837 spdk_bdev_channel_poll_qos(void *arg) 1838 { 1839 struct spdk_bdev_qos *qos = arg; 1840 uint64_t now = spdk_get_ticks(); 1841 int i; 1842 1843 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1844 /* We received our callback earlier than expected - return 1845 * immediately and wait to do accounting until at least one 1846 * timeslice has actually expired. This should never happen 1847 * with a well-behaved timer implementation. 
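		 * Once at least one full timeslice has elapsed, the loop below advances
		 * last_timeslice and replenishes each remaining_this_timeslice by
		 * max_per_timeslice, which spdk_bdev_qos_update_max_quota_per_timeslice()
		 * computed as limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC;
		 * for example, a 10000 IOPS limit yields 10000 * 1000 / 1000000 = 10 I/O
		 * per 1 ms timeslice.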
1848 */ 1849 return 0; 1850 } 1851 1852 /* Reset for next round of rate limiting */ 1853 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1854 /* We may have allowed the IOs or bytes to slightly overrun in the last 1855 * timeslice. remaining_this_timeslice is signed, so if it's negative 1856 * here, we'll account for the overrun so that the next timeslice will 1857 * be appropriately reduced. 1858 */ 1859 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1860 qos->rate_limits[i].remaining_this_timeslice = 0; 1861 } 1862 } 1863 1864 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1865 qos->last_timeslice += qos->timeslice_size; 1866 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1867 qos->rate_limits[i].remaining_this_timeslice += 1868 qos->rate_limits[i].max_per_timeslice; 1869 } 1870 } 1871 1872 return _spdk_bdev_qos_io_submit(qos->ch, qos); 1873 } 1874 1875 static void 1876 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1877 { 1878 struct spdk_bdev_shared_resource *shared_resource; 1879 1880 spdk_put_io_channel(ch->channel); 1881 1882 shared_resource = ch->shared_resource; 1883 1884 assert(ch->io_outstanding == 0); 1885 assert(shared_resource->ref > 0); 1886 shared_resource->ref--; 1887 if (shared_resource->ref == 0) { 1888 assert(shared_resource->io_outstanding == 0); 1889 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1890 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1891 free(shared_resource); 1892 } 1893 } 1894 1895 /* Caller must hold bdev->internal.mutex. */ 1896 static void 1897 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1898 { 1899 struct spdk_bdev_qos *qos = bdev->internal.qos; 1900 int i; 1901 1902 /* Rate limiting on this bdev enabled */ 1903 if (qos) { 1904 if (qos->ch == NULL) { 1905 struct spdk_io_channel *io_ch; 1906 1907 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1908 bdev->name, spdk_get_thread()); 1909 1910 /* No qos channel has been selected, so set one up */ 1911 1912 /* Take another reference to ch */ 1913 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1914 assert(io_ch != NULL); 1915 qos->ch = ch; 1916 1917 qos->thread = spdk_io_channel_get_thread(io_ch); 1918 1919 TAILQ_INIT(&qos->queued); 1920 1921 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1922 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1923 qos->rate_limits[i].min_per_timeslice = 1924 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1925 } else { 1926 qos->rate_limits[i].min_per_timeslice = 1927 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1928 } 1929 1930 if (qos->rate_limits[i].limit == 0) { 1931 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1932 } 1933 } 1934 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1935 qos->timeslice_size = 1936 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1937 qos->last_timeslice = spdk_get_ticks(); 1938 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1939 qos, 1940 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1941 } 1942 1943 ch->flags |= BDEV_CH_QOS_ENABLED; 1944 } 1945 } 1946 1947 static int 1948 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1949 { 1950 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1951 struct spdk_bdev_channel *ch = ctx_buf; 1952 struct spdk_io_channel *mgmt_io_ch; 1953 struct spdk_bdev_mgmt_channel *mgmt_ch; 1954 struct spdk_bdev_shared_resource *shared_resource; 1955 
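/* Channel construction below proceeds in stages: take the module's I/O channel,
 * optionally allocate a latency histogram, take a reference on the bdev
 * management channel, and then find or create the spdk_bdev_shared_resource
 * keyed on the module channel so that all bdev channels built on the same
 * underlying channel share one nomem_io queue and outstanding-I/O count.
 * Per-channel statistics and queues are initialized last, and QoS is enabled
 * under the bdev mutex if it has been configured.
 */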
1956 ch->bdev = bdev; 1957 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1958 if (!ch->channel) { 1959 return -1; 1960 } 1961 1962 assert(ch->histogram == NULL); 1963 if (bdev->internal.histogram_enabled) { 1964 ch->histogram = spdk_histogram_data_alloc(); 1965 if (ch->histogram == NULL) { 1966 SPDK_ERRLOG("Could not allocate histogram\n"); 1967 } 1968 } 1969 1970 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1971 if (!mgmt_io_ch) { 1972 spdk_put_io_channel(ch->channel); 1973 return -1; 1974 } 1975 1976 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1977 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1978 if (shared_resource->shared_ch == ch->channel) { 1979 spdk_put_io_channel(mgmt_io_ch); 1980 shared_resource->ref++; 1981 break; 1982 } 1983 } 1984 1985 if (shared_resource == NULL) { 1986 shared_resource = calloc(1, sizeof(*shared_resource)); 1987 if (shared_resource == NULL) { 1988 spdk_put_io_channel(ch->channel); 1989 spdk_put_io_channel(mgmt_io_ch); 1990 return -1; 1991 } 1992 1993 shared_resource->mgmt_ch = mgmt_ch; 1994 shared_resource->io_outstanding = 0; 1995 TAILQ_INIT(&shared_resource->nomem_io); 1996 shared_resource->nomem_threshold = 0; 1997 shared_resource->shared_ch = ch->channel; 1998 shared_resource->ref = 1; 1999 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2000 } 2001 2002 memset(&ch->stat, 0, sizeof(ch->stat)); 2003 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2004 ch->io_outstanding = 0; 2005 TAILQ_INIT(&ch->queued_resets); 2006 ch->flags = 0; 2007 ch->shared_resource = shared_resource; 2008 2009 #ifdef SPDK_CONFIG_VTUNE 2010 { 2011 char *name; 2012 __itt_init_ittlib(NULL, 0); 2013 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2014 if (!name) { 2015 _spdk_bdev_channel_destroy_resource(ch); 2016 return -1; 2017 } 2018 ch->handle = __itt_string_handle_create(name); 2019 free(name); 2020 ch->start_tsc = spdk_get_ticks(); 2021 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2022 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 2023 } 2024 #endif 2025 2026 pthread_mutex_lock(&bdev->internal.mutex); 2027 _spdk_bdev_enable_qos(bdev, ch); 2028 pthread_mutex_unlock(&bdev->internal.mutex); 2029 2030 return 0; 2031 } 2032 2033 /* 2034 * Abort I/O that are waiting on a data buffer. These types of I/O are 2035 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2036 */ 2037 static void 2038 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2039 { 2040 bdev_io_stailq_t tmp; 2041 struct spdk_bdev_io *bdev_io; 2042 2043 STAILQ_INIT(&tmp); 2044 2045 while (!STAILQ_EMPTY(queue)) { 2046 bdev_io = STAILQ_FIRST(queue); 2047 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2048 if (bdev_io->internal.ch == ch) { 2049 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2050 } else { 2051 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2052 } 2053 } 2054 2055 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2056 } 2057 2058 /* 2059 * Abort I/O that are queued waiting for submission. These types of I/O are 2060 * linked using the spdk_bdev_io link TAILQ_ENTRY. 
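 * (internal.link).  Callers pass queues such as the per-channel queued_resets
 * list, the shared nomem_io list, and the temporarily detached QoS queue.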
2061 */ 2062 static void 2063 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2064 { 2065 struct spdk_bdev_io *bdev_io, *tmp; 2066 2067 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2068 if (bdev_io->internal.ch == ch) { 2069 TAILQ_REMOVE(queue, bdev_io, internal.link); 2070 /* 2071 * spdk_bdev_io_complete() assumes that the completed I/O had 2072 * been submitted to the bdev module. Since in this case it 2073 * hadn't, bump io_outstanding to account for the decrement 2074 * that spdk_bdev_io_complete() will do. 2075 */ 2076 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2077 ch->io_outstanding++; 2078 ch->shared_resource->io_outstanding++; 2079 } 2080 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2081 } 2082 } 2083 } 2084 2085 static void 2086 spdk_bdev_qos_channel_destroy(void *cb_arg) 2087 { 2088 struct spdk_bdev_qos *qos = cb_arg; 2089 2090 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2091 spdk_poller_unregister(&qos->poller); 2092 2093 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2094 2095 free(qos); 2096 } 2097 2098 static int 2099 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 2100 { 2101 int i; 2102 2103 /* 2104 * Cleanly shutting down the QoS poller is tricky, because 2105 * during the asynchronous operation the user could open 2106 * a new descriptor and create a new channel, spawning 2107 * a new QoS poller. 2108 * 2109 * The strategy is to create a new QoS structure here and swap it 2110 * in. The shutdown path then continues to refer to the old one 2111 * until it completes and then releases it. 2112 */ 2113 struct spdk_bdev_qos *new_qos, *old_qos; 2114 2115 old_qos = bdev->internal.qos; 2116 2117 new_qos = calloc(1, sizeof(*new_qos)); 2118 if (!new_qos) { 2119 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2120 return -ENOMEM; 2121 } 2122 2123 /* Copy the old QoS data into the newly allocated structure */ 2124 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2125 2126 /* Zero out the key parts of the QoS structure */ 2127 new_qos->ch = NULL; 2128 new_qos->thread = NULL; 2129 new_qos->poller = NULL; 2130 TAILQ_INIT(&new_qos->queued); 2131 /* 2132 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2133 * It will be used later for the new QoS structure. 2134 */ 2135 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2136 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2137 new_qos->rate_limits[i].min_per_timeslice = 0; 2138 new_qos->rate_limits[i].max_per_timeslice = 0; 2139 } 2140 2141 bdev->internal.qos = new_qos; 2142 2143 if (old_qos->thread == NULL) { 2144 free(old_qos); 2145 } else { 2146 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 2147 old_qos); 2148 } 2149 2150 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2151 * been destroyed yet. The destruction path will end up waiting for the final 2152 * channel to be put before it releases resources. 
*/ 2153 2154 return 0; 2155 } 2156 2157 static void 2158 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2159 { 2160 total->bytes_read += add->bytes_read; 2161 total->num_read_ops += add->num_read_ops; 2162 total->bytes_written += add->bytes_written; 2163 total->num_write_ops += add->num_write_ops; 2164 total->bytes_unmapped += add->bytes_unmapped; 2165 total->num_unmap_ops += add->num_unmap_ops; 2166 total->read_latency_ticks += add->read_latency_ticks; 2167 total->write_latency_ticks += add->write_latency_ticks; 2168 total->unmap_latency_ticks += add->unmap_latency_ticks; 2169 } 2170 2171 static void 2172 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 2173 { 2174 struct spdk_bdev_channel *ch = ctx_buf; 2175 struct spdk_bdev_mgmt_channel *mgmt_ch; 2176 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2177 2178 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2179 spdk_get_thread()); 2180 2181 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. */ 2182 pthread_mutex_lock(&ch->bdev->internal.mutex); 2183 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2184 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2185 2186 mgmt_ch = shared_resource->mgmt_ch; 2187 2188 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2189 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2190 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2191 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2192 2193 if (ch->histogram) { 2194 spdk_histogram_data_free(ch->histogram); 2195 } 2196 2197 _spdk_bdev_channel_destroy_resource(ch); 2198 } 2199 2200 int 2201 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2202 { 2203 struct spdk_bdev_alias *tmp; 2204 2205 if (alias == NULL) { 2206 SPDK_ERRLOG("Empty alias passed\n"); 2207 return -EINVAL; 2208 } 2209 2210 if (spdk_bdev_get_by_name(alias)) { 2211 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2212 return -EEXIST; 2213 } 2214 2215 tmp = calloc(1, sizeof(*tmp)); 2216 if (tmp == NULL) { 2217 SPDK_ERRLOG("Unable to allocate alias\n"); 2218 return -ENOMEM; 2219 } 2220 2221 tmp->alias = strdup(alias); 2222 if (tmp->alias == NULL) { 2223 free(tmp); 2224 SPDK_ERRLOG("Unable to allocate alias\n"); 2225 return -ENOMEM; 2226 } 2227 2228 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2229 2230 return 0; 2231 } 2232 2233 int 2234 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2235 { 2236 struct spdk_bdev_alias *tmp; 2237 2238 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2239 if (strcmp(alias, tmp->alias) == 0) { 2240 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2241 free(tmp->alias); 2242 free(tmp); 2243 return 0; 2244 } 2245 } 2246 2247 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias); 2248 2249 return -ENOENT; 2250 } 2251 2252 void 2253 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2254 { 2255 struct spdk_bdev_alias *p, *tmp; 2256 2257 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2258 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2259 free(p->alias); 2260 free(p); 2261 } 2262 } 2263 2264 struct spdk_io_channel * 2265 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2266 { 2267 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 2268 } 2269 2270 const char * 2271 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2272 { 2273 return bdev->name; 2274 } 2275 2276 const char * 2277 spdk_bdev_get_product_name(const struct
spdk_bdev *bdev) 2278 { 2279 return bdev->product_name; 2280 } 2281 2282 const struct spdk_bdev_aliases_list * 2283 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2284 { 2285 return &bdev->aliases; 2286 } 2287 2288 uint32_t 2289 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2290 { 2291 return bdev->blocklen; 2292 } 2293 2294 uint64_t 2295 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2296 { 2297 return bdev->blockcnt; 2298 } 2299 2300 const char * 2301 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2302 { 2303 return qos_rpc_type[type]; 2304 } 2305 2306 void 2307 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2308 { 2309 int i; 2310 2311 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2312 2313 pthread_mutex_lock(&bdev->internal.mutex); 2314 if (bdev->internal.qos) { 2315 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2316 if (bdev->internal.qos->rate_limits[i].limit != 2317 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2318 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2319 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2320 /* Change from Byte to Megabyte which is user visible. */ 2321 limits[i] = limits[i] / 1024 / 1024; 2322 } 2323 } 2324 } 2325 } 2326 pthread_mutex_unlock(&bdev->internal.mutex); 2327 } 2328 2329 size_t 2330 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2331 { 2332 return 1 << bdev->required_alignment; 2333 } 2334 2335 uint32_t 2336 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2337 { 2338 return bdev->optimal_io_boundary; 2339 } 2340 2341 bool 2342 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2343 { 2344 return bdev->write_cache; 2345 } 2346 2347 const struct spdk_uuid * 2348 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2349 { 2350 return &bdev->uuid; 2351 } 2352 2353 uint32_t 2354 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 2355 { 2356 return bdev->md_len; 2357 } 2358 2359 bool 2360 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 2361 { 2362 return (bdev->md_len != 0) && bdev->md_interleave; 2363 } 2364 2365 uint32_t 2366 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 2367 { 2368 if (spdk_bdev_is_md_interleaved(bdev)) { 2369 return bdev->blocklen - bdev->md_len; 2370 } else { 2371 return bdev->blocklen; 2372 } 2373 } 2374 2375 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 2376 { 2377 if (bdev->md_len != 0) { 2378 return bdev->dif_type; 2379 } else { 2380 return SPDK_DIF_DISABLE; 2381 } 2382 } 2383 2384 bool 2385 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 2386 { 2387 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 2388 return bdev->dif_is_head_of_md; 2389 } else { 2390 return false; 2391 } 2392 } 2393 2394 bool 2395 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 2396 enum spdk_dif_check_type check_type) 2397 { 2398 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 2399 return false; 2400 } 2401 2402 switch (check_type) { 2403 case SPDK_DIF_CHECK_TYPE_REFTAG: 2404 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 2405 case SPDK_DIF_CHECK_TYPE_APPTAG: 2406 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 2407 case SPDK_DIF_CHECK_TYPE_GUARD: 2408 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 2409 default: 2410 return false; 2411 } 2412 } 2413 2414 uint64_t 2415 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2416 { 2417 return bdev->internal.measured_queue_depth; 2418 } 
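/*
 * Illustrative usage sketch (not part of the original file): it exercises the
 * accessors above together with spdk_bdev_read_blocks() and
 * spdk_bdev_free_io() from this file.  The names example_read_done() and
 * example_read_first_block() are hypothetical, and the descriptor/channel are
 * assumed to have been obtained via spdk_bdev_open() and
 * spdk_bdev_get_io_channel(), both defined in this file.
 */
#if 0	/* illustrative only, never compiled */
static void
example_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	void *buf = cb_arg;

	if (!success) {
		SPDK_ERRLOG("example read failed\n");
	}

	/* Every completed I/O must be handed back to the bdev_io pool. */
	spdk_bdev_free_io(bdev_io);
	spdk_dma_free(buf);
}

static int
example_read_first_block(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
			 struct spdk_io_channel *ch)
{
	void *buf;
	int rc;

	/* Honor the bdev's buffer alignment requirement when allocating the payload. */
	buf = spdk_dma_zmalloc(spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_buf_align(bdev), NULL);
	if (buf == NULL) {
		return -ENOMEM;
	}

	/* Read block 0.  -EINVAL means the request was out of range and -ENOMEM
	 * means no spdk_bdev_io was available (see spdk_bdev_queue_io_wait()
	 * later in this file for the retry mechanism).
	 */
	rc = spdk_bdev_read_blocks(desc, ch, buf, 0, 1, example_read_done, buf);
	if (rc != 0) {
		spdk_dma_free(buf);
	}

	return rc;
}
#endif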
2419 2420 uint64_t 2421 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2422 { 2423 return bdev->internal.period; 2424 } 2425 2426 uint64_t 2427 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2428 { 2429 return bdev->internal.weighted_io_time; 2430 } 2431 2432 uint64_t 2433 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2434 { 2435 return bdev->internal.io_time; 2436 } 2437 2438 static void 2439 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2440 { 2441 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2442 2443 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2444 2445 if (bdev->internal.measured_queue_depth) { 2446 bdev->internal.io_time += bdev->internal.period; 2447 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2448 } 2449 } 2450 2451 static void 2452 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2453 { 2454 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2455 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2456 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2457 2458 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2459 spdk_for_each_channel_continue(i, 0); 2460 } 2461 2462 static int 2463 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2464 { 2465 struct spdk_bdev *bdev = ctx; 2466 bdev->internal.temporary_queue_depth = 0; 2467 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2468 _calculate_measured_qd_cpl); 2469 return 0; 2470 } 2471 2472 void 2473 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2474 { 2475 bdev->internal.period = period; 2476 2477 if (bdev->internal.qd_poller != NULL) { 2478 spdk_poller_unregister(&bdev->internal.qd_poller); 2479 bdev->internal.measured_queue_depth = UINT64_MAX; 2480 } 2481 2482 if (period != 0) { 2483 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2484 period); 2485 } 2486 } 2487 2488 int 2489 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2490 { 2491 int ret; 2492 2493 pthread_mutex_lock(&bdev->internal.mutex); 2494 2495 /* bdev has open descriptors */ 2496 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2497 bdev->blockcnt > size) { 2498 ret = -EBUSY; 2499 } else { 2500 bdev->blockcnt = size; 2501 ret = 0; 2502 } 2503 2504 pthread_mutex_unlock(&bdev->internal.mutex); 2505 2506 return ret; 2507 } 2508 2509 /* 2510 * Convert I/O offset and length from bytes to blocks. 2511 * 2512 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 2513 */ 2514 static uint64_t 2515 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2516 uint64_t num_bytes, uint64_t *num_blocks) 2517 { 2518 uint32_t block_size = bdev->blocklen; 2519 uint8_t shift_cnt; 2520 2521 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
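 * For example, with a 4096-byte block size spdk_u32log2() yields a shift of 12,
 * so offset_blocks is simply offset_bytes >> 12, and the OR of the two
 * remainders computed below is non-zero exactly when either byte value is not
 * a multiple of the block size.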
*/ 2522 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 2523 shift_cnt = spdk_u32log2(block_size); 2524 *offset_blocks = offset_bytes >> shift_cnt; 2525 *num_blocks = num_bytes >> shift_cnt; 2526 return (offset_bytes - (*offset_blocks << shift_cnt)) | 2527 (num_bytes - (*num_blocks << shift_cnt)); 2528 } else { 2529 *offset_blocks = offset_bytes / block_size; 2530 *num_blocks = num_bytes / block_size; 2531 return (offset_bytes % block_size) | (num_bytes % block_size); 2532 } 2533 } 2534 2535 static bool 2536 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2537 { 2538 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2539 * has been an overflow and hence the offset has been wrapped around */ 2540 if (offset_blocks + num_blocks < offset_blocks) { 2541 return false; 2542 } 2543 2544 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2545 if (offset_blocks + num_blocks > bdev->blockcnt) { 2546 return false; 2547 } 2548 2549 return true; 2550 } 2551 2552 int 2553 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2554 void *buf, uint64_t offset, uint64_t nbytes, 2555 spdk_bdev_io_completion_cb cb, void *cb_arg) 2556 { 2557 uint64_t offset_blocks, num_blocks; 2558 2559 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2560 return -EINVAL; 2561 } 2562 2563 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2564 } 2565 2566 int 2567 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2568 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2569 spdk_bdev_io_completion_cb cb, void *cb_arg) 2570 { 2571 struct spdk_bdev *bdev = desc->bdev; 2572 struct spdk_bdev_io *bdev_io; 2573 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2574 2575 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2576 return -EINVAL; 2577 } 2578 2579 bdev_io = spdk_bdev_get_io(channel); 2580 if (!bdev_io) { 2581 return -ENOMEM; 2582 } 2583 2584 bdev_io->internal.ch = channel; 2585 bdev_io->internal.desc = desc; 2586 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2587 bdev_io->u.bdev.iovs = &bdev_io->iov; 2588 bdev_io->u.bdev.iovs[0].iov_base = buf; 2589 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2590 bdev_io->u.bdev.iovcnt = 1; 2591 bdev_io->u.bdev.num_blocks = num_blocks; 2592 bdev_io->u.bdev.offset_blocks = offset_blocks; 2593 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2594 2595 spdk_bdev_io_submit(bdev_io); 2596 return 0; 2597 } 2598 2599 int 2600 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2601 struct iovec *iov, int iovcnt, 2602 uint64_t offset, uint64_t nbytes, 2603 spdk_bdev_io_completion_cb cb, void *cb_arg) 2604 { 2605 uint64_t offset_blocks, num_blocks; 2606 2607 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2608 return -EINVAL; 2609 } 2610 2611 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2612 } 2613 2614 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2615 struct iovec *iov, int iovcnt, 2616 uint64_t offset_blocks, uint64_t num_blocks, 2617 spdk_bdev_io_completion_cb cb, void *cb_arg) 2618 { 2619 struct spdk_bdev *bdev = desc->bdev; 2620 struct spdk_bdev_io *bdev_io; 2621 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2622 2623 if 
(!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2624 return -EINVAL; 2625 } 2626 2627 bdev_io = spdk_bdev_get_io(channel); 2628 if (!bdev_io) { 2629 return -ENOMEM; 2630 } 2631 2632 bdev_io->internal.ch = channel; 2633 bdev_io->internal.desc = desc; 2634 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2635 bdev_io->u.bdev.iovs = iov; 2636 bdev_io->u.bdev.iovcnt = iovcnt; 2637 bdev_io->u.bdev.num_blocks = num_blocks; 2638 bdev_io->u.bdev.offset_blocks = offset_blocks; 2639 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2640 2641 spdk_bdev_io_submit(bdev_io); 2642 return 0; 2643 } 2644 2645 int 2646 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2647 void *buf, uint64_t offset, uint64_t nbytes, 2648 spdk_bdev_io_completion_cb cb, void *cb_arg) 2649 { 2650 uint64_t offset_blocks, num_blocks; 2651 2652 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2653 return -EINVAL; 2654 } 2655 2656 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2657 } 2658 2659 int 2660 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2661 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2662 spdk_bdev_io_completion_cb cb, void *cb_arg) 2663 { 2664 struct spdk_bdev *bdev = desc->bdev; 2665 struct spdk_bdev_io *bdev_io; 2666 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2667 2668 if (!desc->write) { 2669 return -EBADF; 2670 } 2671 2672 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2673 return -EINVAL; 2674 } 2675 2676 bdev_io = spdk_bdev_get_io(channel); 2677 if (!bdev_io) { 2678 return -ENOMEM; 2679 } 2680 2681 bdev_io->internal.ch = channel; 2682 bdev_io->internal.desc = desc; 2683 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2684 bdev_io->u.bdev.iovs = &bdev_io->iov; 2685 bdev_io->u.bdev.iovs[0].iov_base = buf; 2686 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2687 bdev_io->u.bdev.iovcnt = 1; 2688 bdev_io->u.bdev.num_blocks = num_blocks; 2689 bdev_io->u.bdev.offset_blocks = offset_blocks; 2690 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2691 2692 spdk_bdev_io_submit(bdev_io); 2693 return 0; 2694 } 2695 2696 int 2697 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2698 struct iovec *iov, int iovcnt, 2699 uint64_t offset, uint64_t len, 2700 spdk_bdev_io_completion_cb cb, void *cb_arg) 2701 { 2702 uint64_t offset_blocks, num_blocks; 2703 2704 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2705 return -EINVAL; 2706 } 2707 2708 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2709 } 2710 2711 int 2712 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2713 struct iovec *iov, int iovcnt, 2714 uint64_t offset_blocks, uint64_t num_blocks, 2715 spdk_bdev_io_completion_cb cb, void *cb_arg) 2716 { 2717 struct spdk_bdev *bdev = desc->bdev; 2718 struct spdk_bdev_io *bdev_io; 2719 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2720 2721 if (!desc->write) { 2722 return -EBADF; 2723 } 2724 2725 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2726 return -EINVAL; 2727 } 2728 2729 bdev_io = spdk_bdev_get_io(channel); 2730 if (!bdev_io) { 2731 return -ENOMEM; 2732 } 2733 2734 bdev_io->internal.ch = channel; 2735 bdev_io->internal.desc = desc; 2736 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2737 bdev_io->u.bdev.iovs = iov; 2738 bdev_io->u.bdev.iovcnt = 
iovcnt; 2739 bdev_io->u.bdev.num_blocks = num_blocks; 2740 bdev_io->u.bdev.offset_blocks = offset_blocks; 2741 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2742 2743 spdk_bdev_io_submit(bdev_io); 2744 return 0; 2745 } 2746 2747 int 2748 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2749 uint64_t offset, uint64_t len, 2750 spdk_bdev_io_completion_cb cb, void *cb_arg) 2751 { 2752 uint64_t offset_blocks, num_blocks; 2753 2754 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2755 return -EINVAL; 2756 } 2757 2758 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2759 } 2760 2761 int 2762 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2763 uint64_t offset_blocks, uint64_t num_blocks, 2764 spdk_bdev_io_completion_cb cb, void *cb_arg) 2765 { 2766 struct spdk_bdev *bdev = desc->bdev; 2767 struct spdk_bdev_io *bdev_io; 2768 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2769 2770 if (!desc->write) { 2771 return -EBADF; 2772 } 2773 2774 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2775 return -EINVAL; 2776 } 2777 2778 bdev_io = spdk_bdev_get_io(channel); 2779 2780 if (!bdev_io) { 2781 return -ENOMEM; 2782 } 2783 2784 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2785 bdev_io->internal.ch = channel; 2786 bdev_io->internal.desc = desc; 2787 bdev_io->u.bdev.offset_blocks = offset_blocks; 2788 bdev_io->u.bdev.num_blocks = num_blocks; 2789 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2790 2791 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2792 spdk_bdev_io_submit(bdev_io); 2793 return 0; 2794 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2795 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2796 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2797 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2798 _spdk_bdev_write_zero_buffer_next(bdev_io); 2799 return 0; 2800 } else { 2801 spdk_bdev_free_io(bdev_io); 2802 return -ENOTSUP; 2803 } 2804 } 2805 2806 int 2807 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2808 uint64_t offset, uint64_t nbytes, 2809 spdk_bdev_io_completion_cb cb, void *cb_arg) 2810 { 2811 uint64_t offset_blocks, num_blocks; 2812 2813 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2814 return -EINVAL; 2815 } 2816 2817 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2818 } 2819 2820 int 2821 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2822 uint64_t offset_blocks, uint64_t num_blocks, 2823 spdk_bdev_io_completion_cb cb, void *cb_arg) 2824 { 2825 struct spdk_bdev *bdev = desc->bdev; 2826 struct spdk_bdev_io *bdev_io; 2827 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2828 2829 if (!desc->write) { 2830 return -EBADF; 2831 } 2832 2833 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2834 return -EINVAL; 2835 } 2836 2837 if (num_blocks == 0) { 2838 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2839 return -EINVAL; 2840 } 2841 2842 bdev_io = spdk_bdev_get_io(channel); 2843 if (!bdev_io) { 2844 return -ENOMEM; 2845 } 2846 2847 bdev_io->internal.ch = channel; 2848 bdev_io->internal.desc = desc; 2849 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2850 2851 bdev_io->u.bdev.iovs = &bdev_io->iov; 2852 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2853 
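/* Unmap carries no data payload; the embedded iovec is left pointing at nothing
 * with zero length so that u.bdev.iovs stays initialized for any code that
 * inspects it.
 */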
bdev_io->u.bdev.iovs[0].iov_len = 0; 2854 bdev_io->u.bdev.iovcnt = 1; 2855 2856 bdev_io->u.bdev.offset_blocks = offset_blocks; 2857 bdev_io->u.bdev.num_blocks = num_blocks; 2858 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2859 2860 spdk_bdev_io_submit(bdev_io); 2861 return 0; 2862 } 2863 2864 int 2865 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2866 uint64_t offset, uint64_t length, 2867 spdk_bdev_io_completion_cb cb, void *cb_arg) 2868 { 2869 uint64_t offset_blocks, num_blocks; 2870 2871 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2872 return -EINVAL; 2873 } 2874 2875 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2876 } 2877 2878 int 2879 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2880 uint64_t offset_blocks, uint64_t num_blocks, 2881 spdk_bdev_io_completion_cb cb, void *cb_arg) 2882 { 2883 struct spdk_bdev *bdev = desc->bdev; 2884 struct spdk_bdev_io *bdev_io; 2885 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2886 2887 if (!desc->write) { 2888 return -EBADF; 2889 } 2890 2891 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2892 return -EINVAL; 2893 } 2894 2895 bdev_io = spdk_bdev_get_io(channel); 2896 if (!bdev_io) { 2897 return -ENOMEM; 2898 } 2899 2900 bdev_io->internal.ch = channel; 2901 bdev_io->internal.desc = desc; 2902 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2903 bdev_io->u.bdev.iovs = NULL; 2904 bdev_io->u.bdev.iovcnt = 0; 2905 bdev_io->u.bdev.offset_blocks = offset_blocks; 2906 bdev_io->u.bdev.num_blocks = num_blocks; 2907 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2908 2909 spdk_bdev_io_submit(bdev_io); 2910 return 0; 2911 } 2912 2913 static void 2914 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2915 { 2916 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2917 struct spdk_bdev_io *bdev_io; 2918 2919 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2920 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2921 spdk_bdev_io_submit_reset(bdev_io); 2922 } 2923 2924 static void 2925 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2926 { 2927 struct spdk_io_channel *ch; 2928 struct spdk_bdev_channel *channel; 2929 struct spdk_bdev_mgmt_channel *mgmt_channel; 2930 struct spdk_bdev_shared_resource *shared_resource; 2931 bdev_io_tailq_t tmp_queued; 2932 2933 TAILQ_INIT(&tmp_queued); 2934 2935 ch = spdk_io_channel_iter_get_channel(i); 2936 channel = spdk_io_channel_get_ctx(ch); 2937 shared_resource = channel->shared_resource; 2938 mgmt_channel = shared_resource->mgmt_ch; 2939 2940 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2941 2942 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2943 /* The QoS object is always valid and readable while 2944 * the channel flag is set, so the lock here should not 2945 * be necessary. We're not in the fast path though, so 2946 * just take it anyway. 
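 * Swapping the QoS queue into tmp_queued below also lets those queued I/O be
 * aborted together with the rest of this channel's pending I/O.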
*/ 2947 pthread_mutex_lock(&channel->bdev->internal.mutex); 2948 if (channel->bdev->internal.qos->ch == channel) { 2949 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2950 } 2951 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2952 } 2953 2954 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2955 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2956 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2957 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2958 2959 spdk_for_each_channel_continue(i, 0); 2960 } 2961 2962 static void 2963 _spdk_bdev_start_reset(void *ctx) 2964 { 2965 struct spdk_bdev_channel *ch = ctx; 2966 2967 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2968 ch, _spdk_bdev_reset_dev); 2969 } 2970 2971 static void 2972 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2973 { 2974 struct spdk_bdev *bdev = ch->bdev; 2975 2976 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2977 2978 pthread_mutex_lock(&bdev->internal.mutex); 2979 if (bdev->internal.reset_in_progress == NULL) { 2980 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2981 /* 2982 * Take a channel reference for the target bdev for the life of this 2983 * reset. This guards against the channel getting destroyed while 2984 * spdk_for_each_channel() calls related to this reset IO are in 2985 * progress. We will release the reference when this reset is 2986 * completed. 2987 */ 2988 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2989 _spdk_bdev_start_reset(ch); 2990 } 2991 pthread_mutex_unlock(&bdev->internal.mutex); 2992 } 2993 2994 int 2995 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2996 spdk_bdev_io_completion_cb cb, void *cb_arg) 2997 { 2998 struct spdk_bdev *bdev = desc->bdev; 2999 struct spdk_bdev_io *bdev_io; 3000 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3001 3002 bdev_io = spdk_bdev_get_io(channel); 3003 if (!bdev_io) { 3004 return -ENOMEM; 3005 } 3006 3007 bdev_io->internal.ch = channel; 3008 bdev_io->internal.desc = desc; 3009 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 3010 bdev_io->u.reset.ch_ref = NULL; 3011 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3012 3013 pthread_mutex_lock(&bdev->internal.mutex); 3014 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 3015 pthread_mutex_unlock(&bdev->internal.mutex); 3016 3017 _spdk_bdev_channel_start_reset(channel); 3018 3019 return 0; 3020 } 3021 3022 void 3023 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3024 struct spdk_bdev_io_stat *stat) 3025 { 3026 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3027 3028 *stat = channel->stat; 3029 } 3030 3031 static void 3032 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 3033 { 3034 void *io_device = spdk_io_channel_iter_get_io_device(i); 3035 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3036 3037 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 3038 bdev_iostat_ctx->cb_arg, 0); 3039 free(bdev_iostat_ctx); 3040 } 3041 3042 static void 3043 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 3044 { 3045 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3046 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3047 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 3048 3049 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 3050 spdk_for_each_channel_continue(i, 0); 3051 } 3052 3053 void 3054 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 3055 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 3056 { 3057 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 3058 3059 assert(bdev != NULL); 3060 assert(stat != NULL); 3061 assert(cb != NULL); 3062 3063 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 3064 if (bdev_iostat_ctx == NULL) { 3065 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 3066 cb(bdev, stat, cb_arg, -ENOMEM); 3067 return; 3068 } 3069 3070 bdev_iostat_ctx->stat = stat; 3071 bdev_iostat_ctx->cb = cb; 3072 bdev_iostat_ctx->cb_arg = cb_arg; 3073 3074 /* Start with the statistics from previously deleted channels. */ 3075 pthread_mutex_lock(&bdev->internal.mutex); 3076 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 3077 pthread_mutex_unlock(&bdev->internal.mutex); 3078 3079 /* Then iterate and add the statistics from each existing channel. */ 3080 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3081 _spdk_bdev_get_each_channel_stat, 3082 bdev_iostat_ctx, 3083 _spdk_bdev_get_device_stat_done); 3084 } 3085 3086 int 3087 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3088 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3089 spdk_bdev_io_completion_cb cb, void *cb_arg) 3090 { 3091 struct spdk_bdev *bdev = desc->bdev; 3092 struct spdk_bdev_io *bdev_io; 3093 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3094 3095 if (!desc->write) { 3096 return -EBADF; 3097 } 3098 3099 bdev_io = spdk_bdev_get_io(channel); 3100 if (!bdev_io) { 3101 return -ENOMEM; 3102 } 3103 3104 bdev_io->internal.ch = channel; 3105 bdev_io->internal.desc = desc; 3106 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 3107 bdev_io->u.nvme_passthru.cmd = *cmd; 3108 bdev_io->u.nvme_passthru.buf = buf; 3109 bdev_io->u.nvme_passthru.nbytes = nbytes; 3110 bdev_io->u.nvme_passthru.md_buf = NULL; 3111 bdev_io->u.nvme_passthru.md_len = 0; 3112 3113 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3114 3115 spdk_bdev_io_submit(bdev_io); 3116 return 0; 3117 } 3118 3119 int 3120 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3121 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3122 spdk_bdev_io_completion_cb cb, void *cb_arg) 3123 { 3124 struct spdk_bdev *bdev = desc->bdev; 3125 struct spdk_bdev_io *bdev_io; 3126 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3127 3128 if (!desc->write) { 3129 /* 3130 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3131 * to easily determine if the command is a read or write, but for now just 3132 * do not allow io_passthru with a read-only descriptor. 
3133 */ 3134 return -EBADF; 3135 } 3136 3137 bdev_io = spdk_bdev_get_io(channel); 3138 if (!bdev_io) { 3139 return -ENOMEM; 3140 } 3141 3142 bdev_io->internal.ch = channel; 3143 bdev_io->internal.desc = desc; 3144 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 3145 bdev_io->u.nvme_passthru.cmd = *cmd; 3146 bdev_io->u.nvme_passthru.buf = buf; 3147 bdev_io->u.nvme_passthru.nbytes = nbytes; 3148 bdev_io->u.nvme_passthru.md_buf = NULL; 3149 bdev_io->u.nvme_passthru.md_len = 0; 3150 3151 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3152 3153 spdk_bdev_io_submit(bdev_io); 3154 return 0; 3155 } 3156 3157 int 3158 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3159 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 3160 spdk_bdev_io_completion_cb cb, void *cb_arg) 3161 { 3162 struct spdk_bdev *bdev = desc->bdev; 3163 struct spdk_bdev_io *bdev_io; 3164 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3165 3166 if (!desc->write) { 3167 /* 3168 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3169 * to easily determine if the command is a read or write, but for now just 3170 * do not allow io_passthru with a read-only descriptor. 3171 */ 3172 return -EBADF; 3173 } 3174 3175 bdev_io = spdk_bdev_get_io(channel); 3176 if (!bdev_io) { 3177 return -ENOMEM; 3178 } 3179 3180 bdev_io->internal.ch = channel; 3181 bdev_io->internal.desc = desc; 3182 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 3183 bdev_io->u.nvme_passthru.cmd = *cmd; 3184 bdev_io->u.nvme_passthru.buf = buf; 3185 bdev_io->u.nvme_passthru.nbytes = nbytes; 3186 bdev_io->u.nvme_passthru.md_buf = md_buf; 3187 bdev_io->u.nvme_passthru.md_len = md_len; 3188 3189 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3190 3191 spdk_bdev_io_submit(bdev_io); 3192 return 0; 3193 } 3194 3195 int 3196 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3197 struct spdk_bdev_io_wait_entry *entry) 3198 { 3199 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3200 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 3201 3202 if (bdev != entry->bdev) { 3203 SPDK_ERRLOG("bdevs do not match\n"); 3204 return -EINVAL; 3205 } 3206 3207 if (mgmt_ch->per_thread_cache_count > 0) { 3208 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 3209 return -EINVAL; 3210 } 3211 3212 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 3213 return 0; 3214 } 3215 3216 static void 3217 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 3218 { 3219 struct spdk_bdev *bdev = bdev_ch->bdev; 3220 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3221 struct spdk_bdev_io *bdev_io; 3222 3223 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3224 /* 3225 * Allow some more I/O to complete before retrying the nomem_io queue. 3226 * Some drivers (such as nvme) cannot immediately take a new I/O in 3227 * the context of a completion, because the resources for the I/O are 3228 * not released until control returns to the bdev poller. Also, we 3229 * may require several small I/O to complete before a larger I/O 3230 * (that requires splitting) can be submitted. 
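 * nomem_threshold itself is recalculated in spdk_bdev_io_complete() every time
 * a submission comes back with SPDK_BDEV_IO_STATUS_NOMEM.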
3231 */ 3232 return; 3233 } 3234 3235 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3236 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3237 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3238 bdev_io->internal.ch->io_outstanding++; 3239 shared_resource->io_outstanding++; 3240 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3241 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 3242 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3243 break; 3244 } 3245 } 3246 } 3247 3248 static inline void 3249 _spdk_bdev_io_complete(void *ctx) 3250 { 3251 struct spdk_bdev_io *bdev_io = ctx; 3252 uint64_t tsc, tsc_diff; 3253 3254 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3255 /* 3256 * Send the completion to the thread that originally submitted the I/O, 3257 * which may not be the current thread in the case of QoS. 3258 */ 3259 if (bdev_io->internal.io_submit_ch) { 3260 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3261 bdev_io->internal.io_submit_ch = NULL; 3262 } 3263 3264 /* 3265 * Defer completion to avoid potential infinite recursion if the 3266 * user's completion callback issues a new I/O. 3267 */ 3268 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3269 _spdk_bdev_io_complete, bdev_io); 3270 return; 3271 } 3272 3273 tsc = spdk_get_ticks(); 3274 tsc_diff = tsc - bdev_io->internal.submit_tsc; 3275 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 3276 3277 if (bdev_io->internal.ch->histogram) { 3278 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 3279 } 3280 3281 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3282 switch (bdev_io->type) { 3283 case SPDK_BDEV_IO_TYPE_READ: 3284 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3285 bdev_io->internal.ch->stat.num_read_ops++; 3286 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 3287 break; 3288 case SPDK_BDEV_IO_TYPE_WRITE: 3289 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3290 bdev_io->internal.ch->stat.num_write_ops++; 3291 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 3292 break; 3293 case SPDK_BDEV_IO_TYPE_UNMAP: 3294 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3295 bdev_io->internal.ch->stat.num_unmap_ops++; 3296 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 3297 default: 3298 break; 3299 } 3300 } 3301 3302 #ifdef SPDK_CONFIG_VTUNE 3303 uint64_t now_tsc = spdk_get_ticks(); 3304 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 3305 uint64_t data[5]; 3306 3307 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 3308 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 3309 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 3310 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 3311 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
3312 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 3313 3314 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3315 __itt_metadata_u64, 5, data); 3316 3317 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3318 bdev_io->internal.ch->start_tsc = now_tsc; 3319 } 3320 #endif 3321 3322 assert(bdev_io->internal.cb != NULL); 3323 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 3324 3325 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3326 bdev_io->internal.caller_ctx); 3327 } 3328 3329 static void 3330 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3331 { 3332 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3333 3334 if (bdev_io->u.reset.ch_ref != NULL) { 3335 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3336 bdev_io->u.reset.ch_ref = NULL; 3337 } 3338 3339 _spdk_bdev_io_complete(bdev_io); 3340 } 3341 3342 static void 3343 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3344 { 3345 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3346 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3347 3348 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3349 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3350 _spdk_bdev_channel_start_reset(ch); 3351 } 3352 3353 spdk_for_each_channel_continue(i, 0); 3354 } 3355 3356 void 3357 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3358 { 3359 struct spdk_bdev *bdev = bdev_io->bdev; 3360 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3361 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3362 3363 bdev_io->internal.status = status; 3364 3365 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3366 bool unlock_channels = false; 3367 3368 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3369 SPDK_ERRLOG("NOMEM returned for reset\n"); 3370 } 3371 pthread_mutex_lock(&bdev->internal.mutex); 3372 if (bdev_io == bdev->internal.reset_in_progress) { 3373 bdev->internal.reset_in_progress = NULL; 3374 unlock_channels = true; 3375 } 3376 pthread_mutex_unlock(&bdev->internal.mutex); 3377 3378 if (unlock_channels) { 3379 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3380 bdev_io, _spdk_bdev_reset_complete); 3381 return; 3382 } 3383 } else { 3384 if (spdk_unlikely(bdev_io->internal.orig_iovcnt > 0)) { 3385 _bdev_io_unset_bounce_buf(bdev_io); 3386 } 3387 3388 assert(bdev_ch->io_outstanding > 0); 3389 assert(shared_resource->io_outstanding > 0); 3390 bdev_ch->io_outstanding--; 3391 shared_resource->io_outstanding--; 3392 3393 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3394 assert(shared_resource->io_outstanding > 0); 3395 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3396 /* 3397 * Wait for some of the outstanding I/O to complete before we 3398 * retry any of the nomem_io. Normally we will wait for 3399 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3400 * depth channels we will instead wait for half to complete. 
3401 */ 3402 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3403 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3404 return; 3405 } 3406 3407 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3408 _spdk_bdev_ch_retry_io(bdev_ch); 3409 } 3410 } 3411 3412 _spdk_bdev_io_complete(bdev_io); 3413 } 3414 3415 void 3416 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3417 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3418 { 3419 if (sc == SPDK_SCSI_STATUS_GOOD) { 3420 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3421 } else { 3422 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3423 bdev_io->internal.error.scsi.sc = sc; 3424 bdev_io->internal.error.scsi.sk = sk; 3425 bdev_io->internal.error.scsi.asc = asc; 3426 bdev_io->internal.error.scsi.ascq = ascq; 3427 } 3428 3429 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3430 } 3431 3432 void 3433 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3434 int *sc, int *sk, int *asc, int *ascq) 3435 { 3436 assert(sc != NULL); 3437 assert(sk != NULL); 3438 assert(asc != NULL); 3439 assert(ascq != NULL); 3440 3441 switch (bdev_io->internal.status) { 3442 case SPDK_BDEV_IO_STATUS_SUCCESS: 3443 *sc = SPDK_SCSI_STATUS_GOOD; 3444 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3445 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3446 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3447 break; 3448 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3449 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3450 break; 3451 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3452 *sc = bdev_io->internal.error.scsi.sc; 3453 *sk = bdev_io->internal.error.scsi.sk; 3454 *asc = bdev_io->internal.error.scsi.asc; 3455 *ascq = bdev_io->internal.error.scsi.ascq; 3456 break; 3457 default: 3458 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3459 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3460 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3461 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3462 break; 3463 } 3464 } 3465 3466 void 3467 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3468 { 3469 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3470 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3471 } else { 3472 bdev_io->internal.error.nvme.sct = sct; 3473 bdev_io->internal.error.nvme.sc = sc; 3474 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3475 } 3476 3477 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3478 } 3479 3480 void 3481 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3482 { 3483 assert(sct != NULL); 3484 assert(sc != NULL); 3485 3486 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3487 *sct = bdev_io->internal.error.nvme.sct; 3488 *sc = bdev_io->internal.error.nvme.sc; 3489 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3490 *sct = SPDK_NVME_SCT_GENERIC; 3491 *sc = SPDK_NVME_SC_SUCCESS; 3492 } else { 3493 *sct = SPDK_NVME_SCT_GENERIC; 3494 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3495 } 3496 } 3497 3498 struct spdk_thread * 3499 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3500 { 3501 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3502 } 3503 3504 struct spdk_io_channel * 3505 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 3506 { 3507 return bdev_io->internal.ch->channel; 3508 } 3509 3510 static void 3511 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t 
*limits) 3512 { 3513 uint64_t min_qos_set; 3514 int i; 3515 3516 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3517 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3518 break; 3519 } 3520 } 3521 3522 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3523 SPDK_ERRLOG("Invalid rate limits set.\n"); 3524 return; 3525 } 3526 3527 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3528 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3529 continue; 3530 } 3531 3532 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3533 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3534 } else { 3535 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3536 } 3537 3538 if (limits[i] == 0 || limits[i] % min_qos_set) { 3539 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3540 limits[i], bdev->name, min_qos_set); 3541 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3542 return; 3543 } 3544 } 3545 3546 if (!bdev->internal.qos) { 3547 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3548 if (!bdev->internal.qos) { 3549 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3550 return; 3551 } 3552 } 3553 3554 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3555 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3556 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3557 bdev->name, i, limits[i]); 3558 } 3559 3560 return; 3561 } 3562 3563 static void 3564 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3565 { 3566 struct spdk_conf_section *sp = NULL; 3567 const char *val = NULL; 3568 int i = 0, j = 0; 3569 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3570 bool config_qos = false; 3571 3572 sp = spdk_conf_find_section(NULL, "QoS"); 3573 if (!sp) { 3574 return; 3575 } 3576 3577 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3578 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3579 3580 i = 0; 3581 while (true) { 3582 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3583 if (!val) { 3584 break; 3585 } 3586 3587 if (strcmp(bdev->name, val) != 0) { 3588 i++; 3589 continue; 3590 } 3591 3592 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3593 if (val) { 3594 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3595 limits[j] = strtoull(val, NULL, 10); 3596 } else { 3597 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3598 } 3599 config_qos = true; 3600 } 3601 3602 break; 3603 } 3604 3605 j++; 3606 } 3607 3608 if (config_qos == true) { 3609 _spdk_bdev_qos_config_limit(bdev, limits); 3610 } 3611 3612 return; 3613 } 3614 3615 static int 3616 spdk_bdev_init(struct spdk_bdev *bdev) 3617 { 3618 char *bdev_name; 3619 3620 assert(bdev->module != NULL); 3621 3622 if (!bdev->name) { 3623 SPDK_ERRLOG("Bdev name is NULL\n"); 3624 return -EINVAL; 3625 } 3626 3627 if (spdk_bdev_get_by_name(bdev->name)) { 3628 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3629 return -EEXIST; 3630 } 3631 3632 /* Users often register their own I/O devices using the bdev name. In 3633 * order to avoid conflicts, prepend bdev_. 
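 * For example, a bdev registered under the name "Disk0" uses "bdev_Disk0"
 * as its io_device name below.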
*/ 3634 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3635 if (!bdev_name) { 3636 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3637 return -ENOMEM; 3638 } 3639 3640 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3641 bdev->internal.measured_queue_depth = UINT64_MAX; 3642 bdev->internal.claim_module = NULL; 3643 bdev->internal.qd_poller = NULL; 3644 bdev->internal.qos = NULL; 3645 3646 if (spdk_bdev_get_buf_align(bdev) > 1) { 3647 if (bdev->split_on_optimal_io_boundary) { 3648 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 3649 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 3650 } else { 3651 bdev->split_on_optimal_io_boundary = true; 3652 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 3653 } 3654 } 3655 3656 TAILQ_INIT(&bdev->internal.open_descs); 3657 3658 TAILQ_INIT(&bdev->aliases); 3659 3660 bdev->internal.reset_in_progress = NULL; 3661 3662 _spdk_bdev_qos_config(bdev); 3663 3664 spdk_io_device_register(__bdev_to_io_dev(bdev), 3665 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3666 sizeof(struct spdk_bdev_channel), 3667 bdev_name); 3668 3669 free(bdev_name); 3670 3671 pthread_mutex_init(&bdev->internal.mutex, NULL); 3672 return 0; 3673 } 3674 3675 static void 3676 spdk_bdev_destroy_cb(void *io_device) 3677 { 3678 int rc; 3679 struct spdk_bdev *bdev; 3680 spdk_bdev_unregister_cb cb_fn; 3681 void *cb_arg; 3682 3683 bdev = __bdev_from_io_dev(io_device); 3684 cb_fn = bdev->internal.unregister_cb; 3685 cb_arg = bdev->internal.unregister_ctx; 3686 3687 rc = bdev->fn_table->destruct(bdev->ctxt); 3688 if (rc < 0) { 3689 SPDK_ERRLOG("destruct failed\n"); 3690 } 3691 if (rc <= 0 && cb_fn != NULL) { 3692 cb_fn(cb_arg, rc); 3693 } 3694 } 3695 3696 3697 static void 3698 spdk_bdev_fini(struct spdk_bdev *bdev) 3699 { 3700 pthread_mutex_destroy(&bdev->internal.mutex); 3701 3702 free(bdev->internal.qos); 3703 3704 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3705 } 3706 3707 static void 3708 spdk_bdev_start(struct spdk_bdev *bdev) 3709 { 3710 struct spdk_bdev_module *module; 3711 uint32_t action; 3712 3713 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3714 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3715 3716 /* Examine configuration before initializing I/O */ 3717 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3718 if (module->examine_config) { 3719 action = module->internal.action_in_progress; 3720 module->internal.action_in_progress++; 3721 module->examine_config(bdev); 3722 if (action != module->internal.action_in_progress) { 3723 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3724 module->name); 3725 } 3726 } 3727 } 3728 3729 if (bdev->internal.claim_module) { 3730 if (bdev->internal.claim_module->examine_disk) { 3731 bdev->internal.claim_module->internal.action_in_progress++; 3732 bdev->internal.claim_module->examine_disk(bdev); 3733 } 3734 return; 3735 } 3736 3737 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3738 if (module->examine_disk) { 3739 module->internal.action_in_progress++; 3740 module->examine_disk(bdev); 3741 } 3742 } 3743 } 3744 3745 int 3746 spdk_bdev_register(struct spdk_bdev *bdev) 3747 { 3748 int rc = spdk_bdev_init(bdev); 3749 3750 if (rc == 0) { 3751 spdk_bdev_start(bdev); 3752 } 3753 3754 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 3755 return rc; 3756 } 3757 3758 int 3759 spdk_vbdev_register(struct spdk_bdev *vbdev, 
struct spdk_bdev **base_bdevs, int base_bdev_count) 3760 { 3761 SPDK_ERRLOG("This function is deprecated. Use spdk_bdev_register() instead.\n"); 3762 return spdk_bdev_register(vbdev); 3763 } 3764 3765 void 3766 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3767 { 3768 if (bdev->internal.unregister_cb != NULL) { 3769 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3770 } 3771 } 3772 3773 static void 3774 _remove_notify(void *arg) 3775 { 3776 struct spdk_bdev_desc *desc = arg; 3777 3778 desc->remove_scheduled = false; 3779 3780 if (desc->closed) { 3781 free(desc); 3782 } else { 3783 desc->remove_cb(desc->remove_ctx); 3784 } 3785 } 3786 3787 /* Must be called while holding bdev->internal.mutex. 3788 * returns: 0 - bdev removed and ready to be destructed. 3789 * -EBUSY - bdev can't be destructed yet. */ 3790 static int 3791 spdk_bdev_unregister_unsafe(struct spdk_bdev *bdev) 3792 { 3793 struct spdk_bdev_desc *desc, *tmp; 3794 int rc = 0; 3795 3796 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3797 if (desc->remove_cb) { 3798 rc = -EBUSY; 3799 /* 3800 * Defer invocation of the remove_cb to a separate message that will 3801 * run later on its thread. This ensures this context unwinds and 3802 * we don't recursively unregister this bdev again if the remove_cb 3803 * immediately closes its descriptor. 3804 */ 3805 if (!desc->remove_scheduled) { 3806 /* Avoid scheduling removal of the same descriptor multiple times. */ 3807 desc->remove_scheduled = true; 3808 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3809 } 3810 } 3811 } 3812 3813 if (rc == 0) { 3814 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3815 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 3816 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 3817 } 3818 3819 return rc; 3820 } 3821 3822 void 3823 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3824 { 3825 struct spdk_thread *thread; 3826 int rc; 3827 3828 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3829 3830 thread = spdk_get_thread(); 3831 if (!thread) { 3832 /* The user called this from a non-SPDK thread. */ 3833 if (cb_fn != NULL) { 3834 cb_fn(cb_arg, -ENOTSUP); 3835 } 3836 return; 3837 } 3838 3839 pthread_mutex_lock(&bdev->internal.mutex); 3840 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 3841 pthread_mutex_unlock(&bdev->internal.mutex); 3842 if (cb_fn) { 3843 cb_fn(cb_arg, -EBUSY); 3844 } 3845 return; 3846 } 3847 3848 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3849 bdev->internal.unregister_cb = cb_fn; 3850 bdev->internal.unregister_ctx = cb_arg; 3851 3852 /* Call under lock. 
*/ 3853 rc = spdk_bdev_unregister_unsafe(bdev); 3854 pthread_mutex_unlock(&bdev->internal.mutex); 3855 3856 if (rc == 0) { 3857 spdk_bdev_fini(bdev); 3858 } 3859 } 3860 3861 int 3862 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3863 void *remove_ctx, struct spdk_bdev_desc **_desc) 3864 { 3865 struct spdk_bdev_desc *desc; 3866 struct spdk_thread *thread; 3867 struct set_qos_limit_ctx *ctx; 3868 3869 thread = spdk_get_thread(); 3870 if (!thread) { 3871 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3872 return -ENOTSUP; 3873 } 3874 3875 desc = calloc(1, sizeof(*desc)); 3876 if (desc == NULL) { 3877 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3878 return -ENOMEM; 3879 } 3880 3881 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3882 spdk_get_thread()); 3883 3884 desc->bdev = bdev; 3885 desc->thread = thread; 3886 desc->remove_cb = remove_cb; 3887 desc->remove_ctx = remove_ctx; 3888 desc->write = write; 3889 *_desc = desc; 3890 3891 pthread_mutex_lock(&bdev->internal.mutex); 3892 3893 if (write && bdev->internal.claim_module) { 3894 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3895 bdev->name, bdev->internal.claim_module->name); 3896 pthread_mutex_unlock(&bdev->internal.mutex); 3897 free(desc); 3898 *_desc = NULL; 3899 return -EPERM; 3900 } 3901 3902 /* Enable QoS */ 3903 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 3904 ctx = calloc(1, sizeof(*ctx)); 3905 if (ctx == NULL) { 3906 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 3907 pthread_mutex_unlock(&bdev->internal.mutex); 3908 free(desc); 3909 *_desc = NULL; 3910 return -ENOMEM; 3911 } 3912 ctx->bdev = bdev; 3913 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3914 _spdk_bdev_enable_qos_msg, ctx, 3915 _spdk_bdev_enable_qos_done); 3916 } 3917 3918 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3919 3920 pthread_mutex_unlock(&bdev->internal.mutex); 3921 3922 return 0; 3923 } 3924 3925 void 3926 spdk_bdev_close(struct spdk_bdev_desc *desc) 3927 { 3928 struct spdk_bdev *bdev = desc->bdev; 3929 int rc; 3930 3931 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3932 spdk_get_thread()); 3933 3934 assert(desc->thread == spdk_get_thread()); 3935 3936 pthread_mutex_lock(&bdev->internal.mutex); 3937 3938 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3939 3940 desc->closed = true; 3941 3942 if (!desc->remove_scheduled) { 3943 free(desc); 3944 } 3945 3946 /* If no more descriptors, kill QoS channel */ 3947 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3948 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3949 bdev->name, spdk_get_thread()); 3950 3951 if (spdk_bdev_qos_destroy(bdev)) { 3952 /* There isn't anything we can do to recover here. Just let the 3953 * old QoS poller keep running. The QoS handling won't change 3954 * cores when the user allocates a new channel, but it won't break. */ 3955 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 3956 } 3957 } 3958 3959 spdk_bdev_set_qd_sampling_period(bdev, 0); 3960 3961 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3962 rc = spdk_bdev_unregister_unsafe(bdev); 3963 pthread_mutex_unlock(&bdev->internal.mutex); 3964 3965 if (rc == 0) { 3966 spdk_bdev_fini(bdev); 3967 } 3968 } else { 3969 pthread_mutex_unlock(&bdev->internal.mutex); 3970 } 3971 } 3972 3973 int 3974 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3975 struct spdk_bdev_module *module) 3976 { 3977 if (bdev->internal.claim_module != NULL) { 3978 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3979 bdev->internal.claim_module->name); 3980 return -EPERM; 3981 } 3982 3983 if (desc && !desc->write) { 3984 desc->write = true; 3985 } 3986 3987 bdev->internal.claim_module = module; 3988 return 0; 3989 } 3990 3991 void 3992 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3993 { 3994 assert(bdev->internal.claim_module != NULL); 3995 bdev->internal.claim_module = NULL; 3996 } 3997 3998 struct spdk_bdev * 3999 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 4000 { 4001 return desc->bdev; 4002 } 4003 4004 void 4005 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 4006 { 4007 struct iovec *iovs; 4008 int iovcnt; 4009 4010 if (bdev_io == NULL) { 4011 return; 4012 } 4013 4014 switch (bdev_io->type) { 4015 case SPDK_BDEV_IO_TYPE_READ: 4016 iovs = bdev_io->u.bdev.iovs; 4017 iovcnt = bdev_io->u.bdev.iovcnt; 4018 break; 4019 case SPDK_BDEV_IO_TYPE_WRITE: 4020 iovs = bdev_io->u.bdev.iovs; 4021 iovcnt = bdev_io->u.bdev.iovcnt; 4022 break; 4023 default: 4024 iovs = NULL; 4025 iovcnt = 0; 4026 break; 4027 } 4028 4029 if (iovp) { 4030 *iovp = iovs; 4031 } 4032 if (iovcntp) { 4033 *iovcntp = iovcnt; 4034 } 4035 } 4036 4037 void 4038 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 4039 { 4040 4041 if (spdk_bdev_module_list_find(bdev_module->name)) { 4042 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 4043 assert(false); 4044 } 4045 4046 if (bdev_module->async_init) { 4047 bdev_module->internal.action_in_progress = 1; 4048 } 4049 4050 /* 4051 * Modules with examine callbacks must be initialized first, so they are 4052 * ready to handle examine callbacks from later modules that will 4053 * register physical bdevs. 
4054 */ 4055 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 4056 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4057 } else { 4058 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4059 } 4060 } 4061 4062 struct spdk_bdev_module * 4063 spdk_bdev_module_list_find(const char *name) 4064 { 4065 struct spdk_bdev_module *bdev_module; 4066 4067 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4068 if (strcmp(name, bdev_module->name) == 0) { 4069 break; 4070 } 4071 } 4072 4073 return bdev_module; 4074 } 4075 4076 static void 4077 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 4078 { 4079 struct spdk_bdev_io *bdev_io = _bdev_io; 4080 uint64_t num_bytes, num_blocks; 4081 int rc; 4082 4083 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 4084 bdev_io->u.bdev.split_remaining_num_blocks, 4085 ZERO_BUFFER_SIZE); 4086 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 4087 4088 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 4089 spdk_io_channel_from_ctx(bdev_io->internal.ch), 4090 g_bdev_mgr.zero_buffer, 4091 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 4092 _spdk_bdev_write_zero_buffer_done, bdev_io); 4093 if (rc == 0) { 4094 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 4095 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 4096 } else if (rc == -ENOMEM) { 4097 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 4098 } else { 4099 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4100 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 4101 } 4102 } 4103 4104 static void 4105 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4106 { 4107 struct spdk_bdev_io *parent_io = cb_arg; 4108 4109 spdk_bdev_free_io(bdev_io); 4110 4111 if (!success) { 4112 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4113 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 4114 return; 4115 } 4116 4117 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 4118 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4119 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 4120 return; 4121 } 4122 4123 _spdk_bdev_write_zero_buffer_next(parent_io); 4124 } 4125 4126 static void 4127 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 4128 { 4129 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4130 ctx->bdev->internal.qos_mod_in_progress = false; 4131 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4132 4133 if (ctx->cb_fn) { 4134 ctx->cb_fn(ctx->cb_arg, status); 4135 } 4136 free(ctx); 4137 } 4138 4139 static void 4140 _spdk_bdev_disable_qos_done(void *cb_arg) 4141 { 4142 struct set_qos_limit_ctx *ctx = cb_arg; 4143 struct spdk_bdev *bdev = ctx->bdev; 4144 struct spdk_bdev_io *bdev_io; 4145 struct spdk_bdev_qos *qos; 4146 4147 pthread_mutex_lock(&bdev->internal.mutex); 4148 qos = bdev->internal.qos; 4149 bdev->internal.qos = NULL; 4150 pthread_mutex_unlock(&bdev->internal.mutex); 4151 4152 while (!TAILQ_EMPTY(&qos->queued)) { 4153 /* Send queued I/O back to their original thread for resubmission. */ 4154 bdev_io = TAILQ_FIRST(&qos->queued); 4155 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 4156 4157 if (bdev_io->internal.io_submit_ch) { 4158 /* 4159 * Channel was changed when sending it to the QoS thread - change it back 4160 * before sending it back to the original thread. 
4161 */ 4162 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4163 bdev_io->internal.io_submit_ch = NULL; 4164 } 4165 4166 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 4167 _spdk_bdev_io_submit, bdev_io); 4168 } 4169 4170 if (qos->thread != NULL) { 4171 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 4172 spdk_poller_unregister(&qos->poller); 4173 } 4174 4175 free(qos); 4176 4177 _spdk_bdev_set_qos_limit_done(ctx, 0); 4178 } 4179 4180 static void 4181 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 4182 { 4183 void *io_device = spdk_io_channel_iter_get_io_device(i); 4184 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4185 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4186 struct spdk_thread *thread; 4187 4188 pthread_mutex_lock(&bdev->internal.mutex); 4189 thread = bdev->internal.qos->thread; 4190 pthread_mutex_unlock(&bdev->internal.mutex); 4191 4192 if (thread != NULL) { 4193 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 4194 } else { 4195 _spdk_bdev_disable_qos_done(ctx); 4196 } 4197 } 4198 4199 static void 4200 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 4201 { 4202 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4203 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4204 4205 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 4206 4207 spdk_for_each_channel_continue(i, 0); 4208 } 4209 4210 static void 4211 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg) 4212 { 4213 struct set_qos_limit_ctx *ctx = cb_arg; 4214 struct spdk_bdev *bdev = ctx->bdev; 4215 4216 pthread_mutex_lock(&bdev->internal.mutex); 4217 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 4218 pthread_mutex_unlock(&bdev->internal.mutex); 4219 4220 _spdk_bdev_set_qos_limit_done(ctx, 0); 4221 } 4222 4223 static void 4224 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 4225 { 4226 void *io_device = spdk_io_channel_iter_get_io_device(i); 4227 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4228 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4229 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4230 4231 pthread_mutex_lock(&bdev->internal.mutex); 4232 _spdk_bdev_enable_qos(bdev, bdev_ch); 4233 pthread_mutex_unlock(&bdev->internal.mutex); 4234 spdk_for_each_channel_continue(i, 0); 4235 } 4236 4237 static void 4238 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 4239 { 4240 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4241 4242 _spdk_bdev_set_qos_limit_done(ctx, status); 4243 } 4244 4245 static void 4246 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 4247 { 4248 int i; 4249 4250 assert(bdev->internal.qos != NULL); 4251 4252 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4253 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4254 bdev->internal.qos->rate_limits[i].limit = limits[i]; 4255 4256 if (limits[i] == 0) { 4257 bdev->internal.qos->rate_limits[i].limit = 4258 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 4259 } 4260 } 4261 } 4262 } 4263 4264 void 4265 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 4266 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 4267 { 4268 struct set_qos_limit_ctx *ctx; 4269 uint32_t limit_set_complement; 4270 uint64_t min_limit_per_sec; 4271 int i; 4272 bool disable_rate_limit = true; 4273 4274 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4275 if (limits[i] == 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4276 continue; 4277 } 4278 4279 if (limits[i] > 0) { 4280 disable_rate_limit = false; 4281 } 4282 4283 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 4284 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 4285 } else { 4286 /* Change from megabyte to byte rate limit */ 4287 limits[i] = limits[i] * 1024 * 1024; 4288 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 4289 } 4290 4291 limit_set_complement = limits[i] % min_limit_per_sec; 4292 if (limit_set_complement) { 4293 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 4294 limits[i], min_limit_per_sec); 4295 limits[i] += min_limit_per_sec - limit_set_complement; 4296 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 4297 } 4298 } 4299 4300 ctx = calloc(1, sizeof(*ctx)); 4301 if (ctx == NULL) { 4302 cb_fn(cb_arg, -ENOMEM); 4303 return; 4304 } 4305 4306 ctx->cb_fn = cb_fn; 4307 ctx->cb_arg = cb_arg; 4308 ctx->bdev = bdev; 4309 4310 pthread_mutex_lock(&bdev->internal.mutex); 4311 if (bdev->internal.qos_mod_in_progress) { 4312 pthread_mutex_unlock(&bdev->internal.mutex); 4313 free(ctx); 4314 cb_fn(cb_arg, -EAGAIN); 4315 return; 4316 } 4317 bdev->internal.qos_mod_in_progress = true; 4318 4319 if (disable_rate_limit == true && bdev->internal.qos) { 4320 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4321 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 4322 (bdev->internal.qos->rate_limits[i].limit > 0 && 4323 bdev->internal.qos->rate_limits[i].limit != 4324 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 4325 disable_rate_limit = false; 4326 break; 4327 } 4328 } 4329 } 4330 4331 if (disable_rate_limit == false) { 4332 if (bdev->internal.qos == NULL) { 4333 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 4334 if (!bdev->internal.qos) { 4335 pthread_mutex_unlock(&bdev->internal.mutex); 4336 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 4337 free(ctx); 4338 cb_fn(cb_arg, -ENOMEM); 4339 return; 4340 } 4341 } 4342 4343 if (bdev->internal.qos->thread == NULL) { 4344 /* Enabling */ 4345 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4346 4347 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4348 _spdk_bdev_enable_qos_msg, ctx, 4349 _spdk_bdev_enable_qos_done); 4350 } else { 4351 /* Updating */ 4352 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4353 4354 spdk_thread_send_msg(bdev->internal.qos->thread, 4355 _spdk_bdev_update_qos_rate_limit_msg, ctx); 4356 } 4357 } else { 4358 if (bdev->internal.qos != NULL) { 4359 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4360 4361 /* Disabling */ 4362 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4363 _spdk_bdev_disable_qos_msg, ctx, 4364 _spdk_bdev_disable_qos_msg_done); 4365 } else { 4366 pthread_mutex_unlock(&bdev->internal.mutex); 4367 _spdk_bdev_set_qos_limit_done(ctx, 0); 4368 return; 4369 } 4370 } 4371 4372 pthread_mutex_unlock(&bdev->internal.mutex); 4373 } 4374 4375 struct spdk_bdev_histogram_ctx { 4376 spdk_bdev_histogram_status_cb cb_fn; 4377 void *cb_arg; 4378 struct spdk_bdev *bdev; 4379 int status; 4380 }; 4381 4382 static void 4383 _spdk_bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 4384 { 4385 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4386 4387 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4388 ctx->bdev->internal.histogram_in_progress = false; 4389 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4390 ctx->cb_fn(ctx->cb_arg, ctx->status); 4391 free(ctx); 4392 } 4393 4394 static void 4395 
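/*
 * Illustrative sketch (not part of this file): capping a bdev at 20k
 * read/write IOPS through spdk_bdev_set_qos_rate_limits() above. Array
 * slots left at SPDK_BDEV_QOS_LIMIT_NOT_DEFINED (UINT64_MAX) are not
 * modified, a value of 0 clears that particular limit, and the three
 * byte-per-second limits are passed in megabytes (they are converted to
 * bytes internally, as seen above). The SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT
 * index name is assumed to come from the rate limit type enum in
 * spdk/bdev.h; my_qos_done() is a hypothetical callback.
 *
 *	static void
 *	my_qos_done(void *cb_arg, int status)
 *	{
 *		if (status != 0) {
 *			SPDK_ERRLOG("Setting QoS rate limits failed: %d\n", status);
 *		}
 *	}
 *
 *	static void
 *	my_enable_qos(struct spdk_bdev *bdev)
 *	{
 *		uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
 *		int i;
 *
 *		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
 *			limits[i] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
 *		}
 *		limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 20000;
 *
 *		spdk_bdev_set_qos_rate_limits(bdev, limits, my_qos_done, NULL);
 *	}
 */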
_spdk_bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 4396 { 4397 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4398 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4399 4400 if (ch->histogram != NULL) { 4401 spdk_histogram_data_free(ch->histogram); 4402 ch->histogram = NULL; 4403 } 4404 spdk_for_each_channel_continue(i, 0); 4405 } 4406 4407 static void 4408 _spdk_bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 4409 { 4410 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4411 4412 if (status != 0) { 4413 ctx->status = status; 4414 ctx->bdev->internal.histogram_enabled = false; 4415 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), _spdk_bdev_histogram_disable_channel, ctx, 4416 _spdk_bdev_histogram_disable_channel_cb); 4417 } else { 4418 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4419 ctx->bdev->internal.histogram_in_progress = false; 4420 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4421 ctx->cb_fn(ctx->cb_arg, ctx->status); 4422 free(ctx); 4423 } 4424 } 4425 4426 static void 4427 _spdk_bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 4428 { 4429 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4430 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4431 int status = 0; 4432 4433 if (ch->histogram == NULL) { 4434 ch->histogram = spdk_histogram_data_alloc(); 4435 if (ch->histogram == NULL) { 4436 status = -ENOMEM; 4437 } 4438 } 4439 4440 spdk_for_each_channel_continue(i, status); 4441 } 4442 4443 void 4444 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 4445 void *cb_arg, bool enable) 4446 { 4447 struct spdk_bdev_histogram_ctx *ctx; 4448 4449 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 4450 if (ctx == NULL) { 4451 cb_fn(cb_arg, -ENOMEM); 4452 return; 4453 } 4454 4455 ctx->bdev = bdev; 4456 ctx->status = 0; 4457 ctx->cb_fn = cb_fn; 4458 ctx->cb_arg = cb_arg; 4459 4460 pthread_mutex_lock(&bdev->internal.mutex); 4461 if (bdev->internal.histogram_in_progress) { 4462 pthread_mutex_unlock(&bdev->internal.mutex); 4463 free(ctx); 4464 cb_fn(cb_arg, -EAGAIN); 4465 return; 4466 } 4467 4468 bdev->internal.histogram_in_progress = true; 4469 pthread_mutex_unlock(&bdev->internal.mutex); 4470 4471 bdev->internal.histogram_enabled = enable; 4472 4473 if (enable) { 4474 /* Allocate histogram for each channel */ 4475 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_enable_channel, ctx, 4476 _spdk_bdev_histogram_enable_channel_cb); 4477 } else { 4478 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_disable_channel, ctx, 4479 _spdk_bdev_histogram_disable_channel_cb); 4480 } 4481 } 4482 4483 struct spdk_bdev_histogram_data_ctx { 4484 spdk_bdev_histogram_data_cb cb_fn; 4485 void *cb_arg; 4486 struct spdk_bdev *bdev; 4487 /** merged histogram data from all channels */ 4488 struct spdk_histogram_data *histogram; 4489 }; 4490 4491 static void 4492 _spdk_bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 4493 { 4494 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4495 4496 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 4497 free(ctx); 4498 } 4499 4500 static void 4501 _spdk_bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 4502 { 4503 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4504 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4505 struct spdk_bdev_histogram_data_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i);
	int status = 0;

	if (ch->histogram == NULL) {
		status = -EFAULT;
	} else {
		spdk_histogram_data_merge(ctx->histogram, ch->histogram);
	}

	spdk_for_each_channel_continue(i, status);
}

void
spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram,
			spdk_bdev_histogram_data_cb cb_fn,
			void *cb_arg)
{
	struct spdk_bdev_histogram_data_ctx *ctx;

	ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM, NULL);
		return;
	}

	ctx->bdev = bdev;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	ctx->histogram = histogram;

	spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_get_channel, ctx,
			      _spdk_bdev_histogram_get_channel_cb);
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)

SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
{
	spdk_trace_register_owner(OWNER_BDEV, 'b');
	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
					OBJECT_BDEV_IO, 1, 0, "type: ");
	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
					OBJECT_BDEV_IO, 0, 0, "");
}
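
/*
 * Illustrative sketch (not part of this file): collecting a per-bdev latency
 * histogram with the two public entry points defined above. The caller owns
 * the spdk_histogram_data passed to spdk_bdev_histogram_get() and must keep
 * it valid until the data callback runs. my_hist_enable_cb() and
 * my_hist_data_cb() are hypothetical names.
 *
 *	static void
 *	my_hist_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
 *	{
 *		// status == 0 means every channel merged its buckets into
 *		// histogram; inspect the data here, then release it.
 *		spdk_histogram_data_free(histogram);
 *	}
 *
 *	static void
 *	my_hist_enable_cb(void *cb_arg, int status)
 *	{
 *		struct spdk_bdev *bdev = cb_arg;
 *		struct spdk_histogram_data *histogram;
 *
 *		if (status != 0) {
 *			return;
 *		}
 *
 *		histogram = spdk_histogram_data_alloc();
 *		if (histogram != NULL) {
 *			spdk_bdev_histogram_get(bdev, histogram, my_hist_data_cb, NULL);
 *		}
 *	}
 *
 * Turning collection on is then a single call:
 *
 *	spdk_bdev_histogram_enable(bdev, my_hist_enable_cb, bdev, true);
 *
 * Passing enable = false tears the per-channel histograms back down.
 */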