/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/notify.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024 - 1)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define BUF_SMALL_POOL_SIZE			8191
#define BUF_LARGE_POOL_SIZE			1023
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(10 * 1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_conf_type[] = {
	"Limit_IOPS", "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS"
};
static const char *qos_rpc_type[] = {
	"rw_ios_per_sec", "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
};

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

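	/* Descriptive note (added): a single zero-filled DMA buffer of ZERO_BUFFER_SIZE bytes,
	 * allocated in spdk_bdev_initialize() with spdk_zmalloc(); presumably consumed by the
	 * write-zeroes emulation path (_spdk_bdev_write_zero_buffer_next(), declared below).
	 */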
	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb g_init_cb_fn = NULL;
static void *g_init_cb_arg = NULL;

static spdk_bdev_fini_cb g_fini_cb_fn = NULL;
static void *g_fini_cb_arg = NULL;
static struct spdk_thread *g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;

	/** Function to check whether to queue the IO. */
	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);

	/** Function to update for the submitted IO. */
	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
};

struct spdk_bdev_qos {
	/** Rate limits, one per limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t per_thread_cache_count;
	uint32_t bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue;
};

/*
 * Per-module (or per-io_device) data.  Multiple bdevs built on the same io_device
 * queue their I/O awaiting retry here, which makes it possible to retry sending
 * I/O to one bdev after I/O from another bdev completes.
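 * (Added note: see the nomem_io queue and nomem_threshold fields below - I/O that
 * cannot be submitted because the module reported NOMEM is parked on nomem_io and,
 * per those field comments, retried once enough outstanding I/O has completed.)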
203 */ 204 struct spdk_bdev_shared_resource { 205 /* The bdev management channel */ 206 struct spdk_bdev_mgmt_channel *mgmt_ch; 207 208 /* 209 * Count of I/O submitted to bdev module and waiting for completion. 210 * Incremented before submit_request() is called on an spdk_bdev_io. 211 */ 212 uint64_t io_outstanding; 213 214 /* 215 * Queue of IO awaiting retry because of a previous NOMEM status returned 216 * on this channel. 217 */ 218 bdev_io_tailq_t nomem_io; 219 220 /* 221 * Threshold which io_outstanding must drop to before retrying nomem_io. 222 */ 223 uint64_t nomem_threshold; 224 225 /* I/O channel allocated by a bdev module */ 226 struct spdk_io_channel *shared_ch; 227 228 /* Refcount of bdev channels using this resource */ 229 uint32_t ref; 230 231 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 232 }; 233 234 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 235 #define BDEV_CH_QOS_ENABLED (1 << 1) 236 237 struct spdk_bdev_channel { 238 struct spdk_bdev *bdev; 239 240 /* The channel for the underlying device */ 241 struct spdk_io_channel *channel; 242 243 /* Per io_device per thread data */ 244 struct spdk_bdev_shared_resource *shared_resource; 245 246 struct spdk_bdev_io_stat stat; 247 248 /* 249 * Count of I/O submitted through this channel and waiting for completion. 250 * Incremented before submit_request() is called on an spdk_bdev_io. 251 */ 252 uint64_t io_outstanding; 253 254 bdev_io_tailq_t queued_resets; 255 256 uint32_t flags; 257 258 struct spdk_histogram_data *histogram; 259 260 #ifdef SPDK_CONFIG_VTUNE 261 uint64_t start_tsc; 262 uint64_t interval_tsc; 263 __itt_string_handle *handle; 264 struct spdk_bdev_io_stat prev_stat; 265 #endif 266 267 }; 268 269 struct spdk_bdev_desc { 270 struct spdk_bdev *bdev; 271 struct spdk_thread *thread; 272 spdk_bdev_remove_cb_t remove_cb; 273 void *remove_ctx; 274 bool remove_scheduled; 275 bool closed; 276 bool write; 277 TAILQ_ENTRY(spdk_bdev_desc) link; 278 }; 279 280 struct spdk_bdev_iostat_ctx { 281 struct spdk_bdev_io_stat *stat; 282 spdk_bdev_get_device_stat_cb cb; 283 void *cb_arg; 284 }; 285 286 struct set_qos_limit_ctx { 287 void (*cb_fn)(void *cb_arg, int status); 288 void *cb_arg; 289 struct spdk_bdev *bdev; 290 }; 291 292 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 293 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 294 295 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 296 void *cb_arg); 297 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 298 299 static void _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i); 300 static void _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status); 301 302 void 303 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 304 { 305 *opts = g_bdev_opts; 306 } 307 308 int 309 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 310 { 311 uint32_t min_pool_size; 312 313 /* 314 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 315 * initialization. A second mgmt_ch will be created on the same thread when the application starts 316 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
317 */ 318 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 319 if (opts->bdev_io_pool_size < min_pool_size) { 320 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 321 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 322 spdk_thread_get_count()); 323 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 324 return -1; 325 } 326 327 g_bdev_opts = *opts; 328 return 0; 329 } 330 331 struct spdk_bdev * 332 spdk_bdev_first(void) 333 { 334 struct spdk_bdev *bdev; 335 336 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 337 if (bdev) { 338 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 339 } 340 341 return bdev; 342 } 343 344 struct spdk_bdev * 345 spdk_bdev_next(struct spdk_bdev *prev) 346 { 347 struct spdk_bdev *bdev; 348 349 bdev = TAILQ_NEXT(prev, internal.link); 350 if (bdev) { 351 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 352 } 353 354 return bdev; 355 } 356 357 static struct spdk_bdev * 358 _bdev_next_leaf(struct spdk_bdev *bdev) 359 { 360 while (bdev != NULL) { 361 if (bdev->internal.claim_module == NULL) { 362 return bdev; 363 } else { 364 bdev = TAILQ_NEXT(bdev, internal.link); 365 } 366 } 367 368 return bdev; 369 } 370 371 struct spdk_bdev * 372 spdk_bdev_first_leaf(void) 373 { 374 struct spdk_bdev *bdev; 375 376 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 377 378 if (bdev) { 379 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 380 } 381 382 return bdev; 383 } 384 385 struct spdk_bdev * 386 spdk_bdev_next_leaf(struct spdk_bdev *prev) 387 { 388 struct spdk_bdev *bdev; 389 390 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 391 392 if (bdev) { 393 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 394 } 395 396 return bdev; 397 } 398 399 struct spdk_bdev * 400 spdk_bdev_get_by_name(const char *bdev_name) 401 { 402 struct spdk_bdev_alias *tmp; 403 struct spdk_bdev *bdev = spdk_bdev_first(); 404 405 while (bdev != NULL) { 406 if (strcmp(bdev_name, bdev->name) == 0) { 407 return bdev; 408 } 409 410 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 411 if (strcmp(bdev_name, tmp->alias) == 0) { 412 return bdev; 413 } 414 } 415 416 bdev = spdk_bdev_next(bdev); 417 } 418 419 return NULL; 420 } 421 422 void 423 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 424 { 425 struct iovec *iovs; 426 427 if (bdev_io->u.bdev.iovs == NULL) { 428 bdev_io->u.bdev.iovs = &bdev_io->iov; 429 bdev_io->u.bdev.iovcnt = 1; 430 } 431 432 iovs = bdev_io->u.bdev.iovs; 433 434 assert(iovs != NULL); 435 assert(bdev_io->u.bdev.iovcnt >= 1); 436 437 iovs[0].iov_base = buf; 438 iovs[0].iov_len = len; 439 } 440 441 static bool 442 _is_buf_allocated(struct iovec *iovs) 443 { 444 if (iovs == NULL) { 445 return false; 446 } 447 448 return iovs[0].iov_base != NULL; 449 } 450 451 static bool 452 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment) 453 { 454 int i; 455 uintptr_t iov_base; 456 457 if (spdk_likely(alignment == 1)) { 458 return true; 459 } 460 461 for (i = 0; i < iovcnt; i++) { 462 iov_base = (uintptr_t)iovs[i].iov_base; 463 if ((iov_base & (alignment - 1)) != 0) { 464 return false; 465 } 466 } 467 468 return true; 469 } 470 471 static void 472 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 473 { 474 int i; 475 size_t len; 476 477 for (i = 0; i < iovcnt; i++) { 478 len = 
			spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		buf_len -= len;
	}
}

static void
_bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	/* save original iovec */
	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
	/* set bounce iov */
	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
	bdev_io->u.bdev.iovcnt = 1;
	/* set bounce buffer for this operation */
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = len;
	/* if this is write path, copy data from original buffer to bounce buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
	}
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf, *aligned_buf;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len;
	uint64_t alignment;
	bool buf_allocated;

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);

		alignment = spdk_bdev_get_buf_align(tmp->bdev);
		buf_allocated = _is_buf_allocated(tmp->u.bdev.iovs);

		aligned_buf = (void *)(((uintptr_t)buf +
					(alignment - 1)) & ~(alignment - 1));
		if (buf_allocated) {
			_bdev_io_set_bounce_buf(tmp, aligned_buf, tmp->internal.buf_len);
		} else {
			spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
		}

		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		tmp->internal.buf = buf;
		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp, true);
	}
}

static void
_bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
{
	/* if this is read path, copy data from bounce buffer to original buffer */
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		_copy_buf_to_iovs(bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt,
				  bdev_io->internal.bounce_iov.iov_base, bdev_io->internal.bounce_iov.iov_len);
	}
	/* set original buffer for this io */
	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
	/* disable bouncing buffer for this io */
	bdev_io->internal.orig_iovcnt = 0;
	bdev_io->internal.orig_iovs = NULL;
	/* return bounce buffer to the pool */
	spdk_bdev_io_put_buf(bdev_io);
}

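/*
 * Added summary of the function below: ensure bdev_io has a data buffer of at least
 * len bytes with the alignment the bdev requires.  If a suitably aligned buffer is
 * already attached, cb is invoked immediately; otherwise a buffer is drawn from the
 * small or large buf pool (or the request is queued on need_buf_small/need_buf_large
 * until another I/O releases one).  When the caller supplied an unaligned buffer, the
 * pool buffer is used as a bounce buffer via _bdev_io_set_bounce_buf() above.
 */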
void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	void *buf, *aligned_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment;
	bool buf_allocated;

	assert(cb != NULL);

	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);

	if (buf_allocated &&
	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
		/* Buffer already present and aligned */
		cb(bdev_io->internal.ch->channel, bdev_io, true);
		return;
	}

	if (len + alignment > SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
			    len + alignment);
		cb(bdev_io->internal.ch->channel, bdev_io, false);
		return;
	}

	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;
	bdev_io->internal.get_buf_cb = cb;

	if (len + alignment <= SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
	    SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);

	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

		if (buf_allocated) {
			_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
		} else {
			spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
		}
		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io, true);
	}
}

static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

static void
spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_qos_limit");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

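	/* Added note: the first array entry replays the global bdev options via the
	 * "set_bdev_options" RPC.  With the defaults above this would emit, e.g.:
	 * {"method": "set_bdev_options",
	 *  "params": {"bdev_io_pool_size": 65535, "bdev_io_cache_size": 256}}
	 */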
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_options");
	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}

		spdk_bdev_qos_config_json(bdev, w);
	}

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress.
	 * If any exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		if (module->async_init) {
			module->internal.action_in_progress = 1;
		}
		rc = module->module_init();
		if (rc != 0) {
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}

static void
spdk_bdev_init_failed(void *cb_arg)
{
	spdk_bdev_init_complete(-1);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	spdk_notify_type_register("bdev_register");
	spdk_notify_type_register("bdev_unregister");

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
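	 * (Added example: with BUF_SMALL_POOL_SIZE = 8191 and a hypothetical count of
	 * 4 threads, each per-thread cache below holds 8191 / (2 * 4) = 1023 buffers.)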
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_SMALL_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_BUF_SIZE_WITH_MD(SPDK_BDEV_LARGE_BUF_MAX_SIZE) +
				    SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
					      NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = spdk_bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
		return;
	}

	spdk_bdev_module_action_complete();
}

static void
spdk_bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    g_bdev_opts.bdev_io_pool_size);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	spdk_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
}

static void
spdk_bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module,
					 bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
	 * shutdown proceeds top-down.  The goal is to give virtual bdevs an opportunity
	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
	 * base bdevs.
	 *
	 * Also, walk the list in the reverse order.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		if (bdev->internal.claim_module != NULL) {
			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
				      bdev->name, bdev->internal.claim_module->name);
			continue;
		}

		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}

	/*
	 * If any bdev fails to unclaim underlying bdev properly, we may face the
	 * case of bdev list consisting of claimed bdevs only (if claims are managed
	 * correctly, this would mean there's a loop in the claims graph which is
	 * clearly impossible). Warn and unregister last bdev on the list then.
	 */
	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
		SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
		return;
	}
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue.
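		 * (Added note: waiters are serviced in the branch above whenever an entry is
		 * returned to a non-full cache, so the cache should not be able to fill up
		 * while waiters remain.)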
		 */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static bool
_spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
_spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return true;
	default:
		return false;
	}
}

static bool
_spdk_bdev_is_read_io(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* Bit 1 (0x2) set for read operation */
		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
			return true;
		} else {
			return false;
		}
	case SPDK_BDEV_IO_TYPE_READ:
		return true;
	default:
		return false;
	}
}

static uint64_t
_spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	default:
		return 0;
	}
}

static bool
_spdk_bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
		return true;
	} else {
		return false;
	}
}

static bool
_spdk_bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == false) {
		return false;
	}

	return _spdk_bdev_qos_rw_queue_io(limit, io);
}

static bool
_spdk_bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == true) {
		return false;
	}

	return _spdk_bdev_qos_rw_queue_io(limit, io);
}

static void
_spdk_bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice--;
}

static void
_spdk_bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	limit->remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(io);
}

static void
_spdk_bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == false) {
		return;
	}

	return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
}

static void
_spdk_bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
{
	if (_spdk_bdev_is_read_io(io) == true) {
		return;
	}

	return
		_spdk_bdev_qos_rw_bps_update_quota(limit, io);
}

static void
_spdk_bdev_qos_set_ops(struct spdk_bdev_qos *qos)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].queue_io = NULL;
			qos->rate_limits[i].update_quota = NULL;
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_iops_update_quota;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_r_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_r_bps_update_quota;
			break;
		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
			qos->rate_limits[i].queue_io = _spdk_bdev_qos_w_queue_io;
			qos->rate_limits[i].update_quota = _spdk_bdev_qos_w_bps_update_quota;
			break;
		default:
			break;
		}
	}
}

static int
_spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io *bdev_io = NULL, *tmp = NULL;
	struct spdk_bdev *bdev = ch->bdev;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
	int i, submitted_ios = 0;

	TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
		if (_spdk_bdev_qos_io_to_limit(bdev_io) == true) {
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].queue_io) {
					continue;
				}

				if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
								 bdev_io) == true) {
					return submitted_ios;
				}
			}
			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (!qos->rate_limits[i].update_quota) {
					continue;
				}

				qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
			}
		}

		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev_io->internal.in_submit_request = true;
		bdev->fn_table->submit_request(ch->channel, bdev_io);
		bdev_io->internal.in_submit_request = false;
		submitted_ios++;
	}

	return submitted_ios;
}

static void
_spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

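/*
 * Added overview of the splitting helpers below: when a bdev reports an
 * optimal_io_boundary and sets split_on_optimal_io_boundary, spdk_bdev_io_submit()
 * carves a READ/WRITE that crosses a boundary into boundary-aligned children.
 * For example (hypothetical values), with optimal_io_boundary = 8 an I/O covering
 * blocks 6..9 has start_stripe = 6 >> 3 = 0 and end_stripe = 9 >> 3 = 1, so it is
 * split at block 8, while an I/O covering blocks 0..7 stays a single request.
 */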
static bool
_spdk_bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O.  Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}

static bool
_spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}

static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_spdk_bdev_io_split_with_payload(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
	struct iovec *parent_iov, *iov;
	uint64_t parent_iov_offset, iov_len;
	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
	parent_iovcnt = bdev_io->u.bdev.iovcnt;

	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
		if (parent_iov_offset < parent_iov->iov_len) {
			break;
		}
		parent_iov_offset -= parent_iov->iov_len;
	}

	child_iovcnt = 0;
	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
		to_next_boundary = spdk_min(remaining, to_next_boundary);
		to_next_boundary_bytes = to_next_boundary * blocklen;
		iov = &bdev_io->child_iov[child_iovcnt];
		iovcnt = 0;
		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
			to_next_boundary_bytes -= iov_len;

			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;

			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
				parent_iov_offset += iov_len;
			} else {
				parent_iovpos++;
				parent_iov_offset = 0;
			}
			child_iovcnt++;
			iovcnt++;
		}

		if (to_next_boundary_bytes > 0) {
			/* We had to stop this child I/O early because we ran out of
			 * child_iov space.  Make sure the iovs collected are valid and
			 * then adjust to_next_boundary before starting the child I/O.
			 */
			if ((to_next_boundary_bytes % blocklen) != 0) {
				SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n",
					    to_next_boundary_bytes, blocklen);
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
				return;
			}
			to_next_boundary -= to_next_boundary_bytes / blocklen;
		}

		bdev_io->u.bdev.split_outstanding++;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
						    spdk_io_channel_from_ctx(bdev_io->internal.ch),
						    iov, iovcnt, current_offset, to_next_boundary,
						    _spdk_bdev_io_split_done, bdev_io);
		} else {
			rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
						     spdk_io_channel_from_ctx(bdev_io->internal.ch),
						     iov, iovcnt, current_offset, to_next_boundary,
						     _spdk_bdev_io_split_done, bdev_io);
		}

		if (rc == 0) {
			current_offset += to_next_boundary;
			remaining -= to_next_boundary;
			bdev_io->u.bdev.split_current_offset_blocks = current_offset;
			bdev_io->u.bdev.split_remaining_num_blocks = remaining;
		} else {
			bdev_io->u.bdev.split_outstanding--;
			if (rc == -ENOMEM) {
				if (bdev_io->u.bdev.split_outstanding == 0) {
					/* No I/O is outstanding. Hence we should wait here. */
					_spdk_bdev_queue_io_wait_with_cb(bdev_io,
									 _spdk_bdev_io_split_with_payload);
				}
			} else {
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
			}

			return;
		}
	}
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
	}
	parent_io->u.bdev.split_outstanding--;
	if (parent_io->u.bdev.split_outstanding != 0) {
		return;
	}

	/*
	 * Parent I/O finishes when all blocks are consumed or there is any failure of
	 * child I/O and no outstanding child I/O.
	 */
	if (parent_io->u.bdev.split_remaining_num_blocks == 0 ||
	    parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) {
		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
				       parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process.  This function will complete the parent I/O if the
	 * splitting is done.
	 */
	_spdk_bdev_io_split_with_payload(parent_io);
}

static void
_spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	assert(_spdk_bdev_io_type_can_split(bdev_io->type));

	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
	bdev_io->u.bdev.split_outstanding = 0;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;

	_spdk_bdev_io_split_with_payload(bdev_io);
}

static void
_spdk_bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
			       bool success)
{
	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	_spdk_bdev_io_split(ch, bdev_io);
}

/* Explicitly mark this inline, since it's used as a function pointer and otherwise won't
 * be inlined, at least on some compilers.
 */
static inline void
_spdk_bdev_io_submit(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
	uint64_t tsc;

	tsc = spdk_get_ticks();
	bdev_io->internal.submit_tsc = tsc;
	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	bdev_io->internal.in_submit_request = true;
	if (spdk_likely(bdev_ch->flags == 0)) {
		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
			bdev->fn_table->submit_request(ch, bdev_io);
		} else {
			bdev_ch->io_outstanding--;
			shared_resource->io_outstanding--;
			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
		}
	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
		bdev_ch->io_outstanding--;
		shared_resource->io_outstanding--;
		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
		_spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos);
	} else {
		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);

	assert(thread != NULL);
	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split_get_buf_cb,
					     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		} else {
			_spdk_bdev_io_split(NULL, bdev_io);
		}
		return;
	}

	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
			_spdk_bdev_io_submit(bdev_io);
		} else {
			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
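			/* Added note: the I/O must be issued from the QoS channel's thread, so
			 * remember the submitting channel (presumably so the completion path can
			 * switch back to it) and temporarily point the I/O at the QoS channel.
			 */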
			bdev_io->internal.ch = bdev->internal.qos->ch;
			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
		}
	} else {
		_spdk_bdev_io_submit(bdev_io);
	}
}

static void
spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;

	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	bdev_io->internal.in_submit_request = true;
	bdev->fn_table->submit_request(ch, bdev_io);
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
		  struct spdk_bdev *bdev, void *cb_arg,
		  spdk_bdev_io_completion_cb cb)
{
	bdev_io->bdev = bdev;
	bdev_io->internal.caller_ctx = cb_arg;
	bdev_io->internal.cb = cb;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
	bdev_io->internal.in_submit_request = false;
	bdev_io->internal.buf = NULL;
	bdev_io->internal.io_submit_ch = NULL;
	bdev_io->internal.orig_iovs = NULL;
	bdev_io->internal.orig_iovcnt = 0;
}

static bool
_spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
}

bool
spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	bool supported;

	supported = _spdk_bdev_io_type_supported(bdev, io_type);

	if (!supported) {
		switch (io_type) {
		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
			/* The bdev layer will emulate write zeroes as long as write is supported.
			 */
			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
			break;
		case SPDK_BDEV_IO_TYPE_ZCOPY:
			/* Zero copy can be emulated with regular read and write */
			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_READ) &&
				    _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
			break;
		default:
			break;
		}
	}

	return supported;
}

int
spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	if (bdev->fn_table->dump_info_json) {
		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
	}

	return 0;
}

static void
spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
{
	uint32_t max_per_timeslice = 0;
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].max_per_timeslice = 0;
			continue;
		}

		max_per_timeslice = qos->rate_limits[i].limit *
				    SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC;

		qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice,
							qos->rate_limits[i].min_per_timeslice);

		qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice;
	}

	_spdk_bdev_qos_set_ops(qos);
}

static int
spdk_bdev_channel_poll_qos(void *arg)
{
	struct spdk_bdev_qos *qos = arg;
	uint64_t now = spdk_get_ticks();
	int i;

	if (now < (qos->last_timeslice + qos->timeslice_size)) {
		/* We received our callback earlier than expected - return
		 * immediately and wait to do accounting until at least one
		 * timeslice has actually expired.  This should never happen
		 * with a well-behaved timer implementation.
		 */
		return 0;
	}

	/* Reset for next round of rate limiting */
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		/* We may have allowed the IOs or bytes to slightly overrun in the last
		 * timeslice. remaining_this_timeslice is signed, so if it's negative
		 * here, we'll account for the overrun so that the next timeslice will
		 * be appropriately reduced.
		 */
		if (qos->rate_limits[i].remaining_this_timeslice > 0) {
			qos->rate_limits[i].remaining_this_timeslice = 0;
		}
	}

	while (now >= (qos->last_timeslice + qos->timeslice_size)) {
		qos->last_timeslice += qos->timeslice_size;
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			qos->rate_limits[i].remaining_this_timeslice +=
				qos->rate_limits[i].max_per_timeslice;
		}
	}

	return _spdk_bdev_qos_io_submit(qos->ch, qos);
}

static void
_spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_shared_resource *shared_resource;

	spdk_put_io_channel(ch->channel);

	shared_resource = ch->shared_resource;

	assert(ch->io_outstanding == 0);
	assert(shared_resource->ref > 0);
	shared_resource->ref--;
	if (shared_resource->ref == 0) {
		assert(shared_resource->io_outstanding == 0);
		TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
		spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
		free(shared_resource);
	}
}

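/*
 * Added note on the QoS setup done below: the per-timeslice quota comes from
 * spdk_bdev_qos_update_max_quota_per_timeslice() above,
 * max_per_timeslice = limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC,
 * e.g. a (hypothetical) limit of 10000 IO/s with the 1000 usec timeslice gives
 * 10000 * 1000 / 1000000 = 10 I/Os per timeslice.  The first channel that sees QoS
 * enabled becomes the QoS channel and registers spdk_bdev_channel_poll_qos().
 */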
*/ 1912 static void 1913 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1914 { 1915 struct spdk_bdev_qos *qos = bdev->internal.qos; 1916 int i; 1917 1918 /* Rate limiting on this bdev enabled */ 1919 if (qos) { 1920 if (qos->ch == NULL) { 1921 struct spdk_io_channel *io_ch; 1922 1923 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1924 bdev->name, spdk_get_thread()); 1925 1926 /* No qos channel has been selected, so set one up */ 1927 1928 /* Take another reference to ch */ 1929 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1930 assert(io_ch != NULL); 1931 qos->ch = ch; 1932 1933 qos->thread = spdk_io_channel_get_thread(io_ch); 1934 1935 TAILQ_INIT(&qos->queued); 1936 1937 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1938 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1939 qos->rate_limits[i].min_per_timeslice = 1940 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1941 } else { 1942 qos->rate_limits[i].min_per_timeslice = 1943 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1944 } 1945 1946 if (qos->rate_limits[i].limit == 0) { 1947 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1948 } 1949 } 1950 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1951 qos->timeslice_size = 1952 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1953 qos->last_timeslice = spdk_get_ticks(); 1954 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1955 qos, 1956 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1957 } 1958 1959 ch->flags |= BDEV_CH_QOS_ENABLED; 1960 } 1961 } 1962 1963 static int 1964 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1965 { 1966 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1967 struct spdk_bdev_channel *ch = ctx_buf; 1968 struct spdk_io_channel *mgmt_io_ch; 1969 struct spdk_bdev_mgmt_channel *mgmt_ch; 1970 struct spdk_bdev_shared_resource *shared_resource; 1971 1972 ch->bdev = bdev; 1973 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1974 if (!ch->channel) { 1975 return -1; 1976 } 1977 1978 assert(ch->histogram == NULL); 1979 if (bdev->internal.histogram_enabled) { 1980 ch->histogram = spdk_histogram_data_alloc(); 1981 if (ch->histogram == NULL) { 1982 SPDK_ERRLOG("Could not allocate histogram\n"); 1983 } 1984 } 1985 1986 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1987 if (!mgmt_io_ch) { 1988 spdk_put_io_channel(ch->channel); 1989 return -1; 1990 } 1991 1992 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1993 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1994 if (shared_resource->shared_ch == ch->channel) { 1995 spdk_put_io_channel(mgmt_io_ch); 1996 shared_resource->ref++; 1997 break; 1998 } 1999 } 2000 2001 if (shared_resource == NULL) { 2002 shared_resource = calloc(1, sizeof(*shared_resource)); 2003 if (shared_resource == NULL) { 2004 spdk_put_io_channel(ch->channel); 2005 spdk_put_io_channel(mgmt_io_ch); 2006 return -1; 2007 } 2008 2009 shared_resource->mgmt_ch = mgmt_ch; 2010 shared_resource->io_outstanding = 0; 2011 TAILQ_INIT(&shared_resource->nomem_io); 2012 shared_resource->nomem_threshold = 0; 2013 shared_resource->shared_ch = ch->channel; 2014 shared_resource->ref = 1; 2015 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 2016 } 2017 2018 memset(&ch->stat, 0, sizeof(ch->stat)); 2019 ch->stat.ticks_rate = spdk_get_ticks_hz(); 2020 ch->io_outstanding = 0; 2021 TAILQ_INIT(&ch->queued_resets); 2022 ch->flags = 0; 2023 ch->shared_resource = shared_resource; 2024 2025 #ifdef 
SPDK_CONFIG_VTUNE 2026 { 2027 char *name; 2028 __itt_init_ittlib(NULL, 0); 2029 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 2030 if (!name) { 2031 _spdk_bdev_channel_destroy_resource(ch); 2032 return -1; 2033 } 2034 ch->handle = __itt_string_handle_create(name); 2035 free(name); 2036 ch->start_tsc = spdk_get_ticks(); 2037 ch->interval_tsc = spdk_get_ticks_hz() / 100; 2038 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 2039 } 2040 #endif 2041 2042 pthread_mutex_lock(&bdev->internal.mutex); 2043 _spdk_bdev_enable_qos(bdev, ch); 2044 pthread_mutex_unlock(&bdev->internal.mutex); 2045 2046 return 0; 2047 } 2048 2049 /* 2050 * Abort I/O that are waiting on a data buffer. These types of I/O are 2051 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 2052 */ 2053 static void 2054 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 2055 { 2056 bdev_io_stailq_t tmp; 2057 struct spdk_bdev_io *bdev_io; 2058 2059 STAILQ_INIT(&tmp); 2060 2061 while (!STAILQ_EMPTY(queue)) { 2062 bdev_io = STAILQ_FIRST(queue); 2063 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 2064 if (bdev_io->internal.ch == ch) { 2065 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2066 } else { 2067 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 2068 } 2069 } 2070 2071 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 2072 } 2073 2074 /* 2075 * Abort I/O that are queued waiting for submission. These types of I/O are 2076 * linked using the spdk_bdev_io link TAILQ_ENTRY. 2077 */ 2078 static void 2079 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 2080 { 2081 struct spdk_bdev_io *bdev_io, *tmp; 2082 2083 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 2084 if (bdev_io->internal.ch == ch) { 2085 TAILQ_REMOVE(queue, bdev_io, internal.link); 2086 /* 2087 * spdk_bdev_io_complete() assumes that the completed I/O had 2088 * been submitted to the bdev module. Since in this case it 2089 * hadn't, bump io_outstanding to account for the decrement 2090 * that spdk_bdev_io_complete() will do. 2091 */ 2092 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 2093 ch->io_outstanding++; 2094 ch->shared_resource->io_outstanding++; 2095 } 2096 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 2097 } 2098 } 2099 } 2100 2101 static void 2102 spdk_bdev_qos_channel_destroy(void *cb_arg) 2103 { 2104 struct spdk_bdev_qos *qos = cb_arg; 2105 2106 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 2107 spdk_poller_unregister(&qos->poller); 2108 2109 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 2110 2111 free(qos); 2112 } 2113 2114 static int 2115 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 2116 { 2117 int i; 2118 2119 /* 2120 * Cleanly shutting down the QoS poller is tricky, because 2121 * during the asynchronous operation the user could open 2122 * a new descriptor and create a new channel, spawning 2123 * a new QoS poller. 2124 * 2125 * The strategy is to create a new QoS structure here and swap it 2126 * in. The shutdown path then continues to refer to the old one 2127 * until it completes and then releases it. 
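 *
 * In outline (a sketch of the sequence implemented below, not additional
 * behavior):
 *   1. Allocate new_qos and memcpy() the old structure into it, so the
 *      configured limits survive.
 *   2. Clear the per-channel state (ch, thread, poller, queued) in new_qos
 *      and zero the per-timeslice counters.
 *   3. Publish new_qos via bdev->internal.qos.
 *   4. Send the old structure to its owning thread, where
 *      spdk_bdev_qos_channel_destroy() drops the I/O channel reference,
 *      unregisters the poller and frees it. If no thread ever owned it,
 *      free it directly.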
2128 */ 2129 struct spdk_bdev_qos *new_qos, *old_qos; 2130 2131 old_qos = bdev->internal.qos; 2132 2133 new_qos = calloc(1, sizeof(*new_qos)); 2134 if (!new_qos) { 2135 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 2136 return -ENOMEM; 2137 } 2138 2139 /* Copy the old QoS data into the newly allocated structure */ 2140 memcpy(new_qos, old_qos, sizeof(*new_qos)); 2141 2142 /* Zero out the key parts of the QoS structure */ 2143 new_qos->ch = NULL; 2144 new_qos->thread = NULL; 2145 new_qos->poller = NULL; 2146 TAILQ_INIT(&new_qos->queued); 2147 /* 2148 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 2149 * It will be used later for the new QoS structure. 2150 */ 2151 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2152 new_qos->rate_limits[i].remaining_this_timeslice = 0; 2153 new_qos->rate_limits[i].min_per_timeslice = 0; 2154 new_qos->rate_limits[i].max_per_timeslice = 0; 2155 } 2156 2157 bdev->internal.qos = new_qos; 2158 2159 if (old_qos->thread == NULL) { 2160 free(old_qos); 2161 } else { 2162 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 2163 old_qos); 2164 } 2165 2166 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 2167 * been destroyed yet. The destruction path will end up waiting for the final 2168 * channel to be put before it releases resources. */ 2169 2170 return 0; 2171 } 2172 2173 static void 2174 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 2175 { 2176 total->bytes_read += add->bytes_read; 2177 total->num_read_ops += add->num_read_ops; 2178 total->bytes_written += add->bytes_written; 2179 total->num_write_ops += add->num_write_ops; 2180 total->bytes_unmapped += add->bytes_unmapped; 2181 total->num_unmap_ops += add->num_unmap_ops; 2182 total->read_latency_ticks += add->read_latency_ticks; 2183 total->write_latency_ticks += add->write_latency_ticks; 2184 total->unmap_latency_ticks += add->unmap_latency_ticks; 2185 } 2186 2187 static void 2188 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 2189 { 2190 struct spdk_bdev_channel *ch = ctx_buf; 2191 struct spdk_bdev_mgmt_channel *mgmt_ch; 2192 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 2193 2194 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 2195 spdk_get_thread()); 2196 2197 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
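 *
 * For example (illustrative numbers, not from the original text): if a
 * channel handled 100 reads totalling 400 KiB before its thread exits, those
 * counters are folded into bdev->internal.stat here, and
 * spdk_bdev_get_device_stat() further below reports them by summing
 * internal.stat with the per-channel stats of the channels still alive.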
*/ 2198 pthread_mutex_lock(&ch->bdev->internal.mutex); 2199 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 2200 pthread_mutex_unlock(&ch->bdev->internal.mutex); 2201 2202 mgmt_ch = shared_resource->mgmt_ch; 2203 2204 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 2205 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 2206 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 2207 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 2208 2209 if (ch->histogram) { 2210 spdk_histogram_data_free(ch->histogram); 2211 } 2212 2213 _spdk_bdev_channel_destroy_resource(ch); 2214 } 2215 2216 int 2217 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 2218 { 2219 struct spdk_bdev_alias *tmp; 2220 2221 if (alias == NULL) { 2222 SPDK_ERRLOG("Empty alias passed\n"); 2223 return -EINVAL; 2224 } 2225 2226 if (spdk_bdev_get_by_name(alias)) { 2227 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 2228 return -EEXIST; 2229 } 2230 2231 tmp = calloc(1, sizeof(*tmp)); 2232 if (tmp == NULL) { 2233 SPDK_ERRLOG("Unable to allocate alias\n"); 2234 return -ENOMEM; 2235 } 2236 2237 tmp->alias = strdup(alias); 2238 if (tmp->alias == NULL) { 2239 free(tmp); 2240 SPDK_ERRLOG("Unable to allocate alias\n"); 2241 return -ENOMEM; 2242 } 2243 2244 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 2245 2246 return 0; 2247 } 2248 2249 int 2250 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 2251 { 2252 struct spdk_bdev_alias *tmp; 2253 2254 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 2255 if (strcmp(alias, tmp->alias) == 0) { 2256 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 2257 free(tmp->alias); 2258 free(tmp); 2259 return 0; 2260 } 2261 } 2262 2263 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 2264 2265 return -ENOENT; 2266 } 2267 2268 void 2269 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 2270 { 2271 struct spdk_bdev_alias *p, *tmp; 2272 2273 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 2274 TAILQ_REMOVE(&bdev->aliases, p, tailq); 2275 free(p->alias); 2276 free(p); 2277 } 2278 } 2279 2280 struct spdk_io_channel * 2281 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 2282 { 2283 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 2284 } 2285 2286 const char * 2287 spdk_bdev_get_name(const struct spdk_bdev *bdev) 2288 { 2289 return bdev->name; 2290 } 2291 2292 const char * 2293 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 2294 { 2295 return bdev->product_name; 2296 } 2297 2298 const struct spdk_bdev_aliases_list * 2299 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 2300 { 2301 return &bdev->aliases; 2302 } 2303 2304 uint32_t 2305 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 2306 { 2307 return bdev->blocklen; 2308 } 2309 2310 uint64_t 2311 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2312 { 2313 return bdev->blockcnt; 2314 } 2315 2316 const char * 2317 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2318 { 2319 return qos_rpc_type[type]; 2320 } 2321 2322 void 2323 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2324 { 2325 int i; 2326 2327 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2328 2329 pthread_mutex_lock(&bdev->internal.mutex); 2330 if (bdev->internal.qos) { 2331 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2332 if (bdev->internal.qos->rate_limits[i].limit != 2333 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2334 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2335 if 
(_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2336 /* Change from Byte to Megabyte which is user visible. */ 2337 limits[i] = limits[i] / 1024 / 1024; 2338 } 2339 } 2340 } 2341 } 2342 pthread_mutex_unlock(&bdev->internal.mutex); 2343 } 2344 2345 size_t 2346 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2347 { 2348 return 1 << bdev->required_alignment; 2349 } 2350 2351 uint32_t 2352 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2353 { 2354 return bdev->optimal_io_boundary; 2355 } 2356 2357 bool 2358 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2359 { 2360 return bdev->write_cache; 2361 } 2362 2363 const struct spdk_uuid * 2364 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2365 { 2366 return &bdev->uuid; 2367 } 2368 2369 uint32_t 2370 spdk_bdev_get_md_size(const struct spdk_bdev *bdev) 2371 { 2372 return bdev->md_len; 2373 } 2374 2375 bool 2376 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev) 2377 { 2378 return (bdev->md_len != 0) && bdev->md_interleave; 2379 } 2380 2381 uint32_t 2382 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev) 2383 { 2384 if (spdk_bdev_is_md_interleaved(bdev)) { 2385 return bdev->blocklen - bdev->md_len; 2386 } else { 2387 return bdev->blocklen; 2388 } 2389 } 2390 2391 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev) 2392 { 2393 if (bdev->md_len != 0) { 2394 return bdev->dif_type; 2395 } else { 2396 return SPDK_DIF_DISABLE; 2397 } 2398 } 2399 2400 bool 2401 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev) 2402 { 2403 if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) { 2404 return bdev->dif_is_head_of_md; 2405 } else { 2406 return false; 2407 } 2408 } 2409 2410 bool 2411 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev, 2412 enum spdk_dif_check_type check_type) 2413 { 2414 if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) { 2415 return false; 2416 } 2417 2418 switch (check_type) { 2419 case SPDK_DIF_CHECK_TYPE_REFTAG: 2420 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0; 2421 case SPDK_DIF_CHECK_TYPE_APPTAG: 2422 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0; 2423 case SPDK_DIF_CHECK_TYPE_GUARD: 2424 return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0; 2425 default: 2426 return false; 2427 } 2428 } 2429 2430 uint64_t 2431 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2432 { 2433 return bdev->internal.measured_queue_depth; 2434 } 2435 2436 uint64_t 2437 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2438 { 2439 return bdev->internal.period; 2440 } 2441 2442 uint64_t 2443 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2444 { 2445 return bdev->internal.weighted_io_time; 2446 } 2447 2448 uint64_t 2449 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2450 { 2451 return bdev->internal.io_time; 2452 } 2453 2454 static void 2455 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2456 { 2457 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2458 2459 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2460 2461 if (bdev->internal.measured_queue_depth) { 2462 bdev->internal.io_time += bdev->internal.period; 2463 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2464 } 2465 } 2466 2467 static void 2468 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2469 { 2470 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2471 struct spdk_io_channel *io_ch = 
spdk_io_channel_iter_get_channel(i); 2472 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2473 2474 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2475 spdk_for_each_channel_continue(i, 0); 2476 } 2477 2478 static int 2479 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2480 { 2481 struct spdk_bdev *bdev = ctx; 2482 bdev->internal.temporary_queue_depth = 0; 2483 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2484 _calculate_measured_qd_cpl); 2485 return 0; 2486 } 2487 2488 void 2489 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2490 { 2491 bdev->internal.period = period; 2492 2493 if (bdev->internal.qd_poller != NULL) { 2494 spdk_poller_unregister(&bdev->internal.qd_poller); 2495 bdev->internal.measured_queue_depth = UINT64_MAX; 2496 } 2497 2498 if (period != 0) { 2499 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2500 period); 2501 } 2502 } 2503 2504 int 2505 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2506 { 2507 int ret; 2508 2509 pthread_mutex_lock(&bdev->internal.mutex); 2510 2511 /* bdev has open descriptors */ 2512 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2513 bdev->blockcnt > size) { 2514 ret = -EBUSY; 2515 } else { 2516 bdev->blockcnt = size; 2517 ret = 0; 2518 } 2519 2520 pthread_mutex_unlock(&bdev->internal.mutex); 2521 2522 return ret; 2523 } 2524 2525 /* 2526 * Convert I/O offset and length from bytes to blocks. 2527 * 2528 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 2529 */ 2530 static uint64_t 2531 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2532 uint64_t num_bytes, uint64_t *num_blocks) 2533 { 2534 uint32_t block_size = bdev->blocklen; 2535 uint8_t shift_cnt; 2536 2537 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
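 *
 * Worked example (added for illustration): with a 512-byte block size,
 * spdk_u32_is_pow2(512) is true and shift_cnt = spdk_u32log2(512) = 9, so
 *
 *   offset_bytes = 4096  ->  offset_blocks = 4096 >> 9 = 8
 *   num_bytes    = 1536  ->  num_blocks    = 1536 >> 9 = 3
 *
 * and the return value ORs the two remainders (both 0 here). A misaligned
 * request such as num_bytes = 1000 leaves a non-zero remainder
 * (1000 - (1 << 9) = 488), so the caller returns -EINVAL.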
*/ 2538 if (spdk_likely(spdk_u32_is_pow2(block_size))) { 2539 shift_cnt = spdk_u32log2(block_size); 2540 *offset_blocks = offset_bytes >> shift_cnt; 2541 *num_blocks = num_bytes >> shift_cnt; 2542 return (offset_bytes - (*offset_blocks << shift_cnt)) | 2543 (num_bytes - (*num_blocks << shift_cnt)); 2544 } else { 2545 *offset_blocks = offset_bytes / block_size; 2546 *num_blocks = num_bytes / block_size; 2547 return (offset_bytes % block_size) | (num_bytes % block_size); 2548 } 2549 } 2550 2551 static bool 2552 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2553 { 2554 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2555 * has been an overflow and hence the offset has been wrapped around */ 2556 if (offset_blocks + num_blocks < offset_blocks) { 2557 return false; 2558 } 2559 2560 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2561 if (offset_blocks + num_blocks > bdev->blockcnt) { 2562 return false; 2563 } 2564 2565 return true; 2566 } 2567 2568 int 2569 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2570 void *buf, uint64_t offset, uint64_t nbytes, 2571 spdk_bdev_io_completion_cb cb, void *cb_arg) 2572 { 2573 uint64_t offset_blocks, num_blocks; 2574 2575 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2576 return -EINVAL; 2577 } 2578 2579 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2580 } 2581 2582 int 2583 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2584 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2585 spdk_bdev_io_completion_cb cb, void *cb_arg) 2586 { 2587 struct spdk_bdev *bdev = desc->bdev; 2588 struct spdk_bdev_io *bdev_io; 2589 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2590 2591 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2592 return -EINVAL; 2593 } 2594 2595 bdev_io = spdk_bdev_get_io(channel); 2596 if (!bdev_io) { 2597 return -ENOMEM; 2598 } 2599 2600 bdev_io->internal.ch = channel; 2601 bdev_io->internal.desc = desc; 2602 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2603 bdev_io->u.bdev.iovs = &bdev_io->iov; 2604 bdev_io->u.bdev.iovs[0].iov_base = buf; 2605 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2606 bdev_io->u.bdev.iovcnt = 1; 2607 bdev_io->u.bdev.num_blocks = num_blocks; 2608 bdev_io->u.bdev.offset_blocks = offset_blocks; 2609 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2610 2611 spdk_bdev_io_submit(bdev_io); 2612 return 0; 2613 } 2614 2615 int 2616 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2617 struct iovec *iov, int iovcnt, 2618 uint64_t offset, uint64_t nbytes, 2619 spdk_bdev_io_completion_cb cb, void *cb_arg) 2620 { 2621 uint64_t offset_blocks, num_blocks; 2622 2623 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2624 return -EINVAL; 2625 } 2626 2627 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2628 } 2629 2630 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2631 struct iovec *iov, int iovcnt, 2632 uint64_t offset_blocks, uint64_t num_blocks, 2633 spdk_bdev_io_completion_cb cb, void *cb_arg) 2634 { 2635 struct spdk_bdev *bdev = desc->bdev; 2636 struct spdk_bdev_io *bdev_io; 2637 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2638 2639 if 
(!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2640 return -EINVAL; 2641 } 2642 2643 bdev_io = spdk_bdev_get_io(channel); 2644 if (!bdev_io) { 2645 return -ENOMEM; 2646 } 2647 2648 bdev_io->internal.ch = channel; 2649 bdev_io->internal.desc = desc; 2650 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2651 bdev_io->u.bdev.iovs = iov; 2652 bdev_io->u.bdev.iovcnt = iovcnt; 2653 bdev_io->u.bdev.num_blocks = num_blocks; 2654 bdev_io->u.bdev.offset_blocks = offset_blocks; 2655 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2656 2657 spdk_bdev_io_submit(bdev_io); 2658 return 0; 2659 } 2660 2661 int 2662 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2663 void *buf, uint64_t offset, uint64_t nbytes, 2664 spdk_bdev_io_completion_cb cb, void *cb_arg) 2665 { 2666 uint64_t offset_blocks, num_blocks; 2667 2668 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2669 return -EINVAL; 2670 } 2671 2672 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2673 } 2674 2675 int 2676 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2677 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2678 spdk_bdev_io_completion_cb cb, void *cb_arg) 2679 { 2680 struct spdk_bdev *bdev = desc->bdev; 2681 struct spdk_bdev_io *bdev_io; 2682 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2683 2684 if (!desc->write) { 2685 return -EBADF; 2686 } 2687 2688 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2689 return -EINVAL; 2690 } 2691 2692 bdev_io = spdk_bdev_get_io(channel); 2693 if (!bdev_io) { 2694 return -ENOMEM; 2695 } 2696 2697 bdev_io->internal.ch = channel; 2698 bdev_io->internal.desc = desc; 2699 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2700 bdev_io->u.bdev.iovs = &bdev_io->iov; 2701 bdev_io->u.bdev.iovs[0].iov_base = buf; 2702 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2703 bdev_io->u.bdev.iovcnt = 1; 2704 bdev_io->u.bdev.num_blocks = num_blocks; 2705 bdev_io->u.bdev.offset_blocks = offset_blocks; 2706 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2707 2708 spdk_bdev_io_submit(bdev_io); 2709 return 0; 2710 } 2711 2712 int 2713 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2714 struct iovec *iov, int iovcnt, 2715 uint64_t offset, uint64_t len, 2716 spdk_bdev_io_completion_cb cb, void *cb_arg) 2717 { 2718 uint64_t offset_blocks, num_blocks; 2719 2720 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2721 return -EINVAL; 2722 } 2723 2724 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2725 } 2726 2727 int 2728 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2729 struct iovec *iov, int iovcnt, 2730 uint64_t offset_blocks, uint64_t num_blocks, 2731 spdk_bdev_io_completion_cb cb, void *cb_arg) 2732 { 2733 struct spdk_bdev *bdev = desc->bdev; 2734 struct spdk_bdev_io *bdev_io; 2735 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2736 2737 if (!desc->write) { 2738 return -EBADF; 2739 } 2740 2741 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2742 return -EINVAL; 2743 } 2744 2745 bdev_io = spdk_bdev_get_io(channel); 2746 if (!bdev_io) { 2747 return -ENOMEM; 2748 } 2749 2750 bdev_io->internal.ch = channel; 2751 bdev_io->internal.desc = desc; 2752 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2753 bdev_io->u.bdev.iovs = iov; 2754 bdev_io->u.bdev.iovcnt = 
iovcnt; 2755 bdev_io->u.bdev.num_blocks = num_blocks; 2756 bdev_io->u.bdev.offset_blocks = offset_blocks; 2757 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2758 2759 spdk_bdev_io_submit(bdev_io); 2760 return 0; 2761 } 2762 2763 static void 2764 bdev_zcopy_get_buf(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success) 2765 { 2766 if (!success) { 2767 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 2768 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NOMEM; 2769 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 2770 return; 2771 } 2772 2773 if (bdev_io->u.bdev.zcopy.populate) { 2774 /* Read the real data into the buffer */ 2775 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2776 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2777 spdk_bdev_io_submit(bdev_io); 2778 return; 2779 } 2780 2781 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 2782 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2783 bdev_io->internal.cb(bdev_io, success, bdev_io->internal.caller_ctx); 2784 } 2785 2786 int 2787 spdk_bdev_zcopy_start(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2788 uint64_t offset_blocks, uint64_t num_blocks, 2789 bool populate, 2790 spdk_bdev_io_completion_cb cb, void *cb_arg) 2791 { 2792 struct spdk_bdev *bdev = desc->bdev; 2793 struct spdk_bdev_io *bdev_io; 2794 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2795 2796 if (!desc->write) { 2797 return -EBADF; 2798 } 2799 2800 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2801 return -EINVAL; 2802 } 2803 2804 if (!spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 2805 return -ENOTSUP; 2806 } 2807 2808 bdev_io = spdk_bdev_get_io(channel); 2809 if (!bdev_io) { 2810 return -ENOMEM; 2811 } 2812 2813 bdev_io->internal.ch = channel; 2814 bdev_io->internal.desc = desc; 2815 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 2816 bdev_io->u.bdev.num_blocks = num_blocks; 2817 bdev_io->u.bdev.offset_blocks = offset_blocks; 2818 bdev_io->u.bdev.iovs = NULL; 2819 bdev_io->u.bdev.iovcnt = 0; 2820 bdev_io->u.bdev.zcopy.populate = populate ? 1 : 0; 2821 bdev_io->u.bdev.zcopy.commit = 0; 2822 bdev_io->u.bdev.zcopy.start = 1; 2823 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2824 2825 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 2826 spdk_bdev_io_submit(bdev_io); 2827 } else { 2828 /* Emulate zcopy by allocating a buffer */ 2829 spdk_bdev_io_get_buf(bdev_io, bdev_zcopy_get_buf, 2830 bdev_io->u.bdev.num_blocks * bdev->blocklen); 2831 } 2832 2833 return 0; 2834 } 2835 2836 int 2837 spdk_bdev_zcopy_end(struct spdk_bdev_io *bdev_io, bool commit, 2838 spdk_bdev_io_completion_cb cb, void *cb_arg) 2839 { 2840 struct spdk_bdev *bdev = bdev_io->bdev; 2841 2842 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 2843 /* This can happen if the zcopy was emulated in start */ 2844 if (bdev_io->u.bdev.zcopy.start != 1) { 2845 return -EINVAL; 2846 } 2847 bdev_io->type = SPDK_BDEV_IO_TYPE_ZCOPY; 2848 } 2849 2850 if (bdev_io->type != SPDK_BDEV_IO_TYPE_ZCOPY) { 2851 return -EINVAL; 2852 } 2853 2854 bdev_io->u.bdev.zcopy.commit = commit ? 
1 : 0; 2855 bdev_io->u.bdev.zcopy.start = 0; 2856 bdev_io->internal.caller_ctx = cb_arg; 2857 bdev_io->internal.cb = cb; 2858 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2859 2860 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) { 2861 spdk_bdev_io_submit(bdev_io); 2862 return 0; 2863 } 2864 2865 if (!bdev_io->u.bdev.zcopy.commit) { 2866 /* Don't use spdk_bdev_io_complete here - this bdev_io was never actually submitted. */ 2867 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2868 bdev_io->internal.cb(bdev_io, true, bdev_io->internal.caller_ctx); 2869 return 0; 2870 } 2871 2872 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2873 spdk_bdev_io_submit(bdev_io); 2874 2875 return 0; 2876 } 2877 2878 int 2879 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2880 uint64_t offset, uint64_t len, 2881 spdk_bdev_io_completion_cb cb, void *cb_arg) 2882 { 2883 uint64_t offset_blocks, num_blocks; 2884 2885 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2886 return -EINVAL; 2887 } 2888 2889 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2890 } 2891 2892 int 2893 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2894 uint64_t offset_blocks, uint64_t num_blocks, 2895 spdk_bdev_io_completion_cb cb, void *cb_arg) 2896 { 2897 struct spdk_bdev *bdev = desc->bdev; 2898 struct spdk_bdev_io *bdev_io; 2899 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2900 2901 if (!desc->write) { 2902 return -EBADF; 2903 } 2904 2905 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2906 return -EINVAL; 2907 } 2908 2909 bdev_io = spdk_bdev_get_io(channel); 2910 2911 if (!bdev_io) { 2912 return -ENOMEM; 2913 } 2914 2915 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2916 bdev_io->internal.ch = channel; 2917 bdev_io->internal.desc = desc; 2918 bdev_io->u.bdev.offset_blocks = offset_blocks; 2919 bdev_io->u.bdev.num_blocks = num_blocks; 2920 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2921 2922 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2923 spdk_bdev_io_submit(bdev_io); 2924 return 0; 2925 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2926 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2927 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2928 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2929 _spdk_bdev_write_zero_buffer_next(bdev_io); 2930 return 0; 2931 } else { 2932 spdk_bdev_free_io(bdev_io); 2933 return -ENOTSUP; 2934 } 2935 } 2936 2937 int 2938 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2939 uint64_t offset, uint64_t nbytes, 2940 spdk_bdev_io_completion_cb cb, void *cb_arg) 2941 { 2942 uint64_t offset_blocks, num_blocks; 2943 2944 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2945 return -EINVAL; 2946 } 2947 2948 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2949 } 2950 2951 int 2952 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2953 uint64_t offset_blocks, uint64_t num_blocks, 2954 spdk_bdev_io_completion_cb cb, void *cb_arg) 2955 { 2956 struct spdk_bdev *bdev = desc->bdev; 2957 struct spdk_bdev_io *bdev_io; 2958 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2959 2960 if (!desc->write) { 2961 return -EBADF; 2962 } 2963 2964 if 
(!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2965 return -EINVAL; 2966 } 2967 2968 if (num_blocks == 0) { 2969 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2970 return -EINVAL; 2971 } 2972 2973 bdev_io = spdk_bdev_get_io(channel); 2974 if (!bdev_io) { 2975 return -ENOMEM; 2976 } 2977 2978 bdev_io->internal.ch = channel; 2979 bdev_io->internal.desc = desc; 2980 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2981 2982 bdev_io->u.bdev.iovs = &bdev_io->iov; 2983 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2984 bdev_io->u.bdev.iovs[0].iov_len = 0; 2985 bdev_io->u.bdev.iovcnt = 1; 2986 2987 bdev_io->u.bdev.offset_blocks = offset_blocks; 2988 bdev_io->u.bdev.num_blocks = num_blocks; 2989 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2990 2991 spdk_bdev_io_submit(bdev_io); 2992 return 0; 2993 } 2994 2995 int 2996 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2997 uint64_t offset, uint64_t length, 2998 spdk_bdev_io_completion_cb cb, void *cb_arg) 2999 { 3000 uint64_t offset_blocks, num_blocks; 3001 3002 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 3003 return -EINVAL; 3004 } 3005 3006 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 3007 } 3008 3009 int 3010 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3011 uint64_t offset_blocks, uint64_t num_blocks, 3012 spdk_bdev_io_completion_cb cb, void *cb_arg) 3013 { 3014 struct spdk_bdev *bdev = desc->bdev; 3015 struct spdk_bdev_io *bdev_io; 3016 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3017 3018 if (!desc->write) { 3019 return -EBADF; 3020 } 3021 3022 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 3023 return -EINVAL; 3024 } 3025 3026 bdev_io = spdk_bdev_get_io(channel); 3027 if (!bdev_io) { 3028 return -ENOMEM; 3029 } 3030 3031 bdev_io->internal.ch = channel; 3032 bdev_io->internal.desc = desc; 3033 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 3034 bdev_io->u.bdev.iovs = NULL; 3035 bdev_io->u.bdev.iovcnt = 0; 3036 bdev_io->u.bdev.offset_blocks = offset_blocks; 3037 bdev_io->u.bdev.num_blocks = num_blocks; 3038 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3039 3040 spdk_bdev_io_submit(bdev_io); 3041 return 0; 3042 } 3043 3044 static void 3045 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 3046 { 3047 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 3048 struct spdk_bdev_io *bdev_io; 3049 3050 bdev_io = TAILQ_FIRST(&ch->queued_resets); 3051 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 3052 spdk_bdev_io_submit_reset(bdev_io); 3053 } 3054 3055 static void 3056 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 3057 { 3058 struct spdk_io_channel *ch; 3059 struct spdk_bdev_channel *channel; 3060 struct spdk_bdev_mgmt_channel *mgmt_channel; 3061 struct spdk_bdev_shared_resource *shared_resource; 3062 bdev_io_tailq_t tmp_queued; 3063 3064 TAILQ_INIT(&tmp_queued); 3065 3066 ch = spdk_io_channel_iter_get_channel(i); 3067 channel = spdk_io_channel_get_ctx(ch); 3068 shared_resource = channel->shared_resource; 3069 mgmt_channel = shared_resource->mgmt_ch; 3070 3071 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 3072 3073 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 3074 /* The QoS object is always valid and readable while 3075 * the channel flag is set, so the lock here should not 3076 * be necessary. We're not in the fast path though, so 3077 * just take it anyway. 
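 *
 * Note (added for clarity): if this channel is the one funneling QoS I/O,
 * everything parked on the QoS queue is swept into tmp_queued below and
 * aborted together with this channel's entries on the nomem and buf-wait
 * queues, so a reset fails all I/O the channel has accepted but not yet
 * handed to the backing module.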
*/ 3078 pthread_mutex_lock(&channel->bdev->internal.mutex); 3079 if (channel->bdev->internal.qos->ch == channel) { 3080 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 3081 } 3082 pthread_mutex_unlock(&channel->bdev->internal.mutex); 3083 } 3084 3085 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 3086 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 3087 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 3088 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 3089 3090 spdk_for_each_channel_continue(i, 0); 3091 } 3092 3093 static void 3094 _spdk_bdev_start_reset(void *ctx) 3095 { 3096 struct spdk_bdev_channel *ch = ctx; 3097 3098 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 3099 ch, _spdk_bdev_reset_dev); 3100 } 3101 3102 static void 3103 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 3104 { 3105 struct spdk_bdev *bdev = ch->bdev; 3106 3107 assert(!TAILQ_EMPTY(&ch->queued_resets)); 3108 3109 pthread_mutex_lock(&bdev->internal.mutex); 3110 if (bdev->internal.reset_in_progress == NULL) { 3111 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 3112 /* 3113 * Take a channel reference for the target bdev for the life of this 3114 * reset. This guards against the channel getting destroyed while 3115 * spdk_for_each_channel() calls related to this reset IO are in 3116 * progress. We will release the reference when this reset is 3117 * completed. 3118 */ 3119 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 3120 _spdk_bdev_start_reset(ch); 3121 } 3122 pthread_mutex_unlock(&bdev->internal.mutex); 3123 } 3124 3125 int 3126 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3127 spdk_bdev_io_completion_cb cb, void *cb_arg) 3128 { 3129 struct spdk_bdev *bdev = desc->bdev; 3130 struct spdk_bdev_io *bdev_io; 3131 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3132 3133 bdev_io = spdk_bdev_get_io(channel); 3134 if (!bdev_io) { 3135 return -ENOMEM; 3136 } 3137 3138 bdev_io->internal.ch = channel; 3139 bdev_io->internal.desc = desc; 3140 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 3141 bdev_io->u.reset.ch_ref = NULL; 3142 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3143 3144 pthread_mutex_lock(&bdev->internal.mutex); 3145 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 3146 pthread_mutex_unlock(&bdev->internal.mutex); 3147 3148 _spdk_bdev_channel_start_reset(channel); 3149 3150 return 0; 3151 } 3152 3153 void 3154 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3155 struct spdk_bdev_io_stat *stat) 3156 { 3157 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3158 3159 *stat = channel->stat; 3160 } 3161 3162 static void 3163 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 3164 { 3165 void *io_device = spdk_io_channel_iter_get_io_device(i); 3166 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3167 3168 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 3169 bdev_iostat_ctx->cb_arg, 0); 3170 free(bdev_iostat_ctx); 3171 } 3172 3173 static void 3174 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 3175 { 3176 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 3177 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3178 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 3179 3180 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 3181 spdk_for_each_channel_continue(i, 0); 3182 } 3183 3184 void 3185 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 3186 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 3187 { 3188 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 3189 3190 assert(bdev != NULL); 3191 assert(stat != NULL); 3192 assert(cb != NULL); 3193 3194 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 3195 if (bdev_iostat_ctx == NULL) { 3196 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 3197 cb(bdev, stat, cb_arg, -ENOMEM); 3198 return; 3199 } 3200 3201 bdev_iostat_ctx->stat = stat; 3202 bdev_iostat_ctx->cb = cb; 3203 bdev_iostat_ctx->cb_arg = cb_arg; 3204 3205 /* Start with the statistics from previously deleted channels. */ 3206 pthread_mutex_lock(&bdev->internal.mutex); 3207 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 3208 pthread_mutex_unlock(&bdev->internal.mutex); 3209 3210 /* Then iterate and add the statistics from each existing channel. */ 3211 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3212 _spdk_bdev_get_each_channel_stat, 3213 bdev_iostat_ctx, 3214 _spdk_bdev_get_device_stat_done); 3215 } 3216 3217 int 3218 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3219 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3220 spdk_bdev_io_completion_cb cb, void *cb_arg) 3221 { 3222 struct spdk_bdev *bdev = desc->bdev; 3223 struct spdk_bdev_io *bdev_io; 3224 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3225 3226 if (!desc->write) { 3227 return -EBADF; 3228 } 3229 3230 bdev_io = spdk_bdev_get_io(channel); 3231 if (!bdev_io) { 3232 return -ENOMEM; 3233 } 3234 3235 bdev_io->internal.ch = channel; 3236 bdev_io->internal.desc = desc; 3237 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 3238 bdev_io->u.nvme_passthru.cmd = *cmd; 3239 bdev_io->u.nvme_passthru.buf = buf; 3240 bdev_io->u.nvme_passthru.nbytes = nbytes; 3241 bdev_io->u.nvme_passthru.md_buf = NULL; 3242 bdev_io->u.nvme_passthru.md_len = 0; 3243 3244 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3245 3246 spdk_bdev_io_submit(bdev_io); 3247 return 0; 3248 } 3249 3250 int 3251 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3252 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 3253 spdk_bdev_io_completion_cb cb, void *cb_arg) 3254 { 3255 struct spdk_bdev *bdev = desc->bdev; 3256 struct spdk_bdev_io *bdev_io; 3257 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3258 3259 if (!desc->write) { 3260 /* 3261 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3262 * to easily determine if the command is a read or write, but for now just 3263 * do not allow io_passthru with a read-only descriptor. 
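 *
 * Illustrative usage (hypothetical buffer and callback names, not taken from
 * the original source; command fields beyond opc and nsid omitted for
 * brevity): even a passthru command that only reads data requires a
 * descriptor opened for write, as enforced here.
 *
 *   struct spdk_nvme_cmd cmd = {};
 *
 *   cmd.opc  = SPDK_NVME_OPC_READ;
 *   cmd.nsid = 1;
 *   rc = spdk_bdev_nvme_io_passthru(desc, io_ch, &cmd, data_buf, 4096,
 *                                   my_cb, my_ctx);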
3264 */ 3265 return -EBADF; 3266 } 3267 3268 bdev_io = spdk_bdev_get_io(channel); 3269 if (!bdev_io) { 3270 return -ENOMEM; 3271 } 3272 3273 bdev_io->internal.ch = channel; 3274 bdev_io->internal.desc = desc; 3275 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 3276 bdev_io->u.nvme_passthru.cmd = *cmd; 3277 bdev_io->u.nvme_passthru.buf = buf; 3278 bdev_io->u.nvme_passthru.nbytes = nbytes; 3279 bdev_io->u.nvme_passthru.md_buf = NULL; 3280 bdev_io->u.nvme_passthru.md_len = 0; 3281 3282 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3283 3284 spdk_bdev_io_submit(bdev_io); 3285 return 0; 3286 } 3287 3288 int 3289 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 3290 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 3291 spdk_bdev_io_completion_cb cb, void *cb_arg) 3292 { 3293 struct spdk_bdev *bdev = desc->bdev; 3294 struct spdk_bdev_io *bdev_io; 3295 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3296 3297 if (!desc->write) { 3298 /* 3299 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 3300 * to easily determine if the command is a read or write, but for now just 3301 * do not allow io_passthru with a read-only descriptor. 3302 */ 3303 return -EBADF; 3304 } 3305 3306 bdev_io = spdk_bdev_get_io(channel); 3307 if (!bdev_io) { 3308 return -ENOMEM; 3309 } 3310 3311 bdev_io->internal.ch = channel; 3312 bdev_io->internal.desc = desc; 3313 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 3314 bdev_io->u.nvme_passthru.cmd = *cmd; 3315 bdev_io->u.nvme_passthru.buf = buf; 3316 bdev_io->u.nvme_passthru.nbytes = nbytes; 3317 bdev_io->u.nvme_passthru.md_buf = md_buf; 3318 bdev_io->u.nvme_passthru.md_len = md_len; 3319 3320 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 3321 3322 spdk_bdev_io_submit(bdev_io); 3323 return 0; 3324 } 3325 3326 int 3327 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 3328 struct spdk_bdev_io_wait_entry *entry) 3329 { 3330 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 3331 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 3332 3333 if (bdev != entry->bdev) { 3334 SPDK_ERRLOG("bdevs do not match\n"); 3335 return -EINVAL; 3336 } 3337 3338 if (mgmt_ch->per_thread_cache_count > 0) { 3339 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 3340 return -EINVAL; 3341 } 3342 3343 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 3344 return 0; 3345 } 3346 3347 static void 3348 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 3349 { 3350 struct spdk_bdev *bdev = bdev_ch->bdev; 3351 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3352 struct spdk_bdev_io *bdev_io; 3353 3354 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 3355 /* 3356 * Allow some more I/O to complete before retrying the nomem_io queue. 3357 * Some drivers (such as nvme) cannot immediately take a new I/O in 3358 * the context of a completion, because the resources for the I/O are 3359 * not released until control returns to the bdev poller. Also, we 3360 * may require several small I/O to complete before a larger I/O 3361 * (that requires splitting) can be submitted. 
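 *
 * Worked example of the threshold check above (added, not part of the
 * original text): nomem_threshold is set in spdk_bdev_io_complete() to
 * max(io_outstanding / 2, io_outstanding - NOMEM_THRESHOLD_COUNT). With 64
 * I/O outstanding when the ENOMEM is recorded, the threshold is
 * max(32, 64 - 8) = 56, so retries start once 8 completions have drained;
 * with only 6 outstanding it is max(3, -2) = 3, i.e. wait for half.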
3362 */ 3363 return; 3364 } 3365 3366 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 3367 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 3368 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 3369 bdev_io->internal.ch->io_outstanding++; 3370 shared_resource->io_outstanding++; 3371 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 3372 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 3373 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 3374 break; 3375 } 3376 } 3377 } 3378 3379 static inline void 3380 _spdk_bdev_io_complete(void *ctx) 3381 { 3382 struct spdk_bdev_io *bdev_io = ctx; 3383 uint64_t tsc, tsc_diff; 3384 3385 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 3386 /* 3387 * Send the completion to the thread that originally submitted the I/O, 3388 * which may not be the current thread in the case of QoS. 3389 */ 3390 if (bdev_io->internal.io_submit_ch) { 3391 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3392 bdev_io->internal.io_submit_ch = NULL; 3393 } 3394 3395 /* 3396 * Defer completion to avoid potential infinite recursion if the 3397 * user's completion callback issues a new I/O. 3398 */ 3399 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3400 _spdk_bdev_io_complete, bdev_io); 3401 return; 3402 } 3403 3404 tsc = spdk_get_ticks(); 3405 tsc_diff = tsc - bdev_io->internal.submit_tsc; 3406 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 3407 3408 if (bdev_io->internal.ch->histogram) { 3409 spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff); 3410 } 3411 3412 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3413 switch (bdev_io->type) { 3414 case SPDK_BDEV_IO_TYPE_READ: 3415 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3416 bdev_io->internal.ch->stat.num_read_ops++; 3417 bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff; 3418 break; 3419 case SPDK_BDEV_IO_TYPE_WRITE: 3420 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3421 bdev_io->internal.ch->stat.num_write_ops++; 3422 bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff; 3423 break; 3424 case SPDK_BDEV_IO_TYPE_UNMAP: 3425 bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 3426 bdev_io->internal.ch->stat.num_unmap_ops++; 3427 bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff; 3428 default: 3429 break; 3430 } 3431 } 3432 3433 #ifdef SPDK_CONFIG_VTUNE 3434 uint64_t now_tsc = spdk_get_ticks(); 3435 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 3436 uint64_t data[5]; 3437 3438 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 3439 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 3440 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 3441 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 3442 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
3443 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 3444 3445 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 3446 __itt_metadata_u64, 5, data); 3447 3448 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 3449 bdev_io->internal.ch->start_tsc = now_tsc; 3450 } 3451 #endif 3452 3453 assert(bdev_io->internal.cb != NULL); 3454 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 3455 3456 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 3457 bdev_io->internal.caller_ctx); 3458 } 3459 3460 static void 3461 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 3462 { 3463 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 3464 3465 if (bdev_io->u.reset.ch_ref != NULL) { 3466 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 3467 bdev_io->u.reset.ch_ref = NULL; 3468 } 3469 3470 _spdk_bdev_io_complete(bdev_io); 3471 } 3472 3473 static void 3474 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 3475 { 3476 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 3477 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 3478 3479 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 3480 if (!TAILQ_EMPTY(&ch->queued_resets)) { 3481 _spdk_bdev_channel_start_reset(ch); 3482 } 3483 3484 spdk_for_each_channel_continue(i, 0); 3485 } 3486 3487 void 3488 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 3489 { 3490 struct spdk_bdev *bdev = bdev_io->bdev; 3491 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3492 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3493 3494 bdev_io->internal.status = status; 3495 3496 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3497 bool unlock_channels = false; 3498 3499 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3500 SPDK_ERRLOG("NOMEM returned for reset\n"); 3501 } 3502 pthread_mutex_lock(&bdev->internal.mutex); 3503 if (bdev_io == bdev->internal.reset_in_progress) { 3504 bdev->internal.reset_in_progress = NULL; 3505 unlock_channels = true; 3506 } 3507 pthread_mutex_unlock(&bdev->internal.mutex); 3508 3509 if (unlock_channels) { 3510 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3511 bdev_io, _spdk_bdev_reset_complete); 3512 return; 3513 } 3514 } else { 3515 if (spdk_unlikely(bdev_io->internal.orig_iovcnt > 0)) { 3516 _bdev_io_unset_bounce_buf(bdev_io); 3517 } 3518 3519 assert(bdev_ch->io_outstanding > 0); 3520 assert(shared_resource->io_outstanding > 0); 3521 bdev_ch->io_outstanding--; 3522 shared_resource->io_outstanding--; 3523 3524 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3525 assert(shared_resource->io_outstanding > 0); 3526 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3527 /* 3528 * Wait for some of the outstanding I/O to complete before we 3529 * retry any of the nomem_io. Normally we will wait for 3530 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3531 * depth channels we will instead wait for half to complete. 
3532 */ 3533 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3534 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3535 return; 3536 } 3537 3538 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3539 _spdk_bdev_ch_retry_io(bdev_ch); 3540 } 3541 } 3542 3543 _spdk_bdev_io_complete(bdev_io); 3544 } 3545 3546 void 3547 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3548 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3549 { 3550 if (sc == SPDK_SCSI_STATUS_GOOD) { 3551 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3552 } else { 3553 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3554 bdev_io->internal.error.scsi.sc = sc; 3555 bdev_io->internal.error.scsi.sk = sk; 3556 bdev_io->internal.error.scsi.asc = asc; 3557 bdev_io->internal.error.scsi.ascq = ascq; 3558 } 3559 3560 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3561 } 3562 3563 void 3564 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3565 int *sc, int *sk, int *asc, int *ascq) 3566 { 3567 assert(sc != NULL); 3568 assert(sk != NULL); 3569 assert(asc != NULL); 3570 assert(ascq != NULL); 3571 3572 switch (bdev_io->internal.status) { 3573 case SPDK_BDEV_IO_STATUS_SUCCESS: 3574 *sc = SPDK_SCSI_STATUS_GOOD; 3575 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3576 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3577 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3578 break; 3579 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3580 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3581 break; 3582 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3583 *sc = bdev_io->internal.error.scsi.sc; 3584 *sk = bdev_io->internal.error.scsi.sk; 3585 *asc = bdev_io->internal.error.scsi.asc; 3586 *ascq = bdev_io->internal.error.scsi.ascq; 3587 break; 3588 default: 3589 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3590 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3591 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3592 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3593 break; 3594 } 3595 } 3596 3597 void 3598 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3599 { 3600 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3601 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3602 } else { 3603 bdev_io->internal.error.nvme.sct = sct; 3604 bdev_io->internal.error.nvme.sc = sc; 3605 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3606 } 3607 3608 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3609 } 3610 3611 void 3612 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3613 { 3614 assert(sct != NULL); 3615 assert(sc != NULL); 3616 3617 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3618 *sct = bdev_io->internal.error.nvme.sct; 3619 *sc = bdev_io->internal.error.nvme.sc; 3620 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3621 *sct = SPDK_NVME_SCT_GENERIC; 3622 *sc = SPDK_NVME_SC_SUCCESS; 3623 } else { 3624 *sct = SPDK_NVME_SCT_GENERIC; 3625 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3626 } 3627 } 3628 3629 struct spdk_thread * 3630 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3631 { 3632 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3633 } 3634 3635 struct spdk_io_channel * 3636 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io) 3637 { 3638 return bdev_io->internal.ch->channel; 3639 } 3640 3641 static void 3642 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t 
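 *
 * For example, a bdev registered as "Malloc0" uses the string "bdev_Malloc0"
 * as the name of its io_device below, leaving "Malloc0" free for the module's
 * own spdk_io_device_register() call.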
*limits) 3643 { 3644 uint64_t min_qos_set; 3645 int i; 3646 3647 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3648 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3649 break; 3650 } 3651 } 3652 3653 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3654 SPDK_ERRLOG("Invalid rate limits set.\n"); 3655 return; 3656 } 3657 3658 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3659 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3660 continue; 3661 } 3662 3663 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3664 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3665 } else { 3666 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3667 } 3668 3669 if (limits[i] == 0 || limits[i] % min_qos_set) { 3670 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3671 limits[i], bdev->name, min_qos_set); 3672 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3673 return; 3674 } 3675 } 3676 3677 if (!bdev->internal.qos) { 3678 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3679 if (!bdev->internal.qos) { 3680 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3681 return; 3682 } 3683 } 3684 3685 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3686 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3687 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3688 bdev->name, i, limits[i]); 3689 } 3690 3691 return; 3692 } 3693 3694 static void 3695 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3696 { 3697 struct spdk_conf_section *sp = NULL; 3698 const char *val = NULL; 3699 int i = 0, j = 0; 3700 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3701 bool config_qos = false; 3702 3703 sp = spdk_conf_find_section(NULL, "QoS"); 3704 if (!sp) { 3705 return; 3706 } 3707 3708 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3709 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3710 3711 i = 0; 3712 while (true) { 3713 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3714 if (!val) { 3715 break; 3716 } 3717 3718 if (strcmp(bdev->name, val) != 0) { 3719 i++; 3720 continue; 3721 } 3722 3723 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3724 if (val) { 3725 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3726 limits[j] = strtoull(val, NULL, 10); 3727 } else { 3728 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3729 } 3730 config_qos = true; 3731 } 3732 3733 break; 3734 } 3735 3736 j++; 3737 } 3738 3739 if (config_qos == true) { 3740 _spdk_bdev_qos_config_limit(bdev, limits); 3741 } 3742 3743 return; 3744 } 3745 3746 static int 3747 spdk_bdev_init(struct spdk_bdev *bdev) 3748 { 3749 char *bdev_name; 3750 3751 assert(bdev->module != NULL); 3752 3753 if (!bdev->name) { 3754 SPDK_ERRLOG("Bdev name is NULL\n"); 3755 return -EINVAL; 3756 } 3757 3758 if (spdk_bdev_get_by_name(bdev->name)) { 3759 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3760 return -EEXIST; 3761 } 3762 3763 /* Users often register their own I/O devices using the bdev name. In 3764 * order to avoid conflicts, prepend bdev_. 
*/ 3765 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3766 if (!bdev_name) { 3767 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3768 return -ENOMEM; 3769 } 3770 3771 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3772 bdev->internal.measured_queue_depth = UINT64_MAX; 3773 bdev->internal.claim_module = NULL; 3774 bdev->internal.qd_poller = NULL; 3775 bdev->internal.qos = NULL; 3776 3777 if (spdk_bdev_get_buf_align(bdev) > 1) { 3778 if (bdev->split_on_optimal_io_boundary) { 3779 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 3780 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 3781 } else { 3782 bdev->split_on_optimal_io_boundary = true; 3783 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 3784 } 3785 } 3786 3787 TAILQ_INIT(&bdev->internal.open_descs); 3788 3789 TAILQ_INIT(&bdev->aliases); 3790 3791 bdev->internal.reset_in_progress = NULL; 3792 3793 _spdk_bdev_qos_config(bdev); 3794 3795 spdk_io_device_register(__bdev_to_io_dev(bdev), 3796 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3797 sizeof(struct spdk_bdev_channel), 3798 bdev_name); 3799 3800 free(bdev_name); 3801 3802 pthread_mutex_init(&bdev->internal.mutex, NULL); 3803 return 0; 3804 } 3805 3806 static void 3807 spdk_bdev_destroy_cb(void *io_device) 3808 { 3809 int rc; 3810 struct spdk_bdev *bdev; 3811 spdk_bdev_unregister_cb cb_fn; 3812 void *cb_arg; 3813 3814 bdev = __bdev_from_io_dev(io_device); 3815 cb_fn = bdev->internal.unregister_cb; 3816 cb_arg = bdev->internal.unregister_ctx; 3817 3818 rc = bdev->fn_table->destruct(bdev->ctxt); 3819 if (rc < 0) { 3820 SPDK_ERRLOG("destruct failed\n"); 3821 } 3822 if (rc <= 0 && cb_fn != NULL) { 3823 cb_fn(cb_arg, rc); 3824 } 3825 } 3826 3827 3828 static void 3829 spdk_bdev_fini(struct spdk_bdev *bdev) 3830 { 3831 pthread_mutex_destroy(&bdev->internal.mutex); 3832 3833 free(bdev->internal.qos); 3834 3835 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3836 } 3837 3838 static void 3839 spdk_bdev_start(struct spdk_bdev *bdev) 3840 { 3841 struct spdk_bdev_module *module; 3842 uint32_t action; 3843 3844 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3845 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3846 3847 /* Examine configuration before initializing I/O */ 3848 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3849 if (module->examine_config) { 3850 action = module->internal.action_in_progress; 3851 module->internal.action_in_progress++; 3852 module->examine_config(bdev); 3853 if (action != module->internal.action_in_progress) { 3854 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3855 module->name); 3856 } 3857 } 3858 } 3859 3860 if (bdev->internal.claim_module) { 3861 if (bdev->internal.claim_module->examine_disk) { 3862 bdev->internal.claim_module->internal.action_in_progress++; 3863 bdev->internal.claim_module->examine_disk(bdev); 3864 } 3865 return; 3866 } 3867 3868 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3869 if (module->examine_disk) { 3870 module->internal.action_in_progress++; 3871 module->examine_disk(bdev); 3872 } 3873 } 3874 } 3875 3876 int 3877 spdk_bdev_register(struct spdk_bdev *bdev) 3878 { 3879 int rc = spdk_bdev_init(bdev); 3880 3881 if (rc == 0) { 3882 spdk_bdev_start(bdev); 3883 } 3884 3885 spdk_notify_send("bdev_register", spdk_bdev_get_name(bdev)); 3886 return rc; 3887 } 3888 3889 int 3890 spdk_vbdev_register(struct spdk_bdev *vbdev, 
struct spdk_bdev **base_bdevs, int base_bdev_count) 3891 { 3892 SPDK_ERRLOG("This function is deprecated. Use spdk_bdev_register() instead.\n"); 3893 return spdk_bdev_register(vbdev); 3894 } 3895 3896 void 3897 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3898 { 3899 if (bdev->internal.unregister_cb != NULL) { 3900 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3901 } 3902 } 3903 3904 static void 3905 _remove_notify(void *arg) 3906 { 3907 struct spdk_bdev_desc *desc = arg; 3908 3909 desc->remove_scheduled = false; 3910 3911 if (desc->closed) { 3912 free(desc); 3913 } else { 3914 desc->remove_cb(desc->remove_ctx); 3915 } 3916 } 3917 3918 /* Must be called while holding bdev->internal.mutex. 3919 * returns: 0 - bdev removed and ready to be destructed. 3920 * -EBUSY - bdev can't be destructed yet. */ 3921 static int 3922 spdk_bdev_unregister_unsafe(struct spdk_bdev *bdev) 3923 { 3924 struct spdk_bdev_desc *desc, *tmp; 3925 int rc = 0; 3926 3927 /* Notify each descriptor about hotremoval */ 3928 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3929 rc = -EBUSY; 3930 if (desc->remove_cb) { 3931 /* 3932 * Defer invocation of the remove_cb to a separate message that will 3933 * run later on its thread. This ensures this context unwinds and 3934 * we don't recursively unregister this bdev again if the remove_cb 3935 * immediately closes its descriptor. 3936 */ 3937 if (!desc->remove_scheduled) { 3938 /* Avoid scheduling removal of the same descriptor multiple times. */ 3939 desc->remove_scheduled = true; 3940 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3941 } 3942 } 3943 } 3944 3945 /* If there are no descriptors, proceed removing the bdev */ 3946 if (rc == 0) { 3947 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3948 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list done\n", bdev->name); 3949 spdk_notify_send("bdev_unregister", spdk_bdev_get_name(bdev)); 3950 } 3951 3952 return rc; 3953 } 3954 3955 void 3956 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3957 { 3958 struct spdk_thread *thread; 3959 int rc; 3960 3961 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3962 3963 thread = spdk_get_thread(); 3964 if (!thread) { 3965 /* The user called this from a non-SPDK thread. */ 3966 if (cb_fn != NULL) { 3967 cb_fn(cb_arg, -ENOTSUP); 3968 } 3969 return; 3970 } 3971 3972 pthread_mutex_lock(&bdev->internal.mutex); 3973 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING) { 3974 pthread_mutex_unlock(&bdev->internal.mutex); 3975 if (cb_fn) { 3976 cb_fn(cb_arg, -EBUSY); 3977 } 3978 return; 3979 } 3980 3981 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3982 bdev->internal.unregister_cb = cb_fn; 3983 bdev->internal.unregister_ctx = cb_arg; 3984 3985 /* Call under lock. 
*/ 3986 rc = spdk_bdev_unregister_unsafe(bdev); 3987 pthread_mutex_unlock(&bdev->internal.mutex); 3988 3989 if (rc == 0) { 3990 spdk_bdev_fini(bdev); 3991 } 3992 } 3993 3994 int 3995 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3996 void *remove_ctx, struct spdk_bdev_desc **_desc) 3997 { 3998 struct spdk_bdev_desc *desc; 3999 struct spdk_thread *thread; 4000 struct set_qos_limit_ctx *ctx; 4001 4002 thread = spdk_get_thread(); 4003 if (!thread) { 4004 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 4005 return -ENOTSUP; 4006 } 4007 4008 desc = calloc(1, sizeof(*desc)); 4009 if (desc == NULL) { 4010 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 4011 return -ENOMEM; 4012 } 4013 4014 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 4015 spdk_get_thread()); 4016 4017 desc->bdev = bdev; 4018 desc->thread = thread; 4019 desc->remove_cb = remove_cb; 4020 desc->remove_ctx = remove_ctx; 4021 desc->write = write; 4022 *_desc = desc; 4023 4024 pthread_mutex_lock(&bdev->internal.mutex); 4025 4026 if (write && bdev->internal.claim_module) { 4027 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 4028 bdev->name, bdev->internal.claim_module->name); 4029 pthread_mutex_unlock(&bdev->internal.mutex); 4030 free(desc); 4031 *_desc = NULL; 4032 return -EPERM; 4033 } 4034 4035 /* Enable QoS */ 4036 if (bdev->internal.qos && bdev->internal.qos->thread == NULL) { 4037 ctx = calloc(1, sizeof(*ctx)); 4038 if (ctx == NULL) { 4039 SPDK_ERRLOG("Failed to allocate memory for QoS context\n"); 4040 pthread_mutex_unlock(&bdev->internal.mutex); 4041 free(desc); 4042 *_desc = NULL; 4043 return -ENOMEM; 4044 } 4045 ctx->bdev = bdev; 4046 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4047 _spdk_bdev_enable_qos_msg, ctx, 4048 _spdk_bdev_enable_qos_done); 4049 } 4050 4051 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 4052 4053 pthread_mutex_unlock(&bdev->internal.mutex); 4054 4055 return 0; 4056 } 4057 4058 void 4059 spdk_bdev_close(struct spdk_bdev_desc *desc) 4060 { 4061 struct spdk_bdev *bdev = desc->bdev; 4062 int rc; 4063 4064 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 4065 spdk_get_thread()); 4066 4067 if (desc->thread != spdk_get_thread()) { 4068 SPDK_ERRLOG("Descriptor %p for bdev %s closed on wrong thread (%p, expected %p)\n", 4069 desc, bdev->name, spdk_get_thread(), desc->thread); 4070 } 4071 4072 pthread_mutex_lock(&bdev->internal.mutex); 4073 4074 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 4075 4076 desc->closed = true; 4077 4078 if (!desc->remove_scheduled) { 4079 free(desc); 4080 } 4081 4082 /* If no more descriptors, kill QoS channel */ 4083 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 4084 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 4085 bdev->name, spdk_get_thread()); 4086 4087 if (spdk_bdev_qos_destroy(bdev)) { 4088 /* There isn't anything we can do to recover here. Just let the 4089 * old QoS poller keep running. The QoS handling won't change 4090 * cores when the user allocates a new channel, but it won't break. */ 4091 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 4092 } 4093 } 4094 4095 spdk_bdev_set_qd_sampling_period(bdev, 0); 4096 4097 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 4098 rc = spdk_bdev_unregister_unsafe(bdev); 4099 pthread_mutex_unlock(&bdev->internal.mutex); 4100 4101 if (rc == 0) { 4102 spdk_bdev_fini(bdev); 4103 } 4104 } else { 4105 pthread_mutex_unlock(&bdev->internal.mutex); 4106 } 4107 } 4108 4109 int 4110 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 4111 struct spdk_bdev_module *module) 4112 { 4113 if (bdev->internal.claim_module != NULL) { 4114 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 4115 bdev->internal.claim_module->name); 4116 return -EPERM; 4117 } 4118 4119 if (desc && !desc->write) { 4120 desc->write = true; 4121 } 4122 4123 bdev->internal.claim_module = module; 4124 return 0; 4125 } 4126 4127 void 4128 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 4129 { 4130 assert(bdev->internal.claim_module != NULL); 4131 bdev->internal.claim_module = NULL; 4132 } 4133 4134 struct spdk_bdev * 4135 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 4136 { 4137 return desc->bdev; 4138 } 4139 4140 void 4141 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 4142 { 4143 struct iovec *iovs; 4144 int iovcnt; 4145 4146 if (bdev_io == NULL) { 4147 return; 4148 } 4149 4150 switch (bdev_io->type) { 4151 case SPDK_BDEV_IO_TYPE_READ: 4152 case SPDK_BDEV_IO_TYPE_WRITE: 4153 case SPDK_BDEV_IO_TYPE_ZCOPY: 4154 iovs = bdev_io->u.bdev.iovs; 4155 iovcnt = bdev_io->u.bdev.iovcnt; 4156 break; 4157 default: 4158 iovs = NULL; 4159 iovcnt = 0; 4160 break; 4161 } 4162 4163 if (iovp) { 4164 *iovp = iovs; 4165 } 4166 if (iovcntp) { 4167 *iovcntp = iovcnt; 4168 } 4169 } 4170 4171 void 4172 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 4173 { 4174 4175 if (spdk_bdev_module_list_find(bdev_module->name)) { 4176 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 4177 assert(false); 4178 } 4179 4180 /* 4181 * Modules with examine callbacks must be initialized first, so they are 4182 * ready to handle examine callbacks from later modules that will 4183 * register physical bdevs. 
4184 */ 4185 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 4186 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4187 } else { 4188 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 4189 } 4190 } 4191 4192 struct spdk_bdev_module * 4193 spdk_bdev_module_list_find(const char *name) 4194 { 4195 struct spdk_bdev_module *bdev_module; 4196 4197 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 4198 if (strcmp(name, bdev_module->name) == 0) { 4199 break; 4200 } 4201 } 4202 4203 return bdev_module; 4204 } 4205 4206 static void 4207 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 4208 { 4209 struct spdk_bdev_io *bdev_io = _bdev_io; 4210 uint64_t num_bytes, num_blocks; 4211 int rc; 4212 4213 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 4214 bdev_io->u.bdev.split_remaining_num_blocks, 4215 ZERO_BUFFER_SIZE); 4216 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 4217 4218 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 4219 spdk_io_channel_from_ctx(bdev_io->internal.ch), 4220 g_bdev_mgr.zero_buffer, 4221 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 4222 _spdk_bdev_write_zero_buffer_done, bdev_io); 4223 if (rc == 0) { 4224 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 4225 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 4226 } else if (rc == -ENOMEM) { 4227 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 4228 } else { 4229 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4230 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 4231 } 4232 } 4233 4234 static void 4235 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 4236 { 4237 struct spdk_bdev_io *parent_io = cb_arg; 4238 4239 spdk_bdev_free_io(bdev_io); 4240 4241 if (!success) { 4242 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 4243 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 4244 return; 4245 } 4246 4247 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 4248 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 4249 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 4250 return; 4251 } 4252 4253 _spdk_bdev_write_zero_buffer_next(parent_io); 4254 } 4255 4256 static void 4257 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 4258 { 4259 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4260 ctx->bdev->internal.qos_mod_in_progress = false; 4261 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4262 4263 if (ctx->cb_fn) { 4264 ctx->cb_fn(ctx->cb_arg, status); 4265 } 4266 free(ctx); 4267 } 4268 4269 static void 4270 _spdk_bdev_disable_qos_done(void *cb_arg) 4271 { 4272 struct set_qos_limit_ctx *ctx = cb_arg; 4273 struct spdk_bdev *bdev = ctx->bdev; 4274 struct spdk_bdev_io *bdev_io; 4275 struct spdk_bdev_qos *qos; 4276 4277 pthread_mutex_lock(&bdev->internal.mutex); 4278 qos = bdev->internal.qos; 4279 bdev->internal.qos = NULL; 4280 pthread_mutex_unlock(&bdev->internal.mutex); 4281 4282 while (!TAILQ_EMPTY(&qos->queued)) { 4283 /* Send queued I/O back to their original thread for resubmission. */ 4284 bdev_io = TAILQ_FIRST(&qos->queued); 4285 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 4286 4287 if (bdev_io->internal.io_submit_ch) { 4288 /* 4289 * Channel was changed when sending it to the QoS thread - change it back 4290 * before sending it back to the original thread. 
4291 */ 4292 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 4293 bdev_io->internal.io_submit_ch = NULL; 4294 } 4295 4296 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 4297 _spdk_bdev_io_submit, bdev_io); 4298 } 4299 4300 if (qos->thread != NULL) { 4301 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 4302 spdk_poller_unregister(&qos->poller); 4303 } 4304 4305 free(qos); 4306 4307 _spdk_bdev_set_qos_limit_done(ctx, 0); 4308 } 4309 4310 static void 4311 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 4312 { 4313 void *io_device = spdk_io_channel_iter_get_io_device(i); 4314 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4315 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4316 struct spdk_thread *thread; 4317 4318 pthread_mutex_lock(&bdev->internal.mutex); 4319 thread = bdev->internal.qos->thread; 4320 pthread_mutex_unlock(&bdev->internal.mutex); 4321 4322 if (thread != NULL) { 4323 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 4324 } else { 4325 _spdk_bdev_disable_qos_done(ctx); 4326 } 4327 } 4328 4329 static void 4330 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 4331 { 4332 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4333 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4334 4335 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 4336 4337 spdk_for_each_channel_continue(i, 0); 4338 } 4339 4340 static void 4341 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg) 4342 { 4343 struct set_qos_limit_ctx *ctx = cb_arg; 4344 struct spdk_bdev *bdev = ctx->bdev; 4345 4346 pthread_mutex_lock(&bdev->internal.mutex); 4347 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 4348 pthread_mutex_unlock(&bdev->internal.mutex); 4349 4350 _spdk_bdev_set_qos_limit_done(ctx, 0); 4351 } 4352 4353 static void 4354 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 4355 { 4356 void *io_device = spdk_io_channel_iter_get_io_device(i); 4357 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 4358 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 4359 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 4360 4361 pthread_mutex_lock(&bdev->internal.mutex); 4362 _spdk_bdev_enable_qos(bdev, bdev_ch); 4363 pthread_mutex_unlock(&bdev->internal.mutex); 4364 spdk_for_each_channel_continue(i, 0); 4365 } 4366 4367 static void 4368 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 4369 { 4370 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4371 4372 _spdk_bdev_set_qos_limit_done(ctx, status); 4373 } 4374 4375 static void 4376 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 4377 { 4378 int i; 4379 4380 assert(bdev->internal.qos != NULL); 4381 4382 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4383 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4384 bdev->internal.qos->rate_limits[i].limit = limits[i]; 4385 4386 if (limits[i] == 0) { 4387 bdev->internal.qos->rate_limits[i].limit = 4388 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 4389 } 4390 } 4391 } 4392 } 4393 4394 void 4395 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 4396 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 4397 { 4398 struct set_qos_limit_ctx *ctx; 4399 uint32_t limit_set_complement; 4400 uint64_t min_limit_per_sec; 4401 int i; 4402 bool disable_rate_limit = true; 4403 4404 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4405 if (limits[i] == 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 4406 continue; 4407 } 4408 4409 if (limits[i] > 0) { 4410 disable_rate_limit = false; 4411 } 4412 4413 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 4414 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 4415 } else { 4416 /* Change from megabyte to byte rate limit */ 4417 limits[i] = limits[i] * 1024 * 1024; 4418 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 4419 } 4420 4421 limit_set_complement = limits[i] % min_limit_per_sec; 4422 if (limit_set_complement) { 4423 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 4424 limits[i], min_limit_per_sec); 4425 limits[i] += min_limit_per_sec - limit_set_complement; 4426 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 4427 } 4428 } 4429 4430 ctx = calloc(1, sizeof(*ctx)); 4431 if (ctx == NULL) { 4432 cb_fn(cb_arg, -ENOMEM); 4433 return; 4434 } 4435 4436 ctx->cb_fn = cb_fn; 4437 ctx->cb_arg = cb_arg; 4438 ctx->bdev = bdev; 4439 4440 pthread_mutex_lock(&bdev->internal.mutex); 4441 if (bdev->internal.qos_mod_in_progress) { 4442 pthread_mutex_unlock(&bdev->internal.mutex); 4443 free(ctx); 4444 cb_fn(cb_arg, -EAGAIN); 4445 return; 4446 } 4447 bdev->internal.qos_mod_in_progress = true; 4448 4449 if (disable_rate_limit == true && bdev->internal.qos) { 4450 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 4451 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 4452 (bdev->internal.qos->rate_limits[i].limit > 0 && 4453 bdev->internal.qos->rate_limits[i].limit != 4454 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 4455 disable_rate_limit = false; 4456 break; 4457 } 4458 } 4459 } 4460 4461 if (disable_rate_limit == false) { 4462 if (bdev->internal.qos == NULL) { 4463 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 4464 if (!bdev->internal.qos) { 4465 pthread_mutex_unlock(&bdev->internal.mutex); 4466 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 4467 free(ctx); 4468 cb_fn(cb_arg, -ENOMEM); 4469 return; 4470 } 4471 } 4472 4473 if (bdev->internal.qos->thread == NULL) { 4474 /* Enabling */ 4475 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4476 4477 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4478 _spdk_bdev_enable_qos_msg, ctx, 4479 _spdk_bdev_enable_qos_done); 4480 } else { 4481 /* Updating */ 4482 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4483 4484 spdk_thread_send_msg(bdev->internal.qos->thread, 4485 _spdk_bdev_update_qos_rate_limit_msg, ctx); 4486 } 4487 } else { 4488 if (bdev->internal.qos != NULL) { 4489 _spdk_bdev_set_qos_rate_limits(bdev, limits); 4490 4491 /* Disabling */ 4492 spdk_for_each_channel(__bdev_to_io_dev(bdev), 4493 _spdk_bdev_disable_qos_msg, ctx, 4494 _spdk_bdev_disable_qos_msg_done); 4495 } else { 4496 pthread_mutex_unlock(&bdev->internal.mutex); 4497 _spdk_bdev_set_qos_limit_done(ctx, 0); 4498 return; 4499 } 4500 } 4501 4502 pthread_mutex_unlock(&bdev->internal.mutex); 4503 } 4504 4505 struct spdk_bdev_histogram_ctx { 4506 spdk_bdev_histogram_status_cb cb_fn; 4507 void *cb_arg; 4508 struct spdk_bdev *bdev; 4509 int status; 4510 }; 4511 4512 static void 4513 _spdk_bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status) 4514 { 4515 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4516 4517 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4518 ctx->bdev->internal.histogram_in_progress = false; 4519 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4520 ctx->cb_fn(ctx->cb_arg, ctx->status); 4521 free(ctx); 4522 } 4523 4524 static void 4525 
_spdk_bdev_histogram_disable_channel(struct spdk_io_channel_iter *i) 4526 { 4527 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4528 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4529 4530 if (ch->histogram != NULL) { 4531 spdk_histogram_data_free(ch->histogram); 4532 ch->histogram = NULL; 4533 } 4534 spdk_for_each_channel_continue(i, 0); 4535 } 4536 4537 static void 4538 _spdk_bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status) 4539 { 4540 struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4541 4542 if (status != 0) { 4543 ctx->status = status; 4544 ctx->bdev->internal.histogram_enabled = false; 4545 spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), _spdk_bdev_histogram_disable_channel, ctx, 4546 _spdk_bdev_histogram_disable_channel_cb); 4547 } else { 4548 pthread_mutex_lock(&ctx->bdev->internal.mutex); 4549 ctx->bdev->internal.histogram_in_progress = false; 4550 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 4551 ctx->cb_fn(ctx->cb_arg, ctx->status); 4552 free(ctx); 4553 } 4554 } 4555 4556 static void 4557 _spdk_bdev_histogram_enable_channel(struct spdk_io_channel_iter *i) 4558 { 4559 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4560 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4561 int status = 0; 4562 4563 if (ch->histogram == NULL) { 4564 ch->histogram = spdk_histogram_data_alloc(); 4565 if (ch->histogram == NULL) { 4566 status = -ENOMEM; 4567 } 4568 } 4569 4570 spdk_for_each_channel_continue(i, status); 4571 } 4572 4573 void 4574 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn, 4575 void *cb_arg, bool enable) 4576 { 4577 struct spdk_bdev_histogram_ctx *ctx; 4578 4579 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx)); 4580 if (ctx == NULL) { 4581 cb_fn(cb_arg, -ENOMEM); 4582 return; 4583 } 4584 4585 ctx->bdev = bdev; 4586 ctx->status = 0; 4587 ctx->cb_fn = cb_fn; 4588 ctx->cb_arg = cb_arg; 4589 4590 pthread_mutex_lock(&bdev->internal.mutex); 4591 if (bdev->internal.histogram_in_progress) { 4592 pthread_mutex_unlock(&bdev->internal.mutex); 4593 free(ctx); 4594 cb_fn(cb_arg, -EAGAIN); 4595 return; 4596 } 4597 4598 bdev->internal.histogram_in_progress = true; 4599 pthread_mutex_unlock(&bdev->internal.mutex); 4600 4601 bdev->internal.histogram_enabled = enable; 4602 4603 if (enable) { 4604 /* Allocate histogram for each channel */ 4605 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_enable_channel, ctx, 4606 _spdk_bdev_histogram_enable_channel_cb); 4607 } else { 4608 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_disable_channel, ctx, 4609 _spdk_bdev_histogram_disable_channel_cb); 4610 } 4611 } 4612 4613 struct spdk_bdev_histogram_data_ctx { 4614 spdk_bdev_histogram_data_cb cb_fn; 4615 void *cb_arg; 4616 struct spdk_bdev *bdev; 4617 /** merged histogram data from all channels */ 4618 struct spdk_histogram_data *histogram; 4619 }; 4620 4621 static void 4622 _spdk_bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status) 4623 { 4624 struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 4625 4626 ctx->cb_fn(ctx->cb_arg, status, ctx->histogram); 4627 free(ctx); 4628 } 4629 4630 static void 4631 _spdk_bdev_histogram_get_channel(struct spdk_io_channel_iter *i) 4632 { 4633 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 4634 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 4635 struct spdk_bdev_histogram_data_ctx *ctx = 
spdk_io_channel_iter_get_ctx(i); 4636 int status = 0; 4637 4638 if (ch->histogram == NULL) { 4639 status = -EFAULT; 4640 } else { 4641 spdk_histogram_data_merge(ctx->histogram, ch->histogram); 4642 } 4643 4644 spdk_for_each_channel_continue(i, status); 4645 } 4646 4647 void 4648 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram, 4649 spdk_bdev_histogram_data_cb cb_fn, 4650 void *cb_arg) 4651 { 4652 struct spdk_bdev_histogram_data_ctx *ctx; 4653 4654 ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx)); 4655 if (ctx == NULL) { 4656 cb_fn(cb_arg, -ENOMEM, NULL); 4657 return; 4658 } 4659 4660 ctx->bdev = bdev; 4661 ctx->cb_fn = cb_fn; 4662 ctx->cb_arg = cb_arg; 4663 4664 ctx->histogram = histogram; 4665 4666 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_get_channel, ctx, 4667 _spdk_bdev_histogram_get_channel_cb); 4668 } 4669 4670 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 4671 4672 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV) 4673 { 4674 spdk_trace_register_owner(OWNER_BDEV, 'b'); 4675 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 4676 spdk_trace_register_description("BDEV_IO_START", TRACE_BDEV_IO_START, OWNER_BDEV, 4677 OBJECT_BDEV_IO, 1, 0, "type: "); 4678 spdk_trace_register_description("BDEV_IO_DONE", TRACE_BDEV_IO_DONE, OWNER_BDEV, 4679 OBJECT_BDEV_IO, 0, 0, ""); 4680 } 4681
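/*
 * Usage sketch (editor's illustration, not part of this file): how a backend
 * reports protocol-level completion status and how a frontend reads it back,
 * using spdk_bdev_io_complete_nvme_status() and spdk_bdev_io_get_scsi_status()
 * defined above. The NVMe status constants are assumed to come from
 * spdk/nvme_spec.h; my_backend_read_failed(), my_scsi_complete() and
 * struct my_scsi_task are purely illustrative.
 *
 *   // Backend completion path: hand back a real NVMe status instead of a
 *   // bare failure so that protocol frontends can translate it faithfully.
 *   static void
 *   my_backend_read_failed(struct spdk_bdev_io *bdev_io)
 *   {
 *           spdk_bdev_io_complete_nvme_status(bdev_io, SPDK_NVME_SCT_MEDIA_ERROR,
 *                                             SPDK_NVME_SC_UNRECOVERED_READ_ERROR);
 *   }
 *
 *   // SCSI frontend side: recover sense data for the completed I/O. NVMe
 *   // errors are translated through spdk_scsi_nvme_translate() internally.
 *   static void
 *   my_scsi_complete(struct spdk_bdev_io *bdev_io, struct my_scsi_task *task)
 *   {
 *           int sc, sk, asc, ascq;
 *
 *           spdk_bdev_io_get_scsi_status(bdev_io, &sc, &sk, &asc, &ascq);
 *           task->status = sc;
 *           task->sense_key = sk;
 *   }
 */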
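/*
 * Configuration sketch (editor's illustration): the legacy [QoS] section that
 * _spdk_bdev_qos_config() above parses. Each entry is
 * "<limit type> <bdev name> <value>"; IOPS values must be a multiple of
 * SPDK_BDEV_QOS_MIN_IOS_PER_SEC, and the *_BPS values are given in MB/s and
 * must translate to a multiple of SPDK_BDEV_QOS_MIN_BYTES_PER_SEC. The bdev
 * names below are examples only.
 *
 *   [QoS]
 *     Limit_IOPS      Malloc0  20000
 *     Limit_BPS       Nvme0n1  100
 *     Limit_Read_BPS  Nvme0n1  50
 *     Limit_Write_BPS Nvme0n1  50
 */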
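/*
 * Usage sketch (editor's illustration): how a backend module typically fills
 * out a struct spdk_bdev and hands it to spdk_bdev_register(), which runs the
 * spdk_bdev_init()/spdk_bdev_start() path above. All my_* identifiers and the
 * callback bodies are assumptions for the sketch; the struct spdk_bdev and
 * spdk_bdev_fn_table fields are assumed to match include/spdk/bdev_module.h.
 *
 *   static int
 *   my_bdev_destruct(void *ctx)
 *   {
 *           // Free backend resources; return 0 when destruction is complete.
 *           return 0;
 *   }
 *
 *   static const struct spdk_bdev_fn_table my_bdev_fn_table = {
 *           .destruct          = my_bdev_destruct,
 *           .submit_request    = my_bdev_submit_request,
 *           .io_type_supported = my_bdev_io_type_supported,
 *           .get_io_channel    = my_bdev_get_io_channel,
 *   };
 *
 *   static int
 *   my_bdev_create(struct my_bdev *mdisk, uint64_t num_blocks)
 *   {
 *           mdisk->bdev.name = strdup("MyDisk0");
 *           mdisk->bdev.product_name = "Example Disk";
 *           mdisk->bdev.blocklen = 512;
 *           mdisk->bdev.blockcnt = num_blocks;
 *           mdisk->bdev.ctxt = mdisk;
 *           mdisk->bdev.fn_table = &my_bdev_fn_table;
 *           mdisk->bdev.module = &my_bdev_module;
 *
 *           // Fails with -EEXIST if the name is already taken
 *           // (see spdk_bdev_init() above).
 *           return spdk_bdev_register(&mdisk->bdev);
 *   }
 */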
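/*
 * Usage sketch (editor's illustration): opening and closing a descriptor with
 * a hot-remove callback, matching spdk_bdev_open()/spdk_bdev_close() above.
 * struct my_context, my_open() and the bdev name "Malloc0" are assumptions
 * for the sketch.
 *
 *   struct my_context {
 *           struct spdk_bdev_desc *desc;
 *           struct spdk_io_channel *ch;
 *   };
 *
 *   static void
 *   my_hotremove_cb(void *remove_ctx)
 *   {
 *           struct my_context *ctx = remove_ctx;
 *
 *           // Called on the opening thread when the bdev is unregistered.
 *           // Releasing the channel and descriptor lets the unregistration
 *           // finish.
 *           if (ctx->ch) {
 *                   spdk_put_io_channel(ctx->ch);
 *                   ctx->ch = NULL;
 *           }
 *           spdk_bdev_close(ctx->desc);
 *           ctx->desc = NULL;
 *   }
 *
 *   static int
 *   my_open(struct my_context *ctx)
 *   {
 *           struct spdk_bdev *bdev;
 *           int rc;
 *
 *           bdev = spdk_bdev_get_by_name("Malloc0");
 *           if (bdev == NULL) {
 *                   return -ENODEV;
 *           }
 *
 *           rc = spdk_bdev_open(bdev, true, my_hotremove_cb, ctx, &ctx->desc);
 *           if (rc != 0) {
 *                   return rc;
 *           }
 *
 *           // Per-thread channel for submitting I/O through this descriptor.
 *           ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
 *           return ctx->ch ? 0 : -ENOMEM;
 *   }
 */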
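/*
 * Usage sketch (editor's illustration): the -ENOMEM retry pattern used by
 * _spdk_bdev_write_zero_buffer_next() above, expressed with the public
 * spdk_bdev_queue_io_wait() API. struct my_write and the my_* callbacks are
 * assumptions; the wait-entry fields are assumed to match
 * struct spdk_bdev_io_wait_entry in include/spdk/bdev.h.
 *
 *   struct my_write {
 *           struct spdk_bdev_desc *desc;
 *           struct spdk_io_channel *ch;
 *           void *buf;
 *           uint64_t offset_blocks;
 *           uint64_t num_blocks;
 *           struct spdk_bdev_io_wait_entry wait;
 *   };
 *
 *   static void my_submit(struct my_write *w);
 *   static void my_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
 *
 *   static void
 *   my_resubmit(void *arg)
 *   {
 *           // A bdev_io became available again; retry the submission.
 *           my_submit(arg);
 *   }
 *
 *   static void
 *   my_submit(struct my_write *w)
 *   {
 *           int rc;
 *
 *           rc = spdk_bdev_write_blocks(w->desc, w->ch, w->buf,
 *                                       w->offset_blocks, w->num_blocks,
 *                                       my_write_done, w);
 *           if (rc == -ENOMEM) {
 *                   // bdev_io pool exhausted: park a wait entry and retry
 *                   // once an I/O completes on this channel.
 *                   w->wait.bdev = spdk_bdev_desc_get_bdev(w->desc);
 *                   w->wait.cb_fn = my_resubmit;
 *                   w->wait.cb_arg = w;
 *                   spdk_bdev_queue_io_wait(w->wait.bdev, w->ch, &w->wait);
 *           } else if (rc != 0) {
 *                   my_write_done(NULL, false, w);
 *           }
 *   }
 */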
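/*
 * Usage sketch (editor's illustration): driving spdk_bdev_set_qos_rate_limits()
 * above from application code. The SPDK_BDEV_QOS_*_RATE_LIMIT enum names are
 * assumed to be the ones declared in include/spdk/bdev.h; the sentinel for
 * "leave this limit unchanged" is UINT64_MAX (SPDK_BDEV_QOS_LIMIT_NOT_DEFINED
 * in this file), and a value of 0 disables that individual limit.
 *
 *   static void
 *   my_qos_done(void *cb_arg, int status)
 *   {
 *           if (status != 0) {
 *                   SPDK_ERRLOG("Failed to update QoS limits: %d\n", status);
 *           }
 *   }
 *
 *   static void
 *   my_set_qos(struct spdk_bdev *bdev)
 *   {
 *           uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
 *           int i;
 *
 *           for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
 *                   limits[i] = UINT64_MAX;                    // leave untouched
 *           }
 *
 *           limits[SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT] = 20000;  // I/O per second
 *           limits[SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT] = 100;     // MB/s, converted
 *                                                              // to bytes above
 *
 *           spdk_bdev_set_qos_rate_limits(bdev, limits, my_qos_done, NULL);
 *   }
 */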
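/*
 * Usage sketch (editor's illustration): enabling the per-channel latency
 * histogram and collecting the merged data with spdk_bdev_histogram_enable()
 * and spdk_bdev_histogram_get() above. Only the my_* names are assumptions;
 * the callback signatures follow the calls made by
 * _spdk_bdev_histogram_enable_channel_cb() and
 * _spdk_bdev_histogram_get_channel_cb().
 *
 *   static void
 *   my_histogram_data_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
 *   {
 *           if (status == 0) {
 *                   // Walk the merged buckets here (e.g. to compute tail
 *                   // latency percentiles), then release the data.
 *           }
 *           spdk_histogram_data_free(histogram);
 *   }
 *
 *   static void
 *   my_histogram_status_cb(void *cb_arg, int status)
 *   {
 *           struct spdk_bdev *bdev = cb_arg;
 *           struct spdk_histogram_data *histogram;
 *
 *           if (status != 0) {
 *                   return;
 *           }
 *
 *           // Sometime later, pull a merged snapshot of every channel.
 *           histogram = spdk_histogram_data_alloc();
 *           if (histogram != NULL) {
 *                   spdk_bdev_histogram_get(bdev, histogram,
 *                                           my_histogram_data_cb, NULL);
 *           }
 *   }
 *
 *   // Kick it off: allocate a histogram on each existing channel.
 *   // spdk_bdev_histogram_enable(bdev, my_histogram_status_cb, bdev, true);
 */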