1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/env.h" 40 #include "spdk/event.h" 41 #include "spdk/thread.h" 42 #include "spdk/likely.h" 43 #include "spdk/queue.h" 44 #include "spdk/nvme_spec.h" 45 #include "spdk/scsi_spec.h" 46 #include "spdk/util.h" 47 #include "spdk/trace.h" 48 49 #include "spdk/bdev_module.h" 50 #include "spdk_internal/log.h" 51 #include "spdk/string.h" 52 53 #ifdef SPDK_CONFIG_VTUNE 54 #include "ittnotify.h" 55 #include "ittnotify_types.h" 56 int __itt_init_ittlib(const char *, __itt_group_id); 57 #endif 58 59 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 60 #define SPDK_BDEV_IO_CACHE_SIZE 256 61 #define BUF_SMALL_POOL_SIZE 8192 62 #define BUF_LARGE_POOL_SIZE 1024 63 #define NOMEM_THRESHOLD_COUNT 8 64 #define ZERO_BUFFER_SIZE 0x100000 65 66 #define OWNER_BDEV 0x2 67 68 #define OBJECT_BDEV_IO 0x2 69 70 #define TRACE_GROUP_BDEV 0x3 71 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 72 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 73 74 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 75 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 76 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 77 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 78 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC 10 79 80 enum spdk_bdev_qos_type { 81 SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0, 82 SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT, 83 SPDK_BDEV_QOS_NUM_TYPES /* Keep last */ 84 }; 85 86 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"}; 87 88 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 89 90 struct spdk_bdev_mgr { 91 struct spdk_mempool *bdev_io_pool; 92 93 struct spdk_mempool *buf_small_pool; 94 struct spdk_mempool *buf_large_pool; 95 96 void *zero_buffer; 97 98 TAILQ_HEAD(, spdk_bdev_module) bdev_modules; 99 100 struct spdk_bdev_list bdevs; 101 102 bool init_complete; 103 bool 
module_init_complete; 104 105 #ifdef SPDK_CONFIG_VTUNE 106 __itt_domain *domain; 107 #endif 108 }; 109 110 static struct spdk_bdev_mgr g_bdev_mgr = { 111 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 112 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 113 .init_complete = false, 114 .module_init_complete = false, 115 }; 116 117 static struct spdk_bdev_opts g_bdev_opts = { 118 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 119 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 120 }; 121 122 static spdk_bdev_init_cb g_init_cb_fn = NULL; 123 static void *g_init_cb_arg = NULL; 124 125 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 126 static void *g_fini_cb_arg = NULL; 127 static struct spdk_thread *g_fini_thread = NULL; 128 129 struct spdk_bdev_qos { 130 /** Rate limit, in I/O per second */ 131 uint64_t iops_rate_limit; 132 133 /** Rate limit, in byte per second */ 134 uint64_t byte_rate_limit; 135 136 /** The channel that all I/O are funneled through */ 137 struct spdk_bdev_channel *ch; 138 139 /** The thread on which the poller is running. */ 140 struct spdk_thread *thread; 141 142 /** Queue of I/O waiting to be issued. */ 143 bdev_io_tailq_t queued; 144 145 /** Size of a timeslice in tsc ticks. */ 146 uint64_t timeslice_size; 147 148 /** Timestamp of start of last timeslice. */ 149 uint64_t last_timeslice; 150 151 /** Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and 152 * only valid for the master channel which manages the outstanding IOs. */ 153 uint64_t max_ios_per_timeslice; 154 155 /** Maximum allowed bytes to be issued in one timeslice (e.g., 1ms) and 156 * only valid for the master channel which manages the outstanding IOs. */ 157 uint64_t max_byte_per_timeslice; 158 159 /** Remaining IO allowed in current timeslice (e.g., 1ms) */ 160 uint64_t io_remaining_this_timeslice; 161 162 /** Remaining bytes allowed in current timeslice (e.g., 1ms). 163 * Allowed to run negative if an I/O is submitted when some bytes are remaining, 164 * but the I/O is bigger than that amount. The excess will be deducted from the 165 * next timeslice. 166 */ 167 int64_t byte_remaining_this_timeslice; 168 169 /** Poller that processes queued I/O commands each time slice. */ 170 struct spdk_poller *poller; 171 }; 172 173 struct spdk_bdev_mgmt_channel { 174 bdev_io_stailq_t need_buf_small; 175 bdev_io_stailq_t need_buf_large; 176 177 /* 178 * Each thread keeps a cache of bdev_io - this allows 179 * bdev threads which are *not* DPDK threads to still 180 * benefit from a per-thread bdev_io cache. Without 181 * this, non-DPDK threads fetching from the mempool 182 * incur a cmpxchg on get and put. 183 */ 184 bdev_io_stailq_t per_thread_cache; 185 uint32_t per_thread_cache_count; 186 uint32_t bdev_io_cache_size; 187 188 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 189 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 190 }; 191 192 /* 193 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 194 * will queue here their IO that awaits retry. It makes it possible to retry sending 195 * IO to one bdev after IO from other bdev completes. 196 */ 197 struct spdk_bdev_shared_resource { 198 /* The bdev management channel */ 199 struct spdk_bdev_mgmt_channel *mgmt_ch; 200 201 /* 202 * Count of I/O submitted to bdev module and waiting for completion. 203 * Incremented before submit_request() is called on an spdk_bdev_io. 
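 * Decremented again when that I/O completes, in spdk_bdev_io_complete().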
204 */ 205 uint64_t io_outstanding; 206 207 /* 208 * Queue of IO awaiting retry because of a previous NOMEM status returned 209 * on this channel. 210 */ 211 bdev_io_tailq_t nomem_io; 212 213 /* 214 * Threshold which io_outstanding must drop to before retrying nomem_io. 215 */ 216 uint64_t nomem_threshold; 217 218 /* I/O channel allocated by a bdev module */ 219 struct spdk_io_channel *shared_ch; 220 221 /* Refcount of bdev channels using this resource */ 222 uint32_t ref; 223 224 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 225 }; 226 227 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 228 #define BDEV_CH_QOS_ENABLED (1 << 1) 229 230 struct spdk_bdev_channel { 231 struct spdk_bdev *bdev; 232 233 /* The channel for the underlying device */ 234 struct spdk_io_channel *channel; 235 236 /* Per io_device per thread data */ 237 struct spdk_bdev_shared_resource *shared_resource; 238 239 struct spdk_bdev_io_stat stat; 240 241 /* 242 * Count of I/O submitted through this channel and waiting for completion. 243 * Incremented before submit_request() is called on an spdk_bdev_io. 244 */ 245 uint64_t io_outstanding; 246 247 bdev_io_tailq_t queued_resets; 248 249 uint32_t flags; 250 251 #ifdef SPDK_CONFIG_VTUNE 252 uint64_t start_tsc; 253 uint64_t interval_tsc; 254 __itt_string_handle *handle; 255 struct spdk_bdev_io_stat prev_stat; 256 #endif 257 258 }; 259 260 struct spdk_bdev_desc { 261 struct spdk_bdev *bdev; 262 spdk_bdev_remove_cb_t remove_cb; 263 void *remove_ctx; 264 bool remove_scheduled; 265 bool write; 266 TAILQ_ENTRY(spdk_bdev_desc) link; 267 }; 268 269 struct spdk_bdev_iostat_ctx { 270 struct spdk_bdev_io_stat *stat; 271 spdk_bdev_get_device_stat_cb cb; 272 void *cb_arg; 273 }; 274 275 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 276 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 277 278 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 279 void *cb_arg); 280 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 281 282 void 283 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 284 { 285 *opts = g_bdev_opts; 286 } 287 288 int 289 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 290 { 291 uint32_t min_pool_size; 292 293 /* 294 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 295 * initialization. A second mgmt_ch will be created on the same thread when the application starts 296 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
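 * For example, with 3 threads and a bdev_io_cache_size of 256, bdev_io_pool_size must be at
 * least 256 * (3 + 1) = 1024.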
297 */ 298 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 299 if (opts->bdev_io_pool_size < min_pool_size) { 300 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 301 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 302 spdk_thread_get_count()); 303 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 304 return -1; 305 } 306 307 g_bdev_opts = *opts; 308 return 0; 309 } 310 311 struct spdk_bdev * 312 spdk_bdev_first(void) 313 { 314 struct spdk_bdev *bdev; 315 316 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 317 if (bdev) { 318 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 319 } 320 321 return bdev; 322 } 323 324 struct spdk_bdev * 325 spdk_bdev_next(struct spdk_bdev *prev) 326 { 327 struct spdk_bdev *bdev; 328 329 bdev = TAILQ_NEXT(prev, internal.link); 330 if (bdev) { 331 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 332 } 333 334 return bdev; 335 } 336 337 static struct spdk_bdev * 338 _bdev_next_leaf(struct spdk_bdev *bdev) 339 { 340 while (bdev != NULL) { 341 if (bdev->internal.claim_module == NULL) { 342 return bdev; 343 } else { 344 bdev = TAILQ_NEXT(bdev, internal.link); 345 } 346 } 347 348 return bdev; 349 } 350 351 struct spdk_bdev * 352 spdk_bdev_first_leaf(void) 353 { 354 struct spdk_bdev *bdev; 355 356 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 357 358 if (bdev) { 359 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 360 } 361 362 return bdev; 363 } 364 365 struct spdk_bdev * 366 spdk_bdev_next_leaf(struct spdk_bdev *prev) 367 { 368 struct spdk_bdev *bdev; 369 370 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 371 372 if (bdev) { 373 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 374 } 375 376 return bdev; 377 } 378 379 struct spdk_bdev * 380 spdk_bdev_get_by_name(const char *bdev_name) 381 { 382 struct spdk_bdev_alias *tmp; 383 struct spdk_bdev *bdev = spdk_bdev_first(); 384 385 while (bdev != NULL) { 386 if (strcmp(bdev_name, bdev->name) == 0) { 387 return bdev; 388 } 389 390 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 391 if (strcmp(bdev_name, tmp->alias) == 0) { 392 return bdev; 393 } 394 } 395 396 bdev = spdk_bdev_next(bdev); 397 } 398 399 return NULL; 400 } 401 402 void 403 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 404 { 405 struct iovec *iovs; 406 407 iovs = bdev_io->u.bdev.iovs; 408 409 assert(iovs != NULL); 410 assert(bdev_io->u.bdev.iovcnt >= 1); 411 412 iovs[0].iov_base = buf; 413 iovs[0].iov_len = len; 414 } 415 416 static void 417 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 418 { 419 struct spdk_mempool *pool; 420 struct spdk_bdev_io *tmp; 421 void *buf, *aligned_buf; 422 bdev_io_stailq_t *stailq; 423 struct spdk_bdev_mgmt_channel *ch; 424 425 assert(bdev_io->u.bdev.iovcnt == 1); 426 427 buf = bdev_io->internal.buf; 428 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 429 430 bdev_io->internal.buf = NULL; 431 432 if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 433 pool = g_bdev_mgr.buf_small_pool; 434 stailq = &ch->need_buf_small; 435 } else { 436 pool = g_bdev_mgr.buf_large_pool; 437 stailq = &ch->need_buf_large; 438 } 439 440 if (STAILQ_EMPTY(stailq)) { 441 spdk_mempool_put(pool, buf); 442 } else { 443 tmp = STAILQ_FIRST(stailq); 444 445 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 446 spdk_bdev_io_set_buf(bdev_io, 
aligned_buf, tmp->internal.buf_len); 447 448 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 449 tmp->internal.buf = buf; 450 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 451 } 452 } 453 454 void 455 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 456 { 457 struct spdk_mempool *pool; 458 bdev_io_stailq_t *stailq; 459 void *buf, *aligned_buf; 460 struct spdk_bdev_mgmt_channel *mgmt_ch; 461 462 assert(cb != NULL); 463 assert(bdev_io->u.bdev.iovs != NULL); 464 465 if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) { 466 /* Buffer already present */ 467 cb(bdev_io->internal.ch->channel, bdev_io); 468 return; 469 } 470 471 assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE); 472 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 473 474 bdev_io->internal.buf_len = len; 475 bdev_io->internal.get_buf_cb = cb; 476 if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 477 pool = g_bdev_mgr.buf_small_pool; 478 stailq = &mgmt_ch->need_buf_small; 479 } else { 480 pool = g_bdev_mgr.buf_large_pool; 481 stailq = &mgmt_ch->need_buf_large; 482 } 483 484 buf = spdk_mempool_get(pool); 485 486 if (!buf) { 487 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 488 } else { 489 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 490 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 491 492 bdev_io->internal.buf = buf; 493 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 494 } 495 } 496 497 static int 498 spdk_bdev_module_get_max_ctx_size(void) 499 { 500 struct spdk_bdev_module *bdev_module; 501 int max_bdev_module_size = 0; 502 503 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 504 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 505 max_bdev_module_size = bdev_module->get_ctx_size(); 506 } 507 } 508 509 return max_bdev_module_size; 510 } 511 512 void 513 spdk_bdev_config_text(FILE *fp) 514 { 515 struct spdk_bdev_module *bdev_module; 516 517 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 518 if (bdev_module->config_text) { 519 bdev_module->config_text(fp); 520 } 521 } 522 } 523 524 void 525 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 526 { 527 struct spdk_bdev_module *bdev_module; 528 struct spdk_bdev *bdev; 529 530 assert(w != NULL); 531 532 spdk_json_write_array_begin(w); 533 534 spdk_json_write_object_begin(w); 535 spdk_json_write_named_string(w, "method", "set_bdev_options"); 536 spdk_json_write_name(w, "params"); 537 spdk_json_write_object_begin(w); 538 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 539 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 540 spdk_json_write_object_end(w); 541 spdk_json_write_object_end(w); 542 543 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 544 if (bdev_module->config_json) { 545 bdev_module->config_json(w); 546 } 547 } 548 549 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 550 spdk_bdev_config_json(bdev, w); 551 } 552 553 spdk_json_write_array_end(w); 554 } 555 556 static int 557 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 558 { 559 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 560 struct spdk_bdev_io *bdev_io; 561 uint32_t i; 562 563 STAILQ_INIT(&ch->need_buf_small); 564 STAILQ_INIT(&ch->need_buf_large); 565 566 STAILQ_INIT(&ch->per_thread_cache); 567 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 568 569 /* Pre-populate bdev_io cache to ensure this 
thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
663 */ 664 spdk_bdev_init_complete(0); 665 } 666 667 static void 668 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 669 { 670 assert(module->internal.action_in_progress > 0); 671 module->internal.action_in_progress--; 672 spdk_bdev_module_action_complete(); 673 } 674 675 void 676 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 677 { 678 spdk_bdev_module_action_done(module); 679 } 680 681 void 682 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 683 { 684 spdk_bdev_module_action_done(module); 685 } 686 687 static int 688 spdk_bdev_modules_init(void) 689 { 690 struct spdk_bdev_module *module; 691 int rc = 0; 692 693 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 694 rc = module->module_init(); 695 if (rc != 0) { 696 break; 697 } 698 } 699 700 g_bdev_mgr.module_init_complete = true; 701 return rc; 702 } 703 704 705 static void 706 spdk_bdev_init_failed_complete(void *cb_arg) 707 { 708 spdk_bdev_init_complete(-1); 709 } 710 711 static void 712 spdk_bdev_init_failed(void *cb_arg) 713 { 714 spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL); 715 } 716 717 void 718 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 719 { 720 struct spdk_conf_section *sp; 721 struct spdk_bdev_opts bdev_opts; 722 int32_t bdev_io_pool_size, bdev_io_cache_size; 723 int cache_size; 724 int rc = 0; 725 char mempool_name[32]; 726 727 assert(cb_fn != NULL); 728 729 sp = spdk_conf_find_section(NULL, "Bdev"); 730 if (sp != NULL) { 731 spdk_bdev_get_opts(&bdev_opts); 732 733 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 734 if (bdev_io_pool_size >= 0) { 735 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 736 } 737 738 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 739 if (bdev_io_cache_size >= 0) { 740 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 741 } 742 743 if (spdk_bdev_set_opts(&bdev_opts)) { 744 spdk_bdev_init_complete(-1); 745 return; 746 } 747 748 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 749 } 750 751 g_init_cb_fn = cb_fn; 752 g_init_cb_arg = cb_arg; 753 754 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 755 756 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 757 g_bdev_opts.bdev_io_pool_size, 758 sizeof(struct spdk_bdev_io) + 759 spdk_bdev_module_get_max_ctx_size(), 760 0, 761 SPDK_ENV_SOCKET_ID_ANY); 762 763 if (g_bdev_mgr.bdev_io_pool == NULL) { 764 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 765 spdk_bdev_init_complete(-1); 766 return; 767 } 768 769 /** 770 * Ensure no more than half of the total buffers end up local caches, by 771 * using spdk_thread_get_count() to determine how many local caches we need 772 * to account for. 
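 * For example, with BUF_SMALL_POOL_SIZE of 8192 and 4 threads, each per-thread cache gets
 * 8192 / (2 * 4) = 1024 buffers, so the caches together hold at most half of the pool.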
773 */ 774 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 775 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 776 777 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 778 BUF_SMALL_POOL_SIZE, 779 SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512, 780 cache_size, 781 SPDK_ENV_SOCKET_ID_ANY); 782 if (!g_bdev_mgr.buf_small_pool) { 783 SPDK_ERRLOG("create rbuf small pool failed\n"); 784 spdk_bdev_init_complete(-1); 785 return; 786 } 787 788 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 789 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 790 791 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 792 BUF_LARGE_POOL_SIZE, 793 SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512, 794 cache_size, 795 SPDK_ENV_SOCKET_ID_ANY); 796 if (!g_bdev_mgr.buf_large_pool) { 797 SPDK_ERRLOG("create rbuf large pool failed\n"); 798 spdk_bdev_init_complete(-1); 799 return; 800 } 801 802 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 803 NULL); 804 if (!g_bdev_mgr.zero_buffer) { 805 SPDK_ERRLOG("create bdev zero buffer failed\n"); 806 spdk_bdev_init_complete(-1); 807 return; 808 } 809 810 #ifdef SPDK_CONFIG_VTUNE 811 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 812 #endif 813 814 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 815 spdk_bdev_mgmt_channel_destroy, 816 sizeof(struct spdk_bdev_mgmt_channel)); 817 818 rc = spdk_bdev_modules_init(); 819 if (rc != 0) { 820 SPDK_ERRLOG("bdev modules init failed\n"); 821 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 822 return; 823 } 824 825 spdk_bdev_module_action_complete(); 826 } 827 828 static void 829 spdk_bdev_mgr_unregister_cb(void *io_device) 830 { 831 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 832 833 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 834 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 835 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 836 g_bdev_opts.bdev_io_pool_size); 837 } 838 839 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 840 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 841 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 842 BUF_SMALL_POOL_SIZE); 843 assert(false); 844 } 845 846 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 847 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 848 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 849 BUF_LARGE_POOL_SIZE); 850 assert(false); 851 } 852 853 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 854 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 855 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 856 spdk_dma_free(g_bdev_mgr.zero_buffer); 857 858 cb_fn(g_fini_cb_arg); 859 g_fini_cb_fn = NULL; 860 g_fini_cb_arg = NULL; 861 } 862 863 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 864 865 static void 866 spdk_bdev_module_finish_iter(void *arg) 867 { 868 struct spdk_bdev_module *bdev_module; 869 870 /* Start iterating from the last touched module */ 871 if (!g_resume_bdev_module) { 872 bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules); 873 } else { 874 bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq); 875 } 876 877 while (bdev_module) { 878 if (bdev_module->async_fini) { 879 /* Save our place so we can resume later. 
We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred, as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last bdev in the list. The last bdev in the list should be a bdev
	 * that has no bdevs that depend on it.
	 */
	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
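		 * In that case the public I/O call returns -ENOMEM and the caller is expected to retry
		 * via spdk_bdev_queue_io_wait(); waiters are serviced in FIFO order from spdk_bdev_free_io().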
984 */ 985 bdev_io = NULL; 986 } else { 987 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 988 } 989 990 return bdev_io; 991 } 992 993 void 994 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 995 { 996 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 997 998 assert(bdev_io != NULL); 999 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1000 1001 if (bdev_io->internal.buf != NULL) { 1002 spdk_bdev_io_put_buf(bdev_io); 1003 } 1004 1005 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1006 ch->per_thread_cache_count++; 1007 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1008 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1009 struct spdk_bdev_io_wait_entry *entry; 1010 1011 entry = TAILQ_FIRST(&ch->io_wait_queue); 1012 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1013 entry->cb_fn(entry->cb_arg); 1014 } 1015 } else { 1016 /* We should never have a full cache with entries on the io wait queue. */ 1017 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1018 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1019 } 1020 } 1021 1022 static uint64_t 1023 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1024 { 1025 struct spdk_bdev *bdev = bdev_io->bdev; 1026 1027 switch (bdev_io->type) { 1028 case SPDK_BDEV_IO_TYPE_NVME_ADMIN: 1029 case SPDK_BDEV_IO_TYPE_NVME_IO: 1030 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1031 return bdev_io->u.nvme_passthru.nbytes; 1032 case SPDK_BDEV_IO_TYPE_READ: 1033 case SPDK_BDEV_IO_TYPE_WRITE: 1034 case SPDK_BDEV_IO_TYPE_UNMAP: 1035 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1036 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1037 default: 1038 return 0; 1039 } 1040 } 1041 1042 static void 1043 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch) 1044 { 1045 struct spdk_bdev_io *bdev_io = NULL; 1046 struct spdk_bdev *bdev = ch->bdev; 1047 struct spdk_bdev_qos *qos = bdev->internal.qos; 1048 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1049 1050 while (!TAILQ_EMPTY(&qos->queued)) { 1051 if (qos->max_ios_per_timeslice > 0 && qos->io_remaining_this_timeslice == 0) { 1052 break; 1053 } 1054 1055 if (qos->max_byte_per_timeslice > 0 && qos->byte_remaining_this_timeslice <= 0) { 1056 break; 1057 } 1058 1059 bdev_io = TAILQ_FIRST(&qos->queued); 1060 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1061 qos->io_remaining_this_timeslice--; 1062 qos->byte_remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(bdev_io); 1063 ch->io_outstanding++; 1064 shared_resource->io_outstanding++; 1065 bdev->fn_table->submit_request(ch->channel, bdev_io); 1066 } 1067 } 1068 1069 static bool 1070 _spdk_bdev_io_type_can_split(uint8_t type) 1071 { 1072 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1073 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1074 1075 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1076 * UNMAP could be split, but these types of I/O are typically much larger 1077 * in size (sometimes the size of the entire block device), and the bdev 1078 * module can more efficiently split these types of I/O. Plus those types 1079 * of I/O do not have a payload, which makes the splitting process simpler. 
1080 */ 1081 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1082 return true; 1083 } else { 1084 return false; 1085 } 1086 } 1087 1088 static bool 1089 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1090 { 1091 uint64_t start_stripe, end_stripe; 1092 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1093 1094 if (io_boundary == 0) { 1095 return false; 1096 } 1097 1098 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1099 return false; 1100 } 1101 1102 start_stripe = bdev_io->u.bdev.offset_blocks; 1103 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1104 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1105 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1106 start_stripe >>= spdk_u32log2(io_boundary); 1107 end_stripe >>= spdk_u32log2(io_boundary); 1108 } else { 1109 start_stripe /= io_boundary; 1110 end_stripe /= io_boundary; 1111 } 1112 return (start_stripe != end_stripe); 1113 } 1114 1115 static uint32_t 1116 _to_next_boundary(uint64_t offset, uint32_t boundary) 1117 { 1118 return (boundary - (offset % boundary)); 1119 } 1120 1121 static void 1122 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1123 1124 static void 1125 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1126 { 1127 struct spdk_bdev_io *bdev_io = _bdev_io; 1128 uint64_t current_offset, remaining, bytes_handled; 1129 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1130 struct iovec *parent_iov; 1131 uint64_t parent_iov_offset, child_iov_len; 1132 uint32_t child_iovcnt; 1133 int rc; 1134 1135 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1136 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1137 blocklen = bdev_io->bdev->blocklen; 1138 bytes_handled = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1139 parent_iov = &bdev_io->u.bdev.iovs[0]; 1140 parent_iov_offset = 0; 1141 1142 while (bytes_handled > 0) { 1143 if (bytes_handled >= parent_iov->iov_len) { 1144 bytes_handled -= parent_iov->iov_len; 1145 parent_iov++; 1146 continue; 1147 } 1148 parent_iov_offset += bytes_handled; 1149 break; 1150 } 1151 1152 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1153 to_next_boundary = spdk_min(remaining, to_next_boundary); 1154 to_next_boundary_bytes = to_next_boundary * blocklen; 1155 child_iovcnt = 0; 1156 while (to_next_boundary_bytes > 0) { 1157 child_iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1158 to_next_boundary_bytes -= child_iov_len; 1159 1160 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1161 bdev_io->child_iov[child_iovcnt].iov_len = child_iov_len; 1162 1163 parent_iov++; 1164 parent_iov_offset = 0; 1165 child_iovcnt++; 1166 if (child_iovcnt == BDEV_IO_NUM_CHILD_IOV && to_next_boundary_bytes > 0) { 1167 /* We've run out of child iovs - we need to fail this I/O. 
*/ 1168 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1169 bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED, 1170 bdev_io->internal.caller_ctx); 1171 return; 1172 } 1173 } 1174 1175 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1176 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1177 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1178 bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary, 1179 _spdk_bdev_io_split_done, bdev_io); 1180 } else { 1181 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1182 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1183 bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary, 1184 _spdk_bdev_io_split_done, bdev_io); 1185 } 1186 1187 if (rc == 0) { 1188 bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary; 1189 bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary; 1190 } else { 1191 assert(rc == -ENOMEM); 1192 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1193 bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_io_split_with_payload; 1194 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1195 spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1196 &bdev_io->internal.waitq_entry); 1197 } 1198 } 1199 1200 static void 1201 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1202 { 1203 struct spdk_bdev_io *parent_io = cb_arg; 1204 1205 spdk_bdev_free_io(bdev_io); 1206 1207 if (!success) { 1208 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1209 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx); 1210 return; 1211 } 1212 1213 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 1214 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1215 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx); 1216 return; 1217 } 1218 1219 /* 1220 * Continue with the splitting process. This function will complete the parent I/O if the 1221 * splitting is done. 
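 * For example, a 192-block read starting at block 64 on a bdev with an optimal_io_boundary of 128
 * is issued as two children: blocks 64-127 first, then blocks 128-255 once the first child completes.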
1222 */ 1223 _spdk_bdev_io_split_with_payload(parent_io); 1224 } 1225 1226 static void 1227 _spdk_bdev_io_split(struct spdk_bdev_io *bdev_io) 1228 { 1229 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1230 1231 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1232 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1233 1234 _spdk_bdev_io_split_with_payload(bdev_io); 1235 } 1236 1237 static void 1238 _spdk_bdev_io_submit(void *ctx) 1239 { 1240 struct spdk_bdev_io *bdev_io = ctx; 1241 struct spdk_bdev *bdev = bdev_io->bdev; 1242 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1243 struct spdk_io_channel *ch = bdev_ch->channel; 1244 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1245 uint64_t tsc; 1246 1247 tsc = spdk_get_ticks(); 1248 bdev_io->internal.submit_tsc = tsc; 1249 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1250 bdev_ch->io_outstanding++; 1251 shared_resource->io_outstanding++; 1252 bdev_io->internal.in_submit_request = true; 1253 if (spdk_likely(bdev_ch->flags == 0)) { 1254 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1255 bdev->fn_table->submit_request(ch, bdev_io); 1256 } else { 1257 bdev_ch->io_outstanding--; 1258 shared_resource->io_outstanding--; 1259 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1260 } 1261 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1262 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1263 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1264 bdev_ch->io_outstanding--; 1265 shared_resource->io_outstanding--; 1266 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1267 _spdk_bdev_qos_io_submit(bdev_ch); 1268 } else { 1269 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1270 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1271 } 1272 bdev_io->internal.in_submit_request = false; 1273 } 1274 1275 static void 1276 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1277 { 1278 struct spdk_bdev *bdev = bdev_io->bdev; 1279 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1280 1281 assert(thread != NULL); 1282 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1283 1284 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1285 _spdk_bdev_io_split(bdev_io); 1286 return; 1287 } 1288 1289 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1290 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1291 _spdk_bdev_io_submit(bdev_io); 1292 } else { 1293 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1294 bdev_io->internal.ch = bdev->internal.qos->ch; 1295 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1296 } 1297 } else { 1298 _spdk_bdev_io_submit(bdev_io); 1299 } 1300 } 1301 1302 static void 1303 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1304 { 1305 struct spdk_bdev *bdev = bdev_io->bdev; 1306 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1307 struct spdk_io_channel *ch = bdev_ch->channel; 1308 1309 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1310 1311 bdev_io->internal.in_submit_request = true; 1312 bdev->fn_table->submit_request(ch, bdev_io); 1313 bdev_io->internal.in_submit_request = false; 1314 } 1315 1316 static void 1317 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1318 struct spdk_bdev *bdev, void *cb_arg, 1319 
spdk_bdev_io_completion_cb cb) 1320 { 1321 bdev_io->bdev = bdev; 1322 bdev_io->internal.caller_ctx = cb_arg; 1323 bdev_io->internal.cb = cb; 1324 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1325 bdev_io->internal.in_submit_request = false; 1326 bdev_io->internal.buf = NULL; 1327 bdev_io->internal.io_submit_ch = NULL; 1328 } 1329 1330 static bool 1331 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1332 { 1333 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1334 } 1335 1336 bool 1337 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1338 { 1339 bool supported; 1340 1341 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1342 1343 if (!supported) { 1344 switch (io_type) { 1345 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1346 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1347 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1348 break; 1349 default: 1350 break; 1351 } 1352 } 1353 1354 return supported; 1355 } 1356 1357 int 1358 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1359 { 1360 if (bdev->fn_table->dump_info_json) { 1361 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1362 } 1363 1364 return 0; 1365 } 1366 1367 void 1368 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1369 { 1370 assert(bdev != NULL); 1371 assert(w != NULL); 1372 1373 if (bdev->fn_table->write_config_json) { 1374 bdev->fn_table->write_config_json(bdev, w); 1375 } else { 1376 spdk_json_write_object_begin(w); 1377 spdk_json_write_named_string(w, "name", bdev->name); 1378 spdk_json_write_object_end(w); 1379 } 1380 } 1381 1382 static void 1383 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1384 { 1385 uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0; 1386 1387 if (qos->iops_rate_limit > 0) { 1388 max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / 1389 SPDK_SEC_TO_USEC; 1390 qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice, 1391 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE); 1392 } 1393 1394 if (qos->byte_rate_limit > 0) { 1395 max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / 1396 SPDK_SEC_TO_USEC; 1397 qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice, 1398 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE); 1399 } 1400 } 1401 1402 static int 1403 spdk_bdev_channel_poll_qos(void *arg) 1404 { 1405 struct spdk_bdev_qos *qos = arg; 1406 uint64_t now = spdk_get_ticks(); 1407 1408 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1409 /* We received our callback earlier than expected - return 1410 * immediately and wait to do accounting until at least one 1411 * timeslice has actually expired. This should never happen 1412 * with a well-behaved timer implementation. 1413 */ 1414 return 0; 1415 } 1416 1417 /* Reset for next round of rate limiting */ 1418 qos->io_remaining_this_timeslice = 0; 1419 /* We may have allowed the bytes to slightly overrun in the last timeslice. 1420 * byte_remaining_this_timeslice is signed, so if it's negative here, we'll 1421 * account for the overrun so that the next timeslice will be appropriately 1422 * reduced. 
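 * For example, if a 128 KiB write was let through when only 4 KiB of budget remained, the value is
 * about -124 KiB at this point, so the next timeslice starts with max_byte_per_timeslice minus that deficit.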
1423 */ 1424 if (qos->byte_remaining_this_timeslice > 0) { 1425 qos->byte_remaining_this_timeslice = 0; 1426 } 1427 1428 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1429 qos->last_timeslice += qos->timeslice_size; 1430 qos->io_remaining_this_timeslice += qos->max_ios_per_timeslice; 1431 qos->byte_remaining_this_timeslice += qos->max_byte_per_timeslice; 1432 } 1433 1434 _spdk_bdev_qos_io_submit(qos->ch); 1435 1436 return -1; 1437 } 1438 1439 static void 1440 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1441 { 1442 struct spdk_bdev_shared_resource *shared_resource; 1443 1444 if (!ch) { 1445 return; 1446 } 1447 1448 if (ch->channel) { 1449 spdk_put_io_channel(ch->channel); 1450 } 1451 1452 assert(ch->io_outstanding == 0); 1453 1454 shared_resource = ch->shared_resource; 1455 if (shared_resource) { 1456 assert(ch->io_outstanding == 0); 1457 assert(shared_resource->ref > 0); 1458 shared_resource->ref--; 1459 if (shared_resource->ref == 0) { 1460 assert(shared_resource->io_outstanding == 0); 1461 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1462 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1463 free(shared_resource); 1464 } 1465 } 1466 } 1467 1468 /* Caller must hold bdev->internal.mutex. */ 1469 static void 1470 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1471 { 1472 struct spdk_bdev_qos *qos = bdev->internal.qos; 1473 1474 /* Rate limiting on this bdev enabled */ 1475 if (qos) { 1476 if (qos->ch == NULL) { 1477 struct spdk_io_channel *io_ch; 1478 1479 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1480 bdev->name, spdk_get_thread()); 1481 1482 /* No qos channel has been selected, so set one up */ 1483 1484 /* Take another reference to ch */ 1485 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1486 qos->ch = ch; 1487 1488 qos->thread = spdk_io_channel_get_thread(io_ch); 1489 1490 TAILQ_INIT(&qos->queued); 1491 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1492 qos->io_remaining_this_timeslice = qos->max_ios_per_timeslice; 1493 qos->byte_remaining_this_timeslice = qos->max_byte_per_timeslice; 1494 qos->timeslice_size = 1495 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1496 qos->last_timeslice = spdk_get_ticks(); 1497 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1498 qos, 1499 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1500 } 1501 1502 ch->flags |= BDEV_CH_QOS_ENABLED; 1503 } 1504 } 1505 1506 static int 1507 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1508 { 1509 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1510 struct spdk_bdev_channel *ch = ctx_buf; 1511 struct spdk_io_channel *mgmt_io_ch; 1512 struct spdk_bdev_mgmt_channel *mgmt_ch; 1513 struct spdk_bdev_shared_resource *shared_resource; 1514 1515 ch->bdev = bdev; 1516 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1517 if (!ch->channel) { 1518 return -1; 1519 } 1520 1521 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1522 if (!mgmt_io_ch) { 1523 return -1; 1524 } 1525 1526 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1527 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1528 if (shared_resource->shared_ch == ch->channel) { 1529 spdk_put_io_channel(mgmt_io_ch); 1530 shared_resource->ref++; 1531 break; 1532 } 1533 } 1534 1535 if (shared_resource == NULL) { 1536 shared_resource = calloc(1, sizeof(*shared_resource)); 1537 if (shared_resource == NULL) { 1538 
spdk_put_io_channel(mgmt_io_ch); 1539 return -1; 1540 } 1541 1542 shared_resource->mgmt_ch = mgmt_ch; 1543 shared_resource->io_outstanding = 0; 1544 TAILQ_INIT(&shared_resource->nomem_io); 1545 shared_resource->nomem_threshold = 0; 1546 shared_resource->shared_ch = ch->channel; 1547 shared_resource->ref = 1; 1548 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1549 } 1550 1551 memset(&ch->stat, 0, sizeof(ch->stat)); 1552 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1553 ch->io_outstanding = 0; 1554 TAILQ_INIT(&ch->queued_resets); 1555 ch->flags = 0; 1556 ch->shared_resource = shared_resource; 1557 1558 #ifdef SPDK_CONFIG_VTUNE 1559 { 1560 char *name; 1561 __itt_init_ittlib(NULL, 0); 1562 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1563 if (!name) { 1564 _spdk_bdev_channel_destroy_resource(ch); 1565 return -1; 1566 } 1567 ch->handle = __itt_string_handle_create(name); 1568 free(name); 1569 ch->start_tsc = spdk_get_ticks(); 1570 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1571 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1572 } 1573 #endif 1574 1575 pthread_mutex_lock(&bdev->internal.mutex); 1576 _spdk_bdev_enable_qos(bdev, ch); 1577 pthread_mutex_unlock(&bdev->internal.mutex); 1578 1579 return 0; 1580 } 1581 1582 /* 1583 * Abort I/O that are waiting on a data buffer. These types of I/O are 1584 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1585 */ 1586 static void 1587 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1588 { 1589 bdev_io_stailq_t tmp; 1590 struct spdk_bdev_io *bdev_io; 1591 1592 STAILQ_INIT(&tmp); 1593 1594 while (!STAILQ_EMPTY(queue)) { 1595 bdev_io = STAILQ_FIRST(queue); 1596 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1597 if (bdev_io->internal.ch == ch) { 1598 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1599 } else { 1600 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1601 } 1602 } 1603 1604 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1605 } 1606 1607 /* 1608 * Abort I/O that are queued waiting for submission. These types of I/O are 1609 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1610 */ 1611 static void 1612 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1613 { 1614 struct spdk_bdev_io *bdev_io, *tmp; 1615 1616 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1617 if (bdev_io->internal.ch == ch) { 1618 TAILQ_REMOVE(queue, bdev_io, internal.link); 1619 /* 1620 * spdk_bdev_io_complete() assumes that the completed I/O had 1621 * been submitted to the bdev module. Since in this case it 1622 * hadn't, bump io_outstanding to account for the decrement 1623 * that spdk_bdev_io_complete() will do. 
1624 */ 1625 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1626 ch->io_outstanding++; 1627 ch->shared_resource->io_outstanding++; 1628 } 1629 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1630 } 1631 } 1632 } 1633 1634 static void 1635 spdk_bdev_qos_channel_destroy(void *cb_arg) 1636 { 1637 struct spdk_bdev_qos *qos = cb_arg; 1638 1639 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1640 spdk_poller_unregister(&qos->poller); 1641 1642 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1643 1644 free(qos); 1645 } 1646 1647 static int 1648 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1649 { 1650 /* 1651 * Cleanly shutting down the QoS poller is tricky, because 1652 * during the asynchronous operation the user could open 1653 * a new descriptor and create a new channel, spawning 1654 * a new QoS poller. 1655 * 1656 * The strategy is to create a new QoS structure here and swap it 1657 * in. The shutdown path then continues to refer to the old one 1658 * until it completes and then releases it. 1659 */ 1660 struct spdk_bdev_qos *new_qos, *old_qos; 1661 1662 old_qos = bdev->internal.qos; 1663 1664 new_qos = calloc(1, sizeof(*new_qos)); 1665 if (!new_qos) { 1666 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1667 return -ENOMEM; 1668 } 1669 1670 /* Copy the old QoS data into the newly allocated structure */ 1671 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1672 1673 /* Zero out the key parts of the QoS structure */ 1674 new_qos->ch = NULL; 1675 new_qos->thread = NULL; 1676 new_qos->max_ios_per_timeslice = 0; 1677 new_qos->max_byte_per_timeslice = 0; 1678 new_qos->io_remaining_this_timeslice = 0; 1679 new_qos->byte_remaining_this_timeslice = 0; 1680 new_qos->poller = NULL; 1681 TAILQ_INIT(&new_qos->queued); 1682 1683 bdev->internal.qos = new_qos; 1684 1685 if (old_qos->thread == NULL) { 1686 free(old_qos); 1687 } else { 1688 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1689 old_qos); 1690 } 1691 1692 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 1693 * been destroyed yet. The destruction path will end up waiting for the final 1694 * channel to be put before it releases resources. */ 1695 1696 return 0; 1697 } 1698 1699 static void 1700 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 1701 { 1702 total->bytes_read += add->bytes_read; 1703 total->num_read_ops += add->num_read_ops; 1704 total->bytes_written += add->bytes_written; 1705 total->num_write_ops += add->num_write_ops; 1706 total->read_latency_ticks += add->read_latency_ticks; 1707 total->write_latency_ticks += add->write_latency_ticks; 1708 } 1709 1710 static void 1711 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 1712 { 1713 struct spdk_bdev_channel *ch = ctx_buf; 1714 struct spdk_bdev_mgmt_channel *mgmt_ch; 1715 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1716 1717 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 1718 spdk_get_thread()); 1719 1720 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
*/ 1721 pthread_mutex_lock(&ch->bdev->internal.mutex); 1722 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 1723 pthread_mutex_unlock(&ch->bdev->internal.mutex); 1724 1725 mgmt_ch = shared_resource->mgmt_ch; 1726 1727 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 1728 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 1729 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 1730 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 1731 1732 _spdk_bdev_channel_destroy_resource(ch); 1733 } 1734 1735 int 1736 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 1737 { 1738 struct spdk_bdev_alias *tmp; 1739 1740 if (alias == NULL) { 1741 SPDK_ERRLOG("Empty alias passed\n"); 1742 return -EINVAL; 1743 } 1744 1745 if (spdk_bdev_get_by_name(alias)) { 1746 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 1747 return -EEXIST; 1748 } 1749 1750 tmp = calloc(1, sizeof(*tmp)); 1751 if (tmp == NULL) { 1752 SPDK_ERRLOG("Unable to allocate alias\n"); 1753 return -ENOMEM; 1754 } 1755 1756 tmp->alias = strdup(alias); 1757 if (tmp->alias == NULL) { 1758 free(tmp); 1759 SPDK_ERRLOG("Unable to allocate alias\n"); 1760 return -ENOMEM; 1761 } 1762 1763 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 1764 1765 return 0; 1766 } 1767 1768 int 1769 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 1770 { 1771 struct spdk_bdev_alias *tmp; 1772 1773 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 1774 if (strcmp(alias, tmp->alias) == 0) { 1775 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 1776 free(tmp->alias); 1777 free(tmp); 1778 return 0; 1779 } 1780 } 1781 1782 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 1783 1784 return -ENOENT; 1785 } 1786 1787 void 1788 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 1789 { 1790 struct spdk_bdev_alias *p, *tmp; 1791 1792 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 1793 TAILQ_REMOVE(&bdev->aliases, p, tailq); 1794 free(p->alias); 1795 free(p); 1796 } 1797 } 1798 1799 struct spdk_io_channel * 1800 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 1801 { 1802 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 1803 } 1804 1805 const char * 1806 spdk_bdev_get_name(const struct spdk_bdev *bdev) 1807 { 1808 return bdev->name; 1809 } 1810 1811 const char * 1812 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 1813 { 1814 return bdev->product_name; 1815 } 1816 1817 const struct spdk_bdev_aliases_list * 1818 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 1819 { 1820 return &bdev->aliases; 1821 } 1822 1823 uint32_t 1824 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 1825 { 1826 return bdev->blocklen; 1827 } 1828 1829 uint64_t 1830 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 1831 { 1832 return bdev->blockcnt; 1833 } 1834 1835 uint64_t 1836 spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev) 1837 { 1838 uint64_t iops_rate_limit = 0; 1839 1840 pthread_mutex_lock(&bdev->internal.mutex); 1841 if (bdev->internal.qos) { 1842 iops_rate_limit = bdev->internal.qos->iops_rate_limit; 1843 } 1844 pthread_mutex_unlock(&bdev->internal.mutex); 1845 1846 return iops_rate_limit; 1847 } 1848 1849 size_t 1850 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 1851 { 1852 /* TODO: push this logic down to the bdev modules */ 1853 if (bdev->need_aligned_buffer) { 1854 return bdev->blocklen; 1855 } 1856 1857 return 1; 1858 } 1859 1860 uint32_t 1861 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 1862 { 1863 return bdev->optimal_io_boundary; 1864 } 1865 1866 bool 
1867 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 1868 { 1869 return bdev->write_cache; 1870 } 1871 1872 const struct spdk_uuid * 1873 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 1874 { 1875 return &bdev->uuid; 1876 } 1877 1878 uint64_t 1879 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 1880 { 1881 return bdev->internal.measured_queue_depth; 1882 } 1883 1884 uint64_t 1885 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 1886 { 1887 return bdev->internal.period; 1888 } 1889 1890 uint64_t 1891 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 1892 { 1893 return bdev->internal.weighted_io_time; 1894 } 1895 1896 uint64_t 1897 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 1898 { 1899 return bdev->internal.io_time; 1900 } 1901 1902 static void 1903 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 1904 { 1905 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 1906 1907 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 1908 1909 if (bdev->internal.measured_queue_depth) { 1910 bdev->internal.io_time += bdev->internal.period; 1911 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 1912 } 1913 } 1914 1915 static void 1916 _calculate_measured_qd(struct spdk_io_channel_iter *i) 1917 { 1918 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 1919 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 1920 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 1921 1922 bdev->internal.temporary_queue_depth += ch->io_outstanding; 1923 spdk_for_each_channel_continue(i, 0); 1924 } 1925 1926 static int 1927 spdk_bdev_calculate_measured_queue_depth(void *ctx) 1928 { 1929 struct spdk_bdev *bdev = ctx; 1930 bdev->internal.temporary_queue_depth = 0; 1931 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 1932 _calculate_measured_qd_cpl); 1933 return 0; 1934 } 1935 1936 void 1937 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 1938 { 1939 bdev->internal.period = period; 1940 1941 if (bdev->internal.qd_poller != NULL) { 1942 spdk_poller_unregister(&bdev->internal.qd_poller); 1943 bdev->internal.measured_queue_depth = UINT64_MAX; 1944 } 1945 1946 if (period != 0) { 1947 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 1948 period); 1949 } 1950 } 1951 1952 int 1953 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 1954 { 1955 int ret; 1956 1957 pthread_mutex_lock(&bdev->internal.mutex); 1958 1959 /* bdev has open descriptors */ 1960 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 1961 bdev->blockcnt > size) { 1962 ret = -EBUSY; 1963 } else { 1964 bdev->blockcnt = size; 1965 ret = 0; 1966 } 1967 1968 pthread_mutex_unlock(&bdev->internal.mutex); 1969 1970 return ret; 1971 } 1972 1973 /* 1974 * Convert I/O offset and length from bytes to blocks. 1975 * 1976 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
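 * For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 8192 yield
 * offset_blocks = 8 and num_blocks = 16, while offset_bytes = 4097 would make this function return non-zero.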
1977 */ 1978 static uint64_t 1979 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 1980 uint64_t num_bytes, uint64_t *num_blocks) 1981 { 1982 uint32_t block_size = bdev->blocklen; 1983 1984 *offset_blocks = offset_bytes / block_size; 1985 *num_blocks = num_bytes / block_size; 1986 1987 return (offset_bytes % block_size) | (num_bytes % block_size); 1988 } 1989 1990 static bool 1991 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 1992 { 1993 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 1994 * has been an overflow and hence the offset has been wrapped around */ 1995 if (offset_blocks + num_blocks < offset_blocks) { 1996 return false; 1997 } 1998 1999 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2000 if (offset_blocks + num_blocks > bdev->blockcnt) { 2001 return false; 2002 } 2003 2004 return true; 2005 } 2006 2007 int 2008 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2009 void *buf, uint64_t offset, uint64_t nbytes, 2010 spdk_bdev_io_completion_cb cb, void *cb_arg) 2011 { 2012 uint64_t offset_blocks, num_blocks; 2013 2014 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2015 return -EINVAL; 2016 } 2017 2018 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2019 } 2020 2021 int 2022 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2023 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2024 spdk_bdev_io_completion_cb cb, void *cb_arg) 2025 { 2026 struct spdk_bdev *bdev = desc->bdev; 2027 struct spdk_bdev_io *bdev_io; 2028 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2029 2030 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2031 return -EINVAL; 2032 } 2033 2034 bdev_io = spdk_bdev_get_io(channel); 2035 if (!bdev_io) { 2036 return -ENOMEM; 2037 } 2038 2039 bdev_io->internal.ch = channel; 2040 bdev_io->internal.desc = desc; 2041 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2042 bdev_io->u.bdev.iovs = &bdev_io->iov; 2043 bdev_io->u.bdev.iovs[0].iov_base = buf; 2044 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2045 bdev_io->u.bdev.iovcnt = 1; 2046 bdev_io->u.bdev.num_blocks = num_blocks; 2047 bdev_io->u.bdev.offset_blocks = offset_blocks; 2048 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2049 2050 spdk_bdev_io_submit(bdev_io); 2051 return 0; 2052 } 2053 2054 int 2055 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2056 struct iovec *iov, int iovcnt, 2057 uint64_t offset, uint64_t nbytes, 2058 spdk_bdev_io_completion_cb cb, void *cb_arg) 2059 { 2060 uint64_t offset_blocks, num_blocks; 2061 2062 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2063 return -EINVAL; 2064 } 2065 2066 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2067 } 2068 2069 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2070 struct iovec *iov, int iovcnt, 2071 uint64_t offset_blocks, uint64_t num_blocks, 2072 spdk_bdev_io_completion_cb cb, void *cb_arg) 2073 { 2074 struct spdk_bdev *bdev = desc->bdev; 2075 struct spdk_bdev_io *bdev_io; 2076 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2077 2078 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2079 return -EINVAL; 2080 } 2081 
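	/* Allocate an spdk_bdev_io from the per-thread cache, falling back to the global
	 * pool if the cache is empty.  Under heavy load this can fail, in which case
	 * -ENOMEM is returned and the caller may retry later, for example via
	 * spdk_bdev_queue_io_wait().
	 */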
2082 bdev_io = spdk_bdev_get_io(channel); 2083 if (!bdev_io) { 2084 return -ENOMEM; 2085 } 2086 2087 bdev_io->internal.ch = channel; 2088 bdev_io->internal.desc = desc; 2089 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2090 bdev_io->u.bdev.iovs = iov; 2091 bdev_io->u.bdev.iovcnt = iovcnt; 2092 bdev_io->u.bdev.num_blocks = num_blocks; 2093 bdev_io->u.bdev.offset_blocks = offset_blocks; 2094 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2095 2096 spdk_bdev_io_submit(bdev_io); 2097 return 0; 2098 } 2099 2100 int 2101 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2102 void *buf, uint64_t offset, uint64_t nbytes, 2103 spdk_bdev_io_completion_cb cb, void *cb_arg) 2104 { 2105 uint64_t offset_blocks, num_blocks; 2106 2107 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2108 return -EINVAL; 2109 } 2110 2111 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2112 } 2113 2114 int 2115 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2116 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2117 spdk_bdev_io_completion_cb cb, void *cb_arg) 2118 { 2119 struct spdk_bdev *bdev = desc->bdev; 2120 struct spdk_bdev_io *bdev_io; 2121 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2122 2123 if (!desc->write) { 2124 return -EBADF; 2125 } 2126 2127 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2128 return -EINVAL; 2129 } 2130 2131 bdev_io = spdk_bdev_get_io(channel); 2132 if (!bdev_io) { 2133 return -ENOMEM; 2134 } 2135 2136 bdev_io->internal.ch = channel; 2137 bdev_io->internal.desc = desc; 2138 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2139 bdev_io->u.bdev.iovs = &bdev_io->iov; 2140 bdev_io->u.bdev.iovs[0].iov_base = buf; 2141 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2142 bdev_io->u.bdev.iovcnt = 1; 2143 bdev_io->u.bdev.num_blocks = num_blocks; 2144 bdev_io->u.bdev.offset_blocks = offset_blocks; 2145 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2146 2147 spdk_bdev_io_submit(bdev_io); 2148 return 0; 2149 } 2150 2151 int 2152 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2153 struct iovec *iov, int iovcnt, 2154 uint64_t offset, uint64_t len, 2155 spdk_bdev_io_completion_cb cb, void *cb_arg) 2156 { 2157 uint64_t offset_blocks, num_blocks; 2158 2159 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2160 return -EINVAL; 2161 } 2162 2163 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2164 } 2165 2166 int 2167 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2168 struct iovec *iov, int iovcnt, 2169 uint64_t offset_blocks, uint64_t num_blocks, 2170 spdk_bdev_io_completion_cb cb, void *cb_arg) 2171 { 2172 struct spdk_bdev *bdev = desc->bdev; 2173 struct spdk_bdev_io *bdev_io; 2174 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2175 2176 if (!desc->write) { 2177 return -EBADF; 2178 } 2179 2180 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2181 return -EINVAL; 2182 } 2183 2184 bdev_io = spdk_bdev_get_io(channel); 2185 if (!bdev_io) { 2186 return -ENOMEM; 2187 } 2188 2189 bdev_io->internal.ch = channel; 2190 bdev_io->internal.desc = desc; 2191 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2192 bdev_io->u.bdev.iovs = iov; 2193 bdev_io->u.bdev.iovcnt = iovcnt; 2194 bdev_io->u.bdev.num_blocks = num_blocks; 2195 bdev_io->u.bdev.offset_blocks = 
offset_blocks; 2196 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2197 2198 spdk_bdev_io_submit(bdev_io); 2199 return 0; 2200 } 2201 2202 int 2203 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2204 uint64_t offset, uint64_t len, 2205 spdk_bdev_io_completion_cb cb, void *cb_arg) 2206 { 2207 uint64_t offset_blocks, num_blocks; 2208 2209 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2210 return -EINVAL; 2211 } 2212 2213 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2214 } 2215 2216 int 2217 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2218 uint64_t offset_blocks, uint64_t num_blocks, 2219 spdk_bdev_io_completion_cb cb, void *cb_arg) 2220 { 2221 struct spdk_bdev *bdev = desc->bdev; 2222 struct spdk_bdev_io *bdev_io; 2223 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2224 2225 if (!desc->write) { 2226 return -EBADF; 2227 } 2228 2229 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2230 return -EINVAL; 2231 } 2232 2233 bdev_io = spdk_bdev_get_io(channel); 2234 2235 if (!bdev_io) { 2236 return -ENOMEM; 2237 } 2238 2239 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2240 bdev_io->internal.ch = channel; 2241 bdev_io->internal.desc = desc; 2242 bdev_io->u.bdev.offset_blocks = offset_blocks; 2243 bdev_io->u.bdev.num_blocks = num_blocks; 2244 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2245 2246 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2247 spdk_bdev_io_submit(bdev_io); 2248 return 0; 2249 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2250 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2251 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2252 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2253 _spdk_bdev_write_zero_buffer_next(bdev_io); 2254 return 0; 2255 } else { 2256 spdk_bdev_free_io(bdev_io); 2257 return -ENOTSUP; 2258 } 2259 } 2260 2261 int 2262 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2263 uint64_t offset, uint64_t nbytes, 2264 spdk_bdev_io_completion_cb cb, void *cb_arg) 2265 { 2266 uint64_t offset_blocks, num_blocks; 2267 2268 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2269 return -EINVAL; 2270 } 2271 2272 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2273 } 2274 2275 int 2276 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2277 uint64_t offset_blocks, uint64_t num_blocks, 2278 spdk_bdev_io_completion_cb cb, void *cb_arg) 2279 { 2280 struct spdk_bdev *bdev = desc->bdev; 2281 struct spdk_bdev_io *bdev_io; 2282 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2283 2284 if (!desc->write) { 2285 return -EBADF; 2286 } 2287 2288 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2289 return -EINVAL; 2290 } 2291 2292 if (num_blocks == 0) { 2293 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2294 return -EINVAL; 2295 } 2296 2297 bdev_io = spdk_bdev_get_io(channel); 2298 if (!bdev_io) { 2299 return -ENOMEM; 2300 } 2301 2302 bdev_io->internal.ch = channel; 2303 bdev_io->internal.desc = desc; 2304 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2305 2306 bdev_io->u.bdev.iovs = &bdev_io->iov; 2307 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2308 bdev_io->u.bdev.iovs[0].iov_len = 0; 2309 bdev_io->u.bdev.iovcnt = 1; 2310 2311 
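	/* An unmap carries no data payload; the empty iovec filled in above only keeps the
	 * generic iovec fields initialized.  The block range set below is what the backing
	 * module actually acts on.
	 */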
bdev_io->u.bdev.offset_blocks = offset_blocks; 2312 bdev_io->u.bdev.num_blocks = num_blocks; 2313 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2314 2315 spdk_bdev_io_submit(bdev_io); 2316 return 0; 2317 } 2318 2319 int 2320 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2321 uint64_t offset, uint64_t length, 2322 spdk_bdev_io_completion_cb cb, void *cb_arg) 2323 { 2324 uint64_t offset_blocks, num_blocks; 2325 2326 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2327 return -EINVAL; 2328 } 2329 2330 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2331 } 2332 2333 int 2334 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2335 uint64_t offset_blocks, uint64_t num_blocks, 2336 spdk_bdev_io_completion_cb cb, void *cb_arg) 2337 { 2338 struct spdk_bdev *bdev = desc->bdev; 2339 struct spdk_bdev_io *bdev_io; 2340 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2341 2342 if (!desc->write) { 2343 return -EBADF; 2344 } 2345 2346 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2347 return -EINVAL; 2348 } 2349 2350 bdev_io = spdk_bdev_get_io(channel); 2351 if (!bdev_io) { 2352 return -ENOMEM; 2353 } 2354 2355 bdev_io->internal.ch = channel; 2356 bdev_io->internal.desc = desc; 2357 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2358 bdev_io->u.bdev.iovs = NULL; 2359 bdev_io->u.bdev.iovcnt = 0; 2360 bdev_io->u.bdev.offset_blocks = offset_blocks; 2361 bdev_io->u.bdev.num_blocks = num_blocks; 2362 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2363 2364 spdk_bdev_io_submit(bdev_io); 2365 return 0; 2366 } 2367 2368 static void 2369 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2370 { 2371 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2372 struct spdk_bdev_io *bdev_io; 2373 2374 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2375 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2376 spdk_bdev_io_submit_reset(bdev_io); 2377 } 2378 2379 static void 2380 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2381 { 2382 struct spdk_io_channel *ch; 2383 struct spdk_bdev_channel *channel; 2384 struct spdk_bdev_mgmt_channel *mgmt_channel; 2385 struct spdk_bdev_shared_resource *shared_resource; 2386 bdev_io_tailq_t tmp_queued; 2387 2388 TAILQ_INIT(&tmp_queued); 2389 2390 ch = spdk_io_channel_iter_get_channel(i); 2391 channel = spdk_io_channel_get_ctx(ch); 2392 shared_resource = channel->shared_resource; 2393 mgmt_channel = shared_resource->mgmt_ch; 2394 2395 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2396 2397 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2398 /* The QoS object is always valid and readable while 2399 * the channel flag is set, so the lock here should not 2400 * be necessary. We're not in the fast path though, so 2401 * just take it anyway. 
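 * Any I/O still sitting on the QoS queue for this channel is moved onto
 * tmp_queued below so that it is aborted together with the nomem and
 * buffer-wait queues.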
*/ 2402 pthread_mutex_lock(&channel->bdev->internal.mutex); 2403 if (channel->bdev->internal.qos->ch == channel) { 2404 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2405 } 2406 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2407 } 2408 2409 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2410 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2411 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2412 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2413 2414 spdk_for_each_channel_continue(i, 0); 2415 } 2416 2417 static void 2418 _spdk_bdev_start_reset(void *ctx) 2419 { 2420 struct spdk_bdev_channel *ch = ctx; 2421 2422 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2423 ch, _spdk_bdev_reset_dev); 2424 } 2425 2426 static void 2427 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2428 { 2429 struct spdk_bdev *bdev = ch->bdev; 2430 2431 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2432 2433 pthread_mutex_lock(&bdev->internal.mutex); 2434 if (bdev->internal.reset_in_progress == NULL) { 2435 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2436 /* 2437 * Take a channel reference for the target bdev for the life of this 2438 * reset. This guards against the channel getting destroyed while 2439 * spdk_for_each_channel() calls related to this reset IO are in 2440 * progress. We will release the reference when this reset is 2441 * completed. 2442 */ 2443 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2444 _spdk_bdev_start_reset(ch); 2445 } 2446 pthread_mutex_unlock(&bdev->internal.mutex); 2447 } 2448 2449 int 2450 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2451 spdk_bdev_io_completion_cb cb, void *cb_arg) 2452 { 2453 struct spdk_bdev *bdev = desc->bdev; 2454 struct spdk_bdev_io *bdev_io; 2455 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2456 2457 bdev_io = spdk_bdev_get_io(channel); 2458 if (!bdev_io) { 2459 return -ENOMEM; 2460 } 2461 2462 bdev_io->internal.ch = channel; 2463 bdev_io->internal.desc = desc; 2464 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2465 bdev_io->u.reset.ch_ref = NULL; 2466 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2467 2468 pthread_mutex_lock(&bdev->internal.mutex); 2469 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2470 pthread_mutex_unlock(&bdev->internal.mutex); 2471 2472 _spdk_bdev_channel_start_reset(channel); 2473 2474 return 0; 2475 } 2476 2477 void 2478 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2479 struct spdk_bdev_io_stat *stat) 2480 { 2481 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2482 2483 *stat = channel->stat; 2484 } 2485 2486 static void 2487 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2488 { 2489 void *io_device = spdk_io_channel_iter_get_io_device(i); 2490 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2491 2492 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2493 bdev_iostat_ctx->cb_arg, 0); 2494 free(bdev_iostat_ctx); 2495 } 2496 2497 static void 2498 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2499 { 2500 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2501 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2502 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2503 2504 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2505 spdk_for_each_channel_continue(i, 0); 2506 } 2507 2508 void 2509 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2510 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2511 { 2512 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2513 2514 assert(bdev != NULL); 2515 assert(stat != NULL); 2516 assert(cb != NULL); 2517 2518 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2519 if (bdev_iostat_ctx == NULL) { 2520 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2521 cb(bdev, stat, cb_arg, -ENOMEM); 2522 return; 2523 } 2524 2525 bdev_iostat_ctx->stat = stat; 2526 bdev_iostat_ctx->cb = cb; 2527 bdev_iostat_ctx->cb_arg = cb_arg; 2528 2529 /* Start with the statistics from previously deleted channels. */ 2530 pthread_mutex_lock(&bdev->internal.mutex); 2531 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2532 pthread_mutex_unlock(&bdev->internal.mutex); 2533 2534 /* Then iterate and add the statistics from each existing channel. */ 2535 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2536 _spdk_bdev_get_each_channel_stat, 2537 bdev_iostat_ctx, 2538 _spdk_bdev_get_device_stat_done); 2539 } 2540 2541 int 2542 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2543 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2544 spdk_bdev_io_completion_cb cb, void *cb_arg) 2545 { 2546 struct spdk_bdev *bdev = desc->bdev; 2547 struct spdk_bdev_io *bdev_io; 2548 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2549 2550 if (!desc->write) { 2551 return -EBADF; 2552 } 2553 2554 bdev_io = spdk_bdev_get_io(channel); 2555 if (!bdev_io) { 2556 return -ENOMEM; 2557 } 2558 2559 bdev_io->internal.ch = channel; 2560 bdev_io->internal.desc = desc; 2561 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2562 bdev_io->u.nvme_passthru.cmd = *cmd; 2563 bdev_io->u.nvme_passthru.buf = buf; 2564 bdev_io->u.nvme_passthru.nbytes = nbytes; 2565 bdev_io->u.nvme_passthru.md_buf = NULL; 2566 bdev_io->u.nvme_passthru.md_len = 0; 2567 2568 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2569 2570 spdk_bdev_io_submit(bdev_io); 2571 return 0; 2572 } 2573 2574 int 2575 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2576 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2577 spdk_bdev_io_completion_cb cb, void *cb_arg) 2578 { 2579 struct spdk_bdev *bdev = desc->bdev; 2580 struct spdk_bdev_io *bdev_io; 2581 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2582 2583 if (!desc->write) { 2584 /* 2585 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2586 * to easily determine if the command is a read or write, but for now just 2587 * do not allow io_passthru with a read-only descriptor. 
2588 */ 2589 return -EBADF; 2590 } 2591 2592 bdev_io = spdk_bdev_get_io(channel); 2593 if (!bdev_io) { 2594 return -ENOMEM; 2595 } 2596 2597 bdev_io->internal.ch = channel; 2598 bdev_io->internal.desc = desc; 2599 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2600 bdev_io->u.nvme_passthru.cmd = *cmd; 2601 bdev_io->u.nvme_passthru.buf = buf; 2602 bdev_io->u.nvme_passthru.nbytes = nbytes; 2603 bdev_io->u.nvme_passthru.md_buf = NULL; 2604 bdev_io->u.nvme_passthru.md_len = 0; 2605 2606 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2607 2608 spdk_bdev_io_submit(bdev_io); 2609 return 0; 2610 } 2611 2612 int 2613 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2614 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2615 spdk_bdev_io_completion_cb cb, void *cb_arg) 2616 { 2617 struct spdk_bdev *bdev = desc->bdev; 2618 struct spdk_bdev_io *bdev_io; 2619 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2620 2621 if (!desc->write) { 2622 /* 2623 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2624 * to easily determine if the command is a read or write, but for now just 2625 * do not allow io_passthru with a read-only descriptor. 2626 */ 2627 return -EBADF; 2628 } 2629 2630 bdev_io = spdk_bdev_get_io(channel); 2631 if (!bdev_io) { 2632 return -ENOMEM; 2633 } 2634 2635 bdev_io->internal.ch = channel; 2636 bdev_io->internal.desc = desc; 2637 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2638 bdev_io->u.nvme_passthru.cmd = *cmd; 2639 bdev_io->u.nvme_passthru.buf = buf; 2640 bdev_io->u.nvme_passthru.nbytes = nbytes; 2641 bdev_io->u.nvme_passthru.md_buf = md_buf; 2642 bdev_io->u.nvme_passthru.md_len = md_len; 2643 2644 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2645 2646 spdk_bdev_io_submit(bdev_io); 2647 return 0; 2648 } 2649 2650 int 2651 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2652 struct spdk_bdev_io_wait_entry *entry) 2653 { 2654 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2655 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2656 2657 if (bdev != entry->bdev) { 2658 SPDK_ERRLOG("bdevs do not match\n"); 2659 return -EINVAL; 2660 } 2661 2662 if (mgmt_ch->per_thread_cache_count > 0) { 2663 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2664 return -EINVAL; 2665 } 2666 2667 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2668 return 0; 2669 } 2670 2671 static void 2672 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2673 { 2674 struct spdk_bdev *bdev = bdev_ch->bdev; 2675 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2676 struct spdk_bdev_io *bdev_io; 2677 2678 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2679 /* 2680 * Allow some more I/O to complete before retrying the nomem_io queue. 2681 * Some drivers (such as nvme) cannot immediately take a new I/O in 2682 * the context of a completion, because the resources for the I/O are 2683 * not released until control returns to the bdev poller. Also, we 2684 * may require several small I/O to complete before a larger I/O 2685 * (that requires splitting) can be submitted. 
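 * Once the threshold is crossed, the loop below re-submits the queued I/O in
 * order until the module reports NOMEM again, at which point we stop and wait
 * for further completions.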
2686 */ 2687 return; 2688 } 2689 2690 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2691 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2692 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2693 bdev_io->internal.ch->io_outstanding++; 2694 shared_resource->io_outstanding++; 2695 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2696 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2697 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2698 break; 2699 } 2700 } 2701 } 2702 2703 static inline void 2704 _spdk_bdev_io_complete(void *ctx) 2705 { 2706 struct spdk_bdev_io *bdev_io = ctx; 2707 uint64_t tsc; 2708 2709 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2710 /* 2711 * Send the completion to the thread that originally submitted the I/O, 2712 * which may not be the current thread in the case of QoS. 2713 */ 2714 if (bdev_io->internal.io_submit_ch) { 2715 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2716 bdev_io->internal.io_submit_ch = NULL; 2717 } 2718 2719 /* 2720 * Defer completion to avoid potential infinite recursion if the 2721 * user's completion callback issues a new I/O. 2722 */ 2723 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2724 _spdk_bdev_io_complete, bdev_io); 2725 return; 2726 } 2727 2728 tsc = spdk_get_ticks(); 2729 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 2730 2731 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2732 switch (bdev_io->type) { 2733 case SPDK_BDEV_IO_TYPE_READ: 2734 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2735 bdev_io->internal.ch->stat.num_read_ops++; 2736 bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2737 break; 2738 case SPDK_BDEV_IO_TYPE_WRITE: 2739 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2740 bdev_io->internal.ch->stat.num_write_ops++; 2741 bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2742 break; 2743 default: 2744 break; 2745 } 2746 } 2747 2748 #ifdef SPDK_CONFIG_VTUNE 2749 uint64_t now_tsc = spdk_get_ticks(); 2750 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2751 uint64_t data[5]; 2752 2753 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2754 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2755 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2756 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2757 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
2758 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2759 2760 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2761 __itt_metadata_u64, 5, data); 2762 2763 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2764 bdev_io->internal.ch->start_tsc = now_tsc; 2765 } 2766 #endif 2767 2768 assert(bdev_io->internal.cb != NULL); 2769 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2770 2771 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2772 bdev_io->internal.caller_ctx); 2773 } 2774 2775 static void 2776 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2777 { 2778 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2779 2780 if (bdev_io->u.reset.ch_ref != NULL) { 2781 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2782 bdev_io->u.reset.ch_ref = NULL; 2783 } 2784 2785 _spdk_bdev_io_complete(bdev_io); 2786 } 2787 2788 static void 2789 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2790 { 2791 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2792 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2793 2794 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2795 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2796 _spdk_bdev_channel_start_reset(ch); 2797 } 2798 2799 spdk_for_each_channel_continue(i, 0); 2800 } 2801 2802 void 2803 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2804 { 2805 struct spdk_bdev *bdev = bdev_io->bdev; 2806 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2807 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2808 2809 bdev_io->internal.status = status; 2810 2811 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2812 bool unlock_channels = false; 2813 2814 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2815 SPDK_ERRLOG("NOMEM returned for reset\n"); 2816 } 2817 pthread_mutex_lock(&bdev->internal.mutex); 2818 if (bdev_io == bdev->internal.reset_in_progress) { 2819 bdev->internal.reset_in_progress = NULL; 2820 unlock_channels = true; 2821 } 2822 pthread_mutex_unlock(&bdev->internal.mutex); 2823 2824 if (unlock_channels) { 2825 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 2826 bdev_io, _spdk_bdev_reset_complete); 2827 return; 2828 } 2829 } else { 2830 assert(bdev_ch->io_outstanding > 0); 2831 assert(shared_resource->io_outstanding > 0); 2832 bdev_ch->io_outstanding--; 2833 shared_resource->io_outstanding--; 2834 2835 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 2836 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 2837 /* 2838 * Wait for some of the outstanding I/O to complete before we 2839 * retry any of the nomem_io. Normally we will wait for 2840 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 2841 * depth channels we will instead wait for half to complete. 
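 * For example, with 100 I/O outstanding the threshold becomes max(50, 92) = 92,
 * so retries resume after roughly NOMEM_THRESHOLD_COUNT completions; with only
 * 10 outstanding it becomes max(5, 2) = 5, i.e. after half of them complete.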
2842 */ 2843 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 2844 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 2845 return; 2846 } 2847 2848 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 2849 _spdk_bdev_ch_retry_io(bdev_ch); 2850 } 2851 } 2852 2853 _spdk_bdev_io_complete(bdev_io); 2854 } 2855 2856 void 2857 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 2858 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 2859 { 2860 if (sc == SPDK_SCSI_STATUS_GOOD) { 2861 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2862 } else { 2863 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 2864 bdev_io->internal.error.scsi.sc = sc; 2865 bdev_io->internal.error.scsi.sk = sk; 2866 bdev_io->internal.error.scsi.asc = asc; 2867 bdev_io->internal.error.scsi.ascq = ascq; 2868 } 2869 2870 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2871 } 2872 2873 void 2874 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 2875 int *sc, int *sk, int *asc, int *ascq) 2876 { 2877 assert(sc != NULL); 2878 assert(sk != NULL); 2879 assert(asc != NULL); 2880 assert(ascq != NULL); 2881 2882 switch (bdev_io->internal.status) { 2883 case SPDK_BDEV_IO_STATUS_SUCCESS: 2884 *sc = SPDK_SCSI_STATUS_GOOD; 2885 *sk = SPDK_SCSI_SENSE_NO_SENSE; 2886 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2887 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2888 break; 2889 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 2890 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 2891 break; 2892 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 2893 *sc = bdev_io->internal.error.scsi.sc; 2894 *sk = bdev_io->internal.error.scsi.sk; 2895 *asc = bdev_io->internal.error.scsi.asc; 2896 *ascq = bdev_io->internal.error.scsi.ascq; 2897 break; 2898 default: 2899 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 2900 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 2901 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2902 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2903 break; 2904 } 2905 } 2906 2907 void 2908 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 2909 { 2910 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 2911 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2912 } else { 2913 bdev_io->internal.error.nvme.sct = sct; 2914 bdev_io->internal.error.nvme.sc = sc; 2915 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 2916 } 2917 2918 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2919 } 2920 2921 void 2922 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 2923 { 2924 assert(sct != NULL); 2925 assert(sc != NULL); 2926 2927 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 2928 *sct = bdev_io->internal.error.nvme.sct; 2929 *sc = bdev_io->internal.error.nvme.sc; 2930 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2931 *sct = SPDK_NVME_SCT_GENERIC; 2932 *sc = SPDK_NVME_SC_SUCCESS; 2933 } else { 2934 *sct = SPDK_NVME_SCT_GENERIC; 2935 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 2936 } 2937 } 2938 2939 struct spdk_thread * 2940 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 2941 { 2942 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 2943 } 2944 2945 static void 2946 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set, 2947 enum spdk_bdev_qos_type qos_type) 2948 { 2949 uint64_t min_qos_set = 0; 2950 2951 switch (qos_type) { 2952 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 
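		/* IOPS limits are only accepted in multiples of SPDK_BDEV_QOS_MIN_IOS_PER_SEC. */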
2953 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 2954 break; 2955 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2956 min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC; 2957 break; 2958 default: 2959 SPDK_ERRLOG("Unsupported QoS type.\n"); 2960 return; 2961 } 2962 2963 if (qos_set % min_qos_set) { 2964 SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not multiple of %lu\n", 2965 qos_set, bdev->name, min_qos_set); 2966 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 2967 return; 2968 } 2969 2970 if (!bdev->internal.qos) { 2971 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 2972 if (!bdev->internal.qos) { 2973 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 2974 return; 2975 } 2976 } 2977 2978 switch (qos_type) { 2979 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 2980 bdev->internal.qos->iops_rate_limit = qos_set; 2981 break; 2982 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2983 bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024; 2984 break; 2985 default: 2986 break; 2987 } 2988 2989 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 2990 bdev->name, qos_type, qos_set); 2991 2992 return; 2993 } 2994 2995 static void 2996 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 2997 { 2998 struct spdk_conf_section *sp = NULL; 2999 const char *val = NULL; 3000 uint64_t qos_set = 0; 3001 int i = 0, j = 0; 3002 3003 sp = spdk_conf_find_section(NULL, "QoS"); 3004 if (!sp) { 3005 return; 3006 } 3007 3008 while (j < SPDK_BDEV_QOS_NUM_TYPES) { 3009 i = 0; 3010 while (true) { 3011 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0); 3012 if (!val) { 3013 break; 3014 } 3015 3016 if (strcmp(bdev->name, val) != 0) { 3017 i++; 3018 continue; 3019 } 3020 3021 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1); 3022 if (val) { 3023 qos_set = strtoull(val, NULL, 10); 3024 _spdk_bdev_qos_config_type(bdev, qos_set, j); 3025 } 3026 3027 break; 3028 } 3029 3030 j++; 3031 } 3032 3033 return; 3034 } 3035 3036 static int 3037 spdk_bdev_init(struct spdk_bdev *bdev) 3038 { 3039 assert(bdev->module != NULL); 3040 3041 if (!bdev->name) { 3042 SPDK_ERRLOG("Bdev name is NULL\n"); 3043 return -EINVAL; 3044 } 3045 3046 if (spdk_bdev_get_by_name(bdev->name)) { 3047 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3048 return -EEXIST; 3049 } 3050 3051 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3052 bdev->internal.measured_queue_depth = UINT64_MAX; 3053 3054 TAILQ_INIT(&bdev->internal.open_descs); 3055 3056 TAILQ_INIT(&bdev->aliases); 3057 3058 bdev->internal.reset_in_progress = NULL; 3059 3060 _spdk_bdev_qos_config(bdev); 3061 3062 spdk_io_device_register(__bdev_to_io_dev(bdev), 3063 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3064 sizeof(struct spdk_bdev_channel)); 3065 3066 pthread_mutex_init(&bdev->internal.mutex, NULL); 3067 return 0; 3068 } 3069 3070 static void 3071 spdk_bdev_destroy_cb(void *io_device) 3072 { 3073 int rc; 3074 struct spdk_bdev *bdev; 3075 spdk_bdev_unregister_cb cb_fn; 3076 void *cb_arg; 3077 3078 bdev = __bdev_from_io_dev(io_device); 3079 cb_fn = bdev->internal.unregister_cb; 3080 cb_arg = bdev->internal.unregister_ctx; 3081 3082 rc = bdev->fn_table->destruct(bdev->ctxt); 3083 if (rc < 0) { 3084 SPDK_ERRLOG("destruct failed\n"); 3085 } 3086 if (rc <= 0 && cb_fn != NULL) { 3087 cb_fn(cb_arg, rc); 3088 } 3089 } 3090 3091 3092 static void 3093 spdk_bdev_fini(struct spdk_bdev *bdev) 3094 { 3095 pthread_mutex_destroy(&bdev->internal.mutex); 3096 3097 free(bdev->internal.qos); 3098 3099 
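	/* Unregistering the io_device releases any remaining per-thread channels
	 * asynchronously; spdk_bdev_destroy_cb runs once they are all gone and then
	 * invokes the module's destruct callback.
	 */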
spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3100 } 3101 3102 static void 3103 spdk_bdev_start(struct spdk_bdev *bdev) 3104 { 3105 struct spdk_bdev_module *module; 3106 uint32_t action; 3107 3108 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3109 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3110 3111 /* Examine configuration before initializing I/O */ 3112 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3113 if (module->examine_config) { 3114 action = module->internal.action_in_progress; 3115 module->internal.action_in_progress++; 3116 module->examine_config(bdev); 3117 if (action != module->internal.action_in_progress) { 3118 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3119 module->name); 3120 } 3121 } 3122 } 3123 3124 if (bdev->internal.claim_module) { 3125 return; 3126 } 3127 3128 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3129 if (module->examine_disk) { 3130 module->internal.action_in_progress++; 3131 module->examine_disk(bdev); 3132 } 3133 } 3134 } 3135 3136 int 3137 spdk_bdev_register(struct spdk_bdev *bdev) 3138 { 3139 int rc = spdk_bdev_init(bdev); 3140 3141 if (rc == 0) { 3142 spdk_bdev_start(bdev); 3143 } 3144 3145 return rc; 3146 } 3147 3148 int 3149 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3150 { 3151 int rc; 3152 3153 rc = spdk_bdev_init(vbdev); 3154 if (rc) { 3155 return rc; 3156 } 3157 3158 spdk_bdev_start(vbdev); 3159 return 0; 3160 } 3161 3162 void 3163 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3164 { 3165 if (bdev->internal.unregister_cb != NULL) { 3166 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3167 } 3168 } 3169 3170 static void 3171 _remove_notify(void *arg) 3172 { 3173 struct spdk_bdev_desc *desc = arg; 3174 3175 desc->remove_cb(desc->remove_ctx); 3176 } 3177 3178 void 3179 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3180 { 3181 struct spdk_bdev_desc *desc, *tmp; 3182 bool do_destruct = true; 3183 struct spdk_thread *thread; 3184 3185 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3186 3187 thread = spdk_get_thread(); 3188 if (!thread) { 3189 /* The user called this from a non-SPDK thread. */ 3190 if (cb_fn != NULL) { 3191 cb_fn(cb_arg, -ENOTSUP); 3192 } 3193 return; 3194 } 3195 3196 pthread_mutex_lock(&bdev->internal.mutex); 3197 3198 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3199 bdev->internal.unregister_cb = cb_fn; 3200 bdev->internal.unregister_ctx = cb_arg; 3201 3202 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3203 if (desc->remove_cb) { 3204 do_destruct = false; 3205 /* 3206 * Defer invocation of the remove_cb to a separate message that will 3207 * run later on this thread. This ensures this context unwinds and 3208 * we don't recursively unregister this bdev again if the remove_cb 3209 * immediately closes its descriptor. 3210 */ 3211 if (!desc->remove_scheduled) { 3212 /* Avoid scheduling removal of the same descriptor multiple times. 
*/ 3213 desc->remove_scheduled = true; 3214 spdk_thread_send_msg(thread, _remove_notify, desc); 3215 } 3216 } 3217 } 3218 3219 if (!do_destruct) { 3220 pthread_mutex_unlock(&bdev->internal.mutex); 3221 return; 3222 } 3223 3224 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3225 pthread_mutex_unlock(&bdev->internal.mutex); 3226 3227 spdk_bdev_fini(bdev); 3228 } 3229 3230 int 3231 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3232 void *remove_ctx, struct spdk_bdev_desc **_desc) 3233 { 3234 struct spdk_bdev_desc *desc; 3235 3236 desc = calloc(1, sizeof(*desc)); 3237 if (desc == NULL) { 3238 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3239 return -ENOMEM; 3240 } 3241 3242 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3243 spdk_get_thread()); 3244 3245 pthread_mutex_lock(&bdev->internal.mutex); 3246 3247 if (write && bdev->internal.claim_module) { 3248 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3249 bdev->name, bdev->internal.claim_module->name); 3250 free(desc); 3251 pthread_mutex_unlock(&bdev->internal.mutex); 3252 return -EPERM; 3253 } 3254 3255 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3256 3257 desc->bdev = bdev; 3258 desc->remove_cb = remove_cb; 3259 desc->remove_ctx = remove_ctx; 3260 desc->write = write; 3261 *_desc = desc; 3262 3263 pthread_mutex_unlock(&bdev->internal.mutex); 3264 3265 return 0; 3266 } 3267 3268 void 3269 spdk_bdev_close(struct spdk_bdev_desc *desc) 3270 { 3271 struct spdk_bdev *bdev = desc->bdev; 3272 bool do_unregister = false; 3273 3274 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3275 spdk_get_thread()); 3276 3277 pthread_mutex_lock(&bdev->internal.mutex); 3278 3279 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3280 free(desc); 3281 3282 /* If no more descriptors, kill QoS channel */ 3283 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3284 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3285 bdev->name, spdk_get_thread()); 3286 3287 if (spdk_bdev_qos_destroy(bdev)) { 3288 /* There isn't anything we can do to recover here. Just let the 3289 * old QoS poller keep running. The QoS handling won't change 3290 * cores when the user allocates a new channel, but it won't break. */ 3291 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 3292 } 3293 } 3294 3295 spdk_bdev_set_qd_sampling_period(bdev, 0); 3296 3297 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3298 do_unregister = true; 3299 } 3300 pthread_mutex_unlock(&bdev->internal.mutex); 3301 3302 if (do_unregister == true) { 3303 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3304 } 3305 } 3306 3307 int 3308 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3309 struct spdk_bdev_module *module) 3310 { 3311 if (bdev->internal.claim_module != NULL) { 3312 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3313 bdev->internal.claim_module->name); 3314 return -EPERM; 3315 } 3316 3317 if (desc && !desc->write) { 3318 desc->write = true; 3319 } 3320 3321 bdev->internal.claim_module = module; 3322 return 0; 3323 } 3324 3325 void 3326 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3327 { 3328 assert(bdev->internal.claim_module != NULL); 3329 bdev->internal.claim_module = NULL; 3330 } 3331 3332 struct spdk_bdev * 3333 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3334 { 3335 return desc->bdev; 3336 } 3337 3338 void 3339 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3340 { 3341 struct iovec *iovs; 3342 int iovcnt; 3343 3344 if (bdev_io == NULL) { 3345 return; 3346 } 3347 3348 switch (bdev_io->type) { 3349 case SPDK_BDEV_IO_TYPE_READ: 3350 iovs = bdev_io->u.bdev.iovs; 3351 iovcnt = bdev_io->u.bdev.iovcnt; 3352 break; 3353 case SPDK_BDEV_IO_TYPE_WRITE: 3354 iovs = bdev_io->u.bdev.iovs; 3355 iovcnt = bdev_io->u.bdev.iovcnt; 3356 break; 3357 default: 3358 iovs = NULL; 3359 iovcnt = 0; 3360 break; 3361 } 3362 3363 if (iovp) { 3364 *iovp = iovs; 3365 } 3366 if (iovcntp) { 3367 *iovcntp = iovcnt; 3368 } 3369 } 3370 3371 void 3372 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3373 { 3374 3375 if (spdk_bdev_module_list_find(bdev_module->name)) { 3376 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3377 assert(false); 3378 } 3379 3380 if (bdev_module->async_init) { 3381 bdev_module->internal.action_in_progress = 1; 3382 } 3383 3384 /* 3385 * Modules with examine callbacks must be initialized first, so they are 3386 * ready to handle examine callbacks from later modules that will 3387 * register physical bdevs. 
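 * That is why modules providing examine callbacks are inserted at the head of
 * the module list below, while all other modules are appended at the tail.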
3388 */ 3389 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3390 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3391 } else { 3392 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3393 } 3394 } 3395 3396 struct spdk_bdev_module * 3397 spdk_bdev_module_list_find(const char *name) 3398 { 3399 struct spdk_bdev_module *bdev_module; 3400 3401 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3402 if (strcmp(name, bdev_module->name) == 0) { 3403 break; 3404 } 3405 } 3406 3407 return bdev_module; 3408 } 3409 3410 static void 3411 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3412 { 3413 struct spdk_bdev_io *bdev_io = _bdev_io; 3414 uint64_t num_bytes, num_blocks; 3415 int rc; 3416 3417 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3418 bdev_io->u.bdev.split_remaining_num_blocks, 3419 ZERO_BUFFER_SIZE); 3420 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3421 3422 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3423 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3424 g_bdev_mgr.zero_buffer, 3425 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3426 _spdk_bdev_write_zero_buffer_done, bdev_io); 3427 if (rc == 0) { 3428 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3429 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3430 } else if (rc == -ENOMEM) { 3431 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 3432 bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_write_zero_buffer_next; 3433 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 3434 spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3435 &bdev_io->internal.waitq_entry); 3436 } else { 3437 /* This should never happen. 
*/ 3438 assert(false); 3439 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3440 bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED, bdev_io->internal.caller_ctx); 3441 } 3442 } 3443 3444 static void 3445 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3446 { 3447 struct spdk_bdev_io *parent_io = cb_arg; 3448 3449 if (!success) { 3450 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3451 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx); 3452 return; 3453 } 3454 3455 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3456 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3457 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx); 3458 return; 3459 } 3460 3461 _spdk_bdev_write_zero_buffer_next(parent_io); 3462 } 3463 3464 struct set_qos_limit_ctx { 3465 void (*cb_fn)(void *cb_arg, int status); 3466 void *cb_arg; 3467 struct spdk_bdev *bdev; 3468 }; 3469 3470 static void 3471 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3472 { 3473 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3474 ctx->bdev->internal.qos_mod_in_progress = false; 3475 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3476 3477 ctx->cb_fn(ctx->cb_arg, status); 3478 free(ctx); 3479 } 3480 3481 static void 3482 _spdk_bdev_disable_qos_done(void *cb_arg) 3483 { 3484 struct set_qos_limit_ctx *ctx = cb_arg; 3485 struct spdk_bdev *bdev = ctx->bdev; 3486 struct spdk_bdev_io *bdev_io; 3487 struct spdk_bdev_qos *qos; 3488 3489 pthread_mutex_lock(&bdev->internal.mutex); 3490 qos = bdev->internal.qos; 3491 bdev->internal.qos = NULL; 3492 pthread_mutex_unlock(&bdev->internal.mutex); 3493 3494 while (!TAILQ_EMPTY(&qos->queued)) { 3495 /* Send queued I/O back to their original thread for resubmission. */ 3496 bdev_io = TAILQ_FIRST(&qos->queued); 3497 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3498 3499 if (bdev_io->internal.io_submit_ch) { 3500 /* 3501 * Channel was changed when sending it to the QoS thread - change it back 3502 * before sending it back to the original thread. 
3503 */ 3504 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3505 bdev_io->internal.io_submit_ch = NULL; 3506 } 3507 3508 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3509 _spdk_bdev_io_submit, bdev_io); 3510 } 3511 3512 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3513 spdk_poller_unregister(&qos->poller); 3514 3515 free(qos); 3516 3517 _spdk_bdev_set_qos_limit_done(ctx, 0); 3518 } 3519 3520 static void 3521 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3522 { 3523 void *io_device = spdk_io_channel_iter_get_io_device(i); 3524 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3525 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3526 struct spdk_thread *thread; 3527 3528 pthread_mutex_lock(&bdev->internal.mutex); 3529 thread = bdev->internal.qos->thread; 3530 pthread_mutex_unlock(&bdev->internal.mutex); 3531 3532 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3533 } 3534 3535 static void 3536 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3537 { 3538 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3539 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3540 3541 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3542 3543 spdk_for_each_channel_continue(i, 0); 3544 } 3545 3546 static void 3547 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg) 3548 { 3549 struct set_qos_limit_ctx *ctx = cb_arg; 3550 struct spdk_bdev *bdev = ctx->bdev; 3551 3552 pthread_mutex_lock(&bdev->internal.mutex); 3553 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3554 pthread_mutex_unlock(&bdev->internal.mutex); 3555 3556 _spdk_bdev_set_qos_limit_done(ctx, 0); 3557 } 3558 3559 static void 3560 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3561 { 3562 void *io_device = spdk_io_channel_iter_get_io_device(i); 3563 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3564 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3565 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3566 3567 pthread_mutex_lock(&bdev->internal.mutex); 3568 _spdk_bdev_enable_qos(bdev, bdev_ch); 3569 pthread_mutex_unlock(&bdev->internal.mutex); 3570 spdk_for_each_channel_continue(i, 0); 3571 } 3572 3573 static void 3574 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3575 { 3576 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3577 3578 _spdk_bdev_set_qos_limit_done(ctx, status); 3579 } 3580 3581 void 3582 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec, 3583 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3584 { 3585 struct set_qos_limit_ctx *ctx; 3586 3587 if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) { 3588 SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n", 3589 ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC); 3590 cb_fn(cb_arg, -EINVAL); 3591 return; 3592 } 3593 3594 ctx = calloc(1, sizeof(*ctx)); 3595 if (ctx == NULL) { 3596 cb_fn(cb_arg, -ENOMEM); 3597 return; 3598 } 3599 3600 ctx->cb_fn = cb_fn; 3601 ctx->cb_arg = cb_arg; 3602 ctx->bdev = bdev; 3603 3604 pthread_mutex_lock(&bdev->internal.mutex); 3605 if (bdev->internal.qos_mod_in_progress) { 3606 pthread_mutex_unlock(&bdev->internal.mutex); 3607 free(ctx); 3608 cb_fn(cb_arg, -EAGAIN); 3609 return; 3610 } 3611 bdev->internal.qos_mod_in_progress = true; 3612 3613 if (ios_per_sec > 0) { 3614 if (bdev->internal.qos == NULL) { 3615 /* Enabling */ 3616 
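			/* Allocate the QoS object, then visit every channel so each one starts
			 * routing its I/O through the QoS thread before the user callback runs.
			 */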
bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3617 if (!bdev->internal.qos) { 3618 pthread_mutex_unlock(&bdev->internal.mutex); 3619 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3620 free(ctx); 3621 cb_fn(cb_arg, -ENOMEM); 3622 return; 3623 } 3624 3625 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3626 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3627 _spdk_bdev_enable_qos_msg, ctx, 3628 _spdk_bdev_enable_qos_done); 3629 } else { 3630 /* Updating */ 3631 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3632 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx); 3633 } 3634 } else { 3635 if (bdev->internal.qos != NULL) { 3636 /* Disabling */ 3637 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3638 _spdk_bdev_disable_qos_msg, ctx, 3639 _spdk_bdev_disable_qos_msg_done); 3640 } else { 3641 pthread_mutex_unlock(&bdev->internal.mutex); 3642 _spdk_bdev_set_qos_limit_done(ctx, 0); 3643 return; 3644 } 3645 } 3646 3647 pthread_mutex_unlock(&bdev->internal.mutex); 3648 } 3649 3650 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3651 3652 SPDK_TRACE_REGISTER_FN(bdev_trace) 3653 { 3654 spdk_trace_register_owner(OWNER_BDEV, 'b'); 3655 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 3656 spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV, 3657 OBJECT_BDEV_IO, 1, 0, 0, "type: "); 3658 spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV, 3659 OBJECT_BDEV_IO, 0, 0, 0, ""); 3660 } 3661
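/*
 * Illustrative usage sketch (not part of the bdev library itself): one way an
 * application running on an SPDK thread might open a bdev by name, issue a read
 * through the APIs defined above, and recover from a transient -ENOMEM with
 * spdk_bdev_queue_io_wait().  The names hello_ctx, hello_start(), hello_read()
 * and hello_read_done() are hypothetical, error handling is abbreviated, and a
 * block size that divides 4096 bytes is assumed.
 *
 *	struct hello_ctx {
 *		struct spdk_bdev_desc		*desc;
 *		struct spdk_io_channel		*ch;
 *		struct spdk_bdev_io_wait_entry	wait;
 *		void				*buf;
 *	};
 *
 *	static void
 *	hello_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		struct hello_ctx *ctx = cb_arg;
 *
 *		printf("read %s\n", success ? "succeeded" : "failed");
 *		spdk_bdev_free_io(bdev_io);
 *		spdk_put_io_channel(ctx->ch);
 *		spdk_bdev_close(ctx->desc);
 *	}
 *
 *	static void
 *	hello_read(void *arg)
 *	{
 *		struct hello_ctx *ctx = arg;
 *		int rc;
 *
 *		rc = spdk_bdev_read(ctx->desc, ctx->ch, ctx->buf, 0, 4096,
 *				    hello_read_done, ctx);
 *		if (rc == -ENOMEM) {
 *			// No spdk_bdev_io was available; retry once one is freed.
 *			ctx->wait.bdev = spdk_bdev_desc_get_bdev(ctx->desc);
 *			ctx->wait.cb_fn = hello_read;
 *			ctx->wait.cb_arg = ctx;
 *			spdk_bdev_queue_io_wait(ctx->wait.bdev, ctx->ch, &ctx->wait);
 *		}
 *	}
 *
 *	static void
 *	hello_start(const char *bdev_name)
 *	{
 *		static struct hello_ctx ctx;
 *		struct spdk_bdev *bdev = spdk_bdev_get_by_name(bdev_name);
 *
 *		spdk_bdev_open(bdev, false, NULL, NULL, &ctx.desc);
 *		ctx.ch = spdk_bdev_get_io_channel(ctx.desc);
 *		ctx.buf = spdk_dma_zmalloc(4096, spdk_bdev_get_buf_align(bdev), NULL);
 *		hello_read(&ctx);
 *	}
 */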