1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/env.h" 40 #include "spdk/event.h" 41 #include "spdk/thread.h" 42 #include "spdk/likely.h" 43 #include "spdk/queue.h" 44 #include "spdk/nvme_spec.h" 45 #include "spdk/scsi_spec.h" 46 #include "spdk/util.h" 47 48 #include "spdk/bdev_module.h" 49 #include "spdk_internal/log.h" 50 #include "spdk/string.h" 51 52 #ifdef SPDK_CONFIG_VTUNE 53 #include "ittnotify.h" 54 #include "ittnotify_types.h" 55 int __itt_init_ittlib(const char *, __itt_group_id); 56 #endif 57 58 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 59 #define SPDK_BDEV_IO_CACHE_SIZE 256 60 #define BUF_SMALL_POOL_SIZE 8192 61 #define BUF_LARGE_POOL_SIZE 1024 62 #define NOMEM_THRESHOLD_COUNT 8 63 #define ZERO_BUFFER_SIZE 0x100000 64 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 65 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 66 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 67 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 68 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC 10 69 70 enum spdk_bdev_qos_type { 71 SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0, 72 SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT, 73 SPDK_BDEV_QOS_NUM_TYPES /* Keep last */ 74 }; 75 76 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"}; 77 78 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 79 80 struct spdk_bdev_mgr { 81 struct spdk_mempool *bdev_io_pool; 82 83 struct spdk_mempool *buf_small_pool; 84 struct spdk_mempool *buf_large_pool; 85 86 void *zero_buffer; 87 88 TAILQ_HEAD(, spdk_bdev_module) bdev_modules; 89 90 struct spdk_bdev_list bdevs; 91 92 bool init_complete; 93 bool module_init_complete; 94 95 #ifdef SPDK_CONFIG_VTUNE 96 __itt_domain *domain; 97 #endif 98 }; 99 100 static struct spdk_bdev_mgr g_bdev_mgr = { 101 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 102 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 103 
.init_complete = false, 104 .module_init_complete = false, 105 }; 106 107 static struct spdk_bdev_opts g_bdev_opts = { 108 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 109 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 110 }; 111 112 static spdk_bdev_init_cb g_init_cb_fn = NULL; 113 static void *g_init_cb_arg = NULL; 114 115 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 116 static void *g_fini_cb_arg = NULL; 117 static struct spdk_thread *g_fini_thread = NULL; 118 119 struct spdk_bdev_qos { 120 /** Rate limit, in I/O per second */ 121 uint64_t iops_rate_limit; 122 123 /** Rate limit, in byte per second */ 124 uint64_t byte_rate_limit; 125 126 /** The channel that all I/O are funneled through */ 127 struct spdk_bdev_channel *ch; 128 129 /** The thread on which the poller is running. */ 130 struct spdk_thread *thread; 131 132 /** Queue of I/O waiting to be issued. */ 133 bdev_io_tailq_t queued; 134 135 /** Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and 136 * only valid for the master channel which manages the outstanding IOs. */ 137 uint64_t max_ios_per_timeslice; 138 139 /** Maximum allowed bytes to be issued in one timeslice (e.g., 1ms) and 140 * only valid for the master channel which manages the outstanding IOs. */ 141 uint64_t max_byte_per_timeslice; 142 143 /** Remaining IO allowed in current timeslice (e.g., 1ms) */ 144 uint64_t io_remaining_this_timeslice; 145 146 /** Remaining bytes allowed in current timeslice (e.g., 1ms). 147 * Allowed to run negative if an I/O is submitted when some bytes are remaining, 148 * but the I/O is bigger than that amount. The excess will be deducted from the 149 * next timeslice. 150 */ 151 int64_t byte_remaining_this_timeslice; 152 153 /** Poller that processes queued I/O commands each time slice. */ 154 struct spdk_poller *poller; 155 }; 156 157 struct spdk_bdev_mgmt_channel { 158 bdev_io_stailq_t need_buf_small; 159 bdev_io_stailq_t need_buf_large; 160 161 /* 162 * Each thread keeps a cache of bdev_io - this allows 163 * bdev threads which are *not* DPDK threads to still 164 * benefit from a per-thread bdev_io cache. Without 165 * this, non-DPDK threads fetching from the mempool 166 * incur a cmpxchg on get and put. 167 */ 168 bdev_io_stailq_t per_thread_cache; 169 uint32_t per_thread_cache_count; 170 uint32_t bdev_io_cache_size; 171 172 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 173 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 174 }; 175 176 /* 177 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 178 * will queue here their IO that awaits retry. It makes it possible to retry sending 179 * IO to one bdev after IO from other bdev completes. 180 */ 181 struct spdk_bdev_shared_resource { 182 /* The bdev management channel */ 183 struct spdk_bdev_mgmt_channel *mgmt_ch; 184 185 /* 186 * Count of I/O submitted to bdev module and waiting for completion. 187 * Incremented before submit_request() is called on an spdk_bdev_io. 188 */ 189 uint64_t io_outstanding; 190 191 /* 192 * Queue of IO awaiting retry because of a previous NOMEM status returned 193 * on this channel. 194 */ 195 bdev_io_tailq_t nomem_io; 196 197 /* 198 * Threshold which io_outstanding must drop to before retrying nomem_io. 
199 */ 200 uint64_t nomem_threshold; 201 202 /* I/O channel allocated by a bdev module */ 203 struct spdk_io_channel *shared_ch; 204 205 /* Refcount of bdev channels using this resource */ 206 uint32_t ref; 207 208 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 209 }; 210 211 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 212 #define BDEV_CH_QOS_ENABLED (1 << 1) 213 214 struct spdk_bdev_channel { 215 struct spdk_bdev *bdev; 216 217 /* The channel for the underlying device */ 218 struct spdk_io_channel *channel; 219 220 /* Per io_device per thread data */ 221 struct spdk_bdev_shared_resource *shared_resource; 222 223 struct spdk_bdev_io_stat stat; 224 225 /* 226 * Count of I/O submitted through this channel and waiting for completion. 227 * Incremented before submit_request() is called on an spdk_bdev_io. 228 */ 229 uint64_t io_outstanding; 230 231 bdev_io_tailq_t queued_resets; 232 233 uint32_t flags; 234 235 #ifdef SPDK_CONFIG_VTUNE 236 uint64_t start_tsc; 237 uint64_t interval_tsc; 238 __itt_string_handle *handle; 239 struct spdk_bdev_io_stat prev_stat; 240 #endif 241 242 }; 243 244 struct spdk_bdev_desc { 245 struct spdk_bdev *bdev; 246 spdk_bdev_remove_cb_t remove_cb; 247 void *remove_ctx; 248 bool remove_scheduled; 249 bool write; 250 TAILQ_ENTRY(spdk_bdev_desc) link; 251 }; 252 253 struct spdk_bdev_iostat_ctx { 254 struct spdk_bdev_io_stat *stat; 255 spdk_bdev_get_device_stat_cb cb; 256 void *cb_arg; 257 }; 258 259 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 260 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 261 262 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 263 void *cb_arg); 264 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 265 266 void 267 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 268 { 269 *opts = g_bdev_opts; 270 } 271 272 int 273 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 274 { 275 uint32_t min_pool_size; 276 277 /* 278 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 279 * initialization. A second mgmt_ch will be created on the same thread when the application starts 280 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
281 */ 282 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 283 if (opts->bdev_io_pool_size < min_pool_size) { 284 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 285 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 286 spdk_thread_get_count()); 287 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 288 return -1; 289 } 290 291 g_bdev_opts = *opts; 292 return 0; 293 } 294 295 struct spdk_bdev * 296 spdk_bdev_first(void) 297 { 298 struct spdk_bdev *bdev; 299 300 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 301 if (bdev) { 302 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 303 } 304 305 return bdev; 306 } 307 308 struct spdk_bdev * 309 spdk_bdev_next(struct spdk_bdev *prev) 310 { 311 struct spdk_bdev *bdev; 312 313 bdev = TAILQ_NEXT(prev, internal.link); 314 if (bdev) { 315 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 316 } 317 318 return bdev; 319 } 320 321 static struct spdk_bdev * 322 _bdev_next_leaf(struct spdk_bdev *bdev) 323 { 324 while (bdev != NULL) { 325 if (bdev->internal.claim_module == NULL) { 326 return bdev; 327 } else { 328 bdev = TAILQ_NEXT(bdev, internal.link); 329 } 330 } 331 332 return bdev; 333 } 334 335 struct spdk_bdev * 336 spdk_bdev_first_leaf(void) 337 { 338 struct spdk_bdev *bdev; 339 340 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 341 342 if (bdev) { 343 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 344 } 345 346 return bdev; 347 } 348 349 struct spdk_bdev * 350 spdk_bdev_next_leaf(struct spdk_bdev *prev) 351 { 352 struct spdk_bdev *bdev; 353 354 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 355 356 if (bdev) { 357 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 358 } 359 360 return bdev; 361 } 362 363 struct spdk_bdev * 364 spdk_bdev_get_by_name(const char *bdev_name) 365 { 366 struct spdk_bdev_alias *tmp; 367 struct spdk_bdev *bdev = spdk_bdev_first(); 368 369 while (bdev != NULL) { 370 if (strcmp(bdev_name, bdev->name) == 0) { 371 return bdev; 372 } 373 374 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 375 if (strcmp(bdev_name, tmp->alias) == 0) { 376 return bdev; 377 } 378 } 379 380 bdev = spdk_bdev_next(bdev); 381 } 382 383 return NULL; 384 } 385 386 void 387 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 388 { 389 struct iovec *iovs; 390 391 iovs = bdev_io->u.bdev.iovs; 392 393 assert(iovs != NULL); 394 assert(bdev_io->u.bdev.iovcnt >= 1); 395 396 iovs[0].iov_base = buf; 397 iovs[0].iov_len = len; 398 } 399 400 static void 401 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 402 { 403 struct spdk_mempool *pool; 404 struct spdk_bdev_io *tmp; 405 void *buf, *aligned_buf; 406 bdev_io_stailq_t *stailq; 407 struct spdk_bdev_mgmt_channel *ch; 408 409 assert(bdev_io->u.bdev.iovcnt == 1); 410 411 buf = bdev_io->internal.buf; 412 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 413 414 bdev_io->internal.buf = NULL; 415 416 if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 417 pool = g_bdev_mgr.buf_small_pool; 418 stailq = &ch->need_buf_small; 419 } else { 420 pool = g_bdev_mgr.buf_large_pool; 421 stailq = &ch->need_buf_large; 422 } 423 424 if (STAILQ_EMPTY(stailq)) { 425 spdk_mempool_put(pool, buf); 426 } else { 427 tmp = STAILQ_FIRST(stailq); 428 429 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 430 spdk_bdev_io_set_buf(bdev_io, 
aligned_buf, tmp->internal.buf_len); 431 432 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 433 tmp->internal.buf = buf; 434 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 435 } 436 } 437 438 void 439 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 440 { 441 struct spdk_mempool *pool; 442 bdev_io_stailq_t *stailq; 443 void *buf, *aligned_buf; 444 struct spdk_bdev_mgmt_channel *mgmt_ch; 445 446 assert(cb != NULL); 447 assert(bdev_io->u.bdev.iovs != NULL); 448 449 if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) { 450 /* Buffer already present */ 451 cb(bdev_io->internal.ch->channel, bdev_io); 452 return; 453 } 454 455 assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE); 456 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 457 458 bdev_io->internal.buf_len = len; 459 bdev_io->internal.get_buf_cb = cb; 460 if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 461 pool = g_bdev_mgr.buf_small_pool; 462 stailq = &mgmt_ch->need_buf_small; 463 } else { 464 pool = g_bdev_mgr.buf_large_pool; 465 stailq = &mgmt_ch->need_buf_large; 466 } 467 468 buf = spdk_mempool_get(pool); 469 470 if (!buf) { 471 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 472 } else { 473 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 474 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 475 476 bdev_io->internal.buf = buf; 477 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 478 } 479 } 480 481 static int 482 spdk_bdev_module_get_max_ctx_size(void) 483 { 484 struct spdk_bdev_module *bdev_module; 485 int max_bdev_module_size = 0; 486 487 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 488 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 489 max_bdev_module_size = bdev_module->get_ctx_size(); 490 } 491 } 492 493 return max_bdev_module_size; 494 } 495 496 void 497 spdk_bdev_config_text(FILE *fp) 498 { 499 struct spdk_bdev_module *bdev_module; 500 501 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 502 if (bdev_module->config_text) { 503 bdev_module->config_text(fp); 504 } 505 } 506 } 507 508 void 509 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 510 { 511 struct spdk_bdev_module *bdev_module; 512 struct spdk_bdev *bdev; 513 514 assert(w != NULL); 515 516 spdk_json_write_array_begin(w); 517 518 spdk_json_write_object_begin(w); 519 spdk_json_write_named_string(w, "method", "set_bdev_options"); 520 spdk_json_write_name(w, "params"); 521 spdk_json_write_object_begin(w); 522 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 523 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 524 spdk_json_write_object_end(w); 525 spdk_json_write_object_end(w); 526 527 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 528 if (bdev_module->config_json) { 529 bdev_module->config_json(w); 530 } 531 } 532 533 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 534 spdk_bdev_config_json(bdev, w); 535 } 536 537 spdk_json_write_array_end(w); 538 } 539 540 static int 541 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 542 { 543 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 544 struct spdk_bdev_io *bdev_io; 545 uint32_t i; 546 547 STAILQ_INIT(&ch->need_buf_small); 548 STAILQ_INIT(&ch->need_buf_large); 549 550 STAILQ_INIT(&ch->per_thread_cache); 551 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 552 553 /* Pre-populate bdev_io cache to ensure this 
thread cannot be starved. */ 554 ch->per_thread_cache_count = 0; 555 for (i = 0; i < ch->bdev_io_cache_size; i++) { 556 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 557 assert(bdev_io != NULL); 558 ch->per_thread_cache_count++; 559 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 560 } 561 562 TAILQ_INIT(&ch->shared_resources); 563 TAILQ_INIT(&ch->io_wait_queue); 564 565 return 0; 566 } 567 568 static void 569 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf) 570 { 571 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 572 struct spdk_bdev_io *bdev_io; 573 574 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) { 575 SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n"); 576 } 577 578 if (!TAILQ_EMPTY(&ch->shared_resources)) { 579 SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n"); 580 } 581 582 while (!STAILQ_EMPTY(&ch->per_thread_cache)) { 583 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 584 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 585 ch->per_thread_cache_count--; 586 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 587 } 588 589 assert(ch->per_thread_cache_count == 0); 590 } 591 592 static void 593 spdk_bdev_init_complete(int rc) 594 { 595 spdk_bdev_init_cb cb_fn = g_init_cb_fn; 596 void *cb_arg = g_init_cb_arg; 597 struct spdk_bdev_module *m; 598 599 g_bdev_mgr.init_complete = true; 600 g_init_cb_fn = NULL; 601 g_init_cb_arg = NULL; 602 603 /* 604 * For modules that need to know when subsystem init is complete, 605 * inform them now. 606 */ 607 if (rc == 0) { 608 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 609 if (m->init_complete) { 610 m->init_complete(); 611 } 612 } 613 } 614 615 cb_fn(cb_arg, rc); 616 } 617 618 static void 619 spdk_bdev_module_action_complete(void) 620 { 621 struct spdk_bdev_module *m; 622 623 /* 624 * Don't finish bdev subsystem initialization if 625 * module pre-initialization is still in progress, or 626 * the subsystem been already initialized. 627 */ 628 if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) { 629 return; 630 } 631 632 /* 633 * Check all bdev modules for inits/examinations in progress. If any 634 * exist, return immediately since we cannot finish bdev subsystem 635 * initialization until all are completed. 636 */ 637 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 638 if (m->internal.action_in_progress > 0) { 639 return; 640 } 641 } 642 643 /* 644 * Modules already finished initialization - now that all 645 * the bdev modules have finished their asynchronous I/O 646 * processing, the entire bdev layer can be marked as complete. 
647 */ 648 spdk_bdev_init_complete(0); 649 } 650 651 static void 652 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 653 { 654 assert(module->internal.action_in_progress > 0); 655 module->internal.action_in_progress--; 656 spdk_bdev_module_action_complete(); 657 } 658 659 void 660 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 661 { 662 spdk_bdev_module_action_done(module); 663 } 664 665 void 666 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 667 { 668 spdk_bdev_module_action_done(module); 669 } 670 671 static int 672 spdk_bdev_modules_init(void) 673 { 674 struct spdk_bdev_module *module; 675 int rc = 0; 676 677 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 678 rc = module->module_init(); 679 if (rc != 0) { 680 break; 681 } 682 } 683 684 g_bdev_mgr.module_init_complete = true; 685 return rc; 686 } 687 688 689 static void 690 spdk_bdev_init_failed_complete(void *cb_arg) 691 { 692 spdk_bdev_init_complete(-1); 693 } 694 695 static void 696 spdk_bdev_init_failed(void *cb_arg) 697 { 698 spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL); 699 } 700 701 void 702 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 703 { 704 struct spdk_conf_section *sp; 705 struct spdk_bdev_opts bdev_opts; 706 int32_t bdev_io_pool_size, bdev_io_cache_size; 707 int cache_size; 708 int rc = 0; 709 char mempool_name[32]; 710 711 assert(cb_fn != NULL); 712 713 sp = spdk_conf_find_section(NULL, "Bdev"); 714 if (sp != NULL) { 715 spdk_bdev_get_opts(&bdev_opts); 716 717 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 718 if (bdev_io_pool_size >= 0) { 719 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 720 } 721 722 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 723 if (bdev_io_cache_size >= 0) { 724 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 725 } 726 727 if (spdk_bdev_set_opts(&bdev_opts)) { 728 spdk_bdev_init_complete(-1); 729 return; 730 } 731 732 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 733 } 734 735 g_init_cb_fn = cb_fn; 736 g_init_cb_arg = cb_arg; 737 738 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 739 740 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 741 g_bdev_opts.bdev_io_pool_size, 742 sizeof(struct spdk_bdev_io) + 743 spdk_bdev_module_get_max_ctx_size(), 744 0, 745 SPDK_ENV_SOCKET_ID_ANY); 746 747 if (g_bdev_mgr.bdev_io_pool == NULL) { 748 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 749 spdk_bdev_init_complete(-1); 750 return; 751 } 752 753 /** 754 * Ensure no more than half of the total buffers end up local caches, by 755 * using spdk_thread_get_count() to determine how many local caches we need 756 * to account for. 
757 */ 758 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 759 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 760 761 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 762 BUF_SMALL_POOL_SIZE, 763 SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512, 764 cache_size, 765 SPDK_ENV_SOCKET_ID_ANY); 766 if (!g_bdev_mgr.buf_small_pool) { 767 SPDK_ERRLOG("create rbuf small pool failed\n"); 768 spdk_bdev_init_complete(-1); 769 return; 770 } 771 772 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 773 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 774 775 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 776 BUF_LARGE_POOL_SIZE, 777 SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512, 778 cache_size, 779 SPDK_ENV_SOCKET_ID_ANY); 780 if (!g_bdev_mgr.buf_large_pool) { 781 SPDK_ERRLOG("create rbuf large pool failed\n"); 782 spdk_bdev_init_complete(-1); 783 return; 784 } 785 786 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 787 NULL); 788 if (!g_bdev_mgr.zero_buffer) { 789 SPDK_ERRLOG("create bdev zero buffer failed\n"); 790 spdk_bdev_init_complete(-1); 791 return; 792 } 793 794 #ifdef SPDK_CONFIG_VTUNE 795 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 796 #endif 797 798 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 799 spdk_bdev_mgmt_channel_destroy, 800 sizeof(struct spdk_bdev_mgmt_channel)); 801 802 rc = spdk_bdev_modules_init(); 803 if (rc != 0) { 804 SPDK_ERRLOG("bdev modules init failed\n"); 805 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 806 return; 807 } 808 809 spdk_bdev_module_action_complete(); 810 } 811 812 static void 813 spdk_bdev_mgr_unregister_cb(void *io_device) 814 { 815 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 816 817 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 818 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 819 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 820 g_bdev_opts.bdev_io_pool_size); 821 } 822 823 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 824 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 825 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 826 BUF_SMALL_POOL_SIZE); 827 assert(false); 828 } 829 830 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 831 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 832 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 833 BUF_LARGE_POOL_SIZE); 834 assert(false); 835 } 836 837 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 838 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 839 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 840 spdk_dma_free(g_bdev_mgr.zero_buffer); 841 842 cb_fn(g_fini_cb_arg); 843 g_fini_cb_fn = NULL; 844 g_fini_cb_arg = NULL; 845 } 846 847 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 848 849 static void 850 spdk_bdev_module_finish_iter(void *arg) 851 { 852 struct spdk_bdev_module *bdev_module; 853 854 /* Start iterating from the last touched module */ 855 if (!g_resume_bdev_module) { 856 bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules); 857 } else { 858 bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq); 859 } 860 861 while (bdev_module) { 862 if (bdev_module->async_fini) { 863 /* Save our place so we can resume later. 
We must 864 * save the variable here, before calling module_fini() 865 * below, because in some cases the module may immediately 866 * call spdk_bdev_module_finish_done() and re-enter 867 * this function to continue iterating. */ 868 g_resume_bdev_module = bdev_module; 869 } 870 871 if (bdev_module->module_fini) { 872 bdev_module->module_fini(); 873 } 874 875 if (bdev_module->async_fini) { 876 return; 877 } 878 879 bdev_module = TAILQ_NEXT(bdev_module, internal.tailq); 880 } 881 882 g_resume_bdev_module = NULL; 883 spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb); 884 } 885 886 void 887 spdk_bdev_module_finish_done(void) 888 { 889 if (spdk_get_thread() != g_fini_thread) { 890 spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL); 891 } else { 892 spdk_bdev_module_finish_iter(NULL); 893 } 894 } 895 896 static void 897 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno) 898 { 899 struct spdk_bdev *bdev = cb_arg; 900 901 if (bdeverrno && bdev) { 902 SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n", 903 bdev->name); 904 905 /* 906 * Since the call to spdk_bdev_unregister() failed, we have no way to free this 907 * bdev; try to continue by manually removing this bdev from the list and continue 908 * with the next bdev in the list. 909 */ 910 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 911 } 912 913 if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) { 914 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n"); 915 /* 916 * Bdev module finish need to be deffered as we might be in the middle of some context 917 * (like bdev part free) that will use this bdev (or private bdev driver ctx data) 918 * after returning. 919 */ 920 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL); 921 return; 922 } 923 924 /* 925 * Unregister the last bdev in the list. The last bdev in the list should be a bdev 926 * that has no bdevs that depend on it. 927 */ 928 bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list); 929 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name); 930 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 931 } 932 933 void 934 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 935 { 936 struct spdk_bdev_module *m; 937 938 assert(cb_fn != NULL); 939 940 g_fini_thread = spdk_get_thread(); 941 942 g_fini_cb_fn = cb_fn; 943 g_fini_cb_arg = cb_arg; 944 945 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 946 if (m->fini_start) { 947 m->fini_start(); 948 } 949 } 950 951 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0); 952 } 953 954 static struct spdk_bdev_io * 955 spdk_bdev_get_io(struct spdk_bdev_channel *channel) 956 { 957 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 958 struct spdk_bdev_io *bdev_io; 959 960 if (ch->per_thread_cache_count > 0) { 961 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 962 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 963 ch->per_thread_cache_count--; 964 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 965 /* 966 * Don't try to look for bdev_ios in the global pool if there are 967 * waiters on bdev_ios - we don't want this caller to jump the line. 
968 */ 969 bdev_io = NULL; 970 } else { 971 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 972 } 973 974 return bdev_io; 975 } 976 977 void 978 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 979 { 980 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 981 982 assert(bdev_io != NULL); 983 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 984 985 if (bdev_io->internal.buf != NULL) { 986 spdk_bdev_io_put_buf(bdev_io); 987 } 988 989 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 990 ch->per_thread_cache_count++; 991 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 992 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 993 struct spdk_bdev_io_wait_entry *entry; 994 995 entry = TAILQ_FIRST(&ch->io_wait_queue); 996 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 997 entry->cb_fn(entry->cb_arg); 998 } 999 } else { 1000 /* We should never have a full cache with entries on the io wait queue. */ 1001 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1002 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1003 } 1004 } 1005 1006 static uint64_t 1007 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1008 { 1009 struct spdk_bdev *bdev = bdev_io->bdev; 1010 1011 switch (bdev_io->type) { 1012 case SPDK_BDEV_IO_TYPE_NVME_ADMIN: 1013 case SPDK_BDEV_IO_TYPE_NVME_IO: 1014 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1015 return bdev_io->u.nvme_passthru.nbytes; 1016 case SPDK_BDEV_IO_TYPE_READ: 1017 case SPDK_BDEV_IO_TYPE_WRITE: 1018 case SPDK_BDEV_IO_TYPE_UNMAP: 1019 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1020 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1021 default: 1022 return 0; 1023 } 1024 } 1025 1026 static void 1027 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch) 1028 { 1029 struct spdk_bdev_io *bdev_io = NULL; 1030 struct spdk_bdev *bdev = ch->bdev; 1031 struct spdk_bdev_qos *qos = bdev->internal.qos; 1032 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1033 1034 while (!TAILQ_EMPTY(&qos->queued)) { 1035 if (qos->max_ios_per_timeslice > 0 && qos->io_remaining_this_timeslice == 0) { 1036 break; 1037 } 1038 1039 if (qos->max_byte_per_timeslice > 0 && qos->byte_remaining_this_timeslice <= 0) { 1040 break; 1041 } 1042 1043 bdev_io = TAILQ_FIRST(&qos->queued); 1044 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1045 qos->io_remaining_this_timeslice--; 1046 qos->byte_remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(bdev_io); 1047 ch->io_outstanding++; 1048 shared_resource->io_outstanding++; 1049 bdev->fn_table->submit_request(ch->channel, bdev_io); 1050 } 1051 } 1052 1053 static bool 1054 _spdk_bdev_io_type_can_split(uint8_t type) 1055 { 1056 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1057 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1058 1059 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1060 * UNMAP could be split, but these types of I/O are typically much larger 1061 * in size (sometimes the size of the entire block device), and the bdev 1062 * module can more efficiently split these types of I/O. Plus those types 1063 * of I/O do not have a payload, which makes the splitting process simpler. 
1064 */ 1065 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1066 return true; 1067 } else { 1068 return false; 1069 } 1070 } 1071 1072 static bool 1073 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1074 { 1075 uint64_t start_stripe, end_stripe; 1076 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1077 1078 if (io_boundary == 0) { 1079 return false; 1080 } 1081 1082 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1083 return false; 1084 } 1085 1086 start_stripe = bdev_io->u.bdev.offset_blocks; 1087 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1088 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1089 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1090 start_stripe >>= spdk_u32log2(io_boundary); 1091 end_stripe >>= spdk_u32log2(io_boundary); 1092 } else { 1093 start_stripe /= io_boundary; 1094 end_stripe /= io_boundary; 1095 } 1096 return (start_stripe != end_stripe); 1097 } 1098 1099 static uint32_t 1100 _to_next_boundary(uint64_t offset, uint32_t boundary) 1101 { 1102 return (boundary - (offset % boundary)); 1103 } 1104 1105 static void 1106 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1107 1108 static void 1109 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1110 { 1111 struct spdk_bdev_io *bdev_io = _bdev_io; 1112 uint64_t current_offset, remaining, bytes_handled; 1113 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1114 struct iovec *parent_iov; 1115 uint64_t parent_iov_offset, child_iov_len; 1116 uint32_t child_iovcnt; 1117 int rc; 1118 1119 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1120 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1121 blocklen = bdev_io->bdev->blocklen; 1122 bytes_handled = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1123 parent_iov = &bdev_io->u.bdev.iovs[0]; 1124 parent_iov_offset = 0; 1125 1126 while (bytes_handled > 0) { 1127 if (bytes_handled >= parent_iov->iov_len) { 1128 bytes_handled -= parent_iov->iov_len; 1129 parent_iov++; 1130 continue; 1131 } 1132 parent_iov_offset += bytes_handled; 1133 break; 1134 } 1135 1136 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1137 to_next_boundary = spdk_min(remaining, to_next_boundary); 1138 to_next_boundary_bytes = to_next_boundary * blocklen; 1139 child_iovcnt = 0; 1140 while (to_next_boundary_bytes > 0) { 1141 child_iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1142 to_next_boundary_bytes -= child_iov_len; 1143 1144 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1145 bdev_io->child_iov[child_iovcnt].iov_len = child_iov_len; 1146 1147 parent_iov++; 1148 parent_iov_offset = 0; 1149 child_iovcnt++; 1150 if (child_iovcnt == BDEV_IO_NUM_CHILD_IOV && to_next_boundary_bytes > 0) { 1151 /* We've run out of child iovs - we need to fail this I/O. 
*/ 1152 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1153 bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED, 1154 bdev_io->internal.caller_ctx); 1155 return; 1156 } 1157 } 1158 1159 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1160 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1161 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1162 bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary, 1163 _spdk_bdev_io_split_done, bdev_io); 1164 } else { 1165 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1166 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1167 bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary, 1168 _spdk_bdev_io_split_done, bdev_io); 1169 } 1170 1171 if (rc == 0) { 1172 bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary; 1173 bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary; 1174 } else { 1175 assert(rc == -ENOMEM); 1176 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1177 bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_io_split_with_payload; 1178 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1179 spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1180 &bdev_io->internal.waitq_entry); 1181 } 1182 } 1183 1184 static void 1185 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1186 { 1187 struct spdk_bdev_io *parent_io = cb_arg; 1188 1189 spdk_bdev_free_io(bdev_io); 1190 1191 if (!success) { 1192 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1193 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx); 1194 return; 1195 } 1196 1197 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 1198 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1199 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx); 1200 return; 1201 } 1202 1203 /* 1204 * Continue with the splitting process. This function will complete the parent I/O if the 1205 * splitting is done. 
1206 */ 1207 _spdk_bdev_io_split_with_payload(parent_io); 1208 } 1209 1210 static void 1211 _spdk_bdev_io_split(struct spdk_bdev_io *bdev_io) 1212 { 1213 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1214 1215 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1216 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1217 1218 _spdk_bdev_io_split_with_payload(bdev_io); 1219 } 1220 1221 static void 1222 _spdk_bdev_io_submit(void *ctx) 1223 { 1224 struct spdk_bdev_io *bdev_io = ctx; 1225 struct spdk_bdev *bdev = bdev_io->bdev; 1226 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1227 struct spdk_io_channel *ch = bdev_ch->channel; 1228 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1229 1230 bdev_io->internal.submit_tsc = spdk_get_ticks(); 1231 bdev_ch->io_outstanding++; 1232 shared_resource->io_outstanding++; 1233 bdev_io->internal.in_submit_request = true; 1234 if (spdk_likely(bdev_ch->flags == 0)) { 1235 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1236 bdev->fn_table->submit_request(ch, bdev_io); 1237 } else { 1238 bdev_ch->io_outstanding--; 1239 shared_resource->io_outstanding--; 1240 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1241 } 1242 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1243 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1244 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1245 bdev_ch->io_outstanding--; 1246 shared_resource->io_outstanding--; 1247 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1248 _spdk_bdev_qos_io_submit(bdev_ch); 1249 } else { 1250 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1251 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1252 } 1253 bdev_io->internal.in_submit_request = false; 1254 } 1255 1256 static void 1257 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1258 { 1259 struct spdk_bdev *bdev = bdev_io->bdev; 1260 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1261 1262 assert(thread != NULL); 1263 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1264 1265 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1266 _spdk_bdev_io_split(bdev_io); 1267 return; 1268 } 1269 1270 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1271 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1272 _spdk_bdev_io_submit(bdev_io); 1273 } else { 1274 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1275 bdev_io->internal.ch = bdev->internal.qos->ch; 1276 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1277 } 1278 } else { 1279 _spdk_bdev_io_submit(bdev_io); 1280 } 1281 } 1282 1283 static void 1284 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1285 { 1286 struct spdk_bdev *bdev = bdev_io->bdev; 1287 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1288 struct spdk_io_channel *ch = bdev_ch->channel; 1289 1290 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1291 1292 bdev_io->internal.in_submit_request = true; 1293 bdev->fn_table->submit_request(ch, bdev_io); 1294 bdev_io->internal.in_submit_request = false; 1295 } 1296 1297 static void 1298 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1299 struct spdk_bdev *bdev, void *cb_arg, 1300 spdk_bdev_io_completion_cb cb) 1301 { 1302 bdev_io->bdev = bdev; 1303 bdev_io->internal.caller_ctx = cb_arg; 1304 
bdev_io->internal.cb = cb; 1305 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1306 bdev_io->internal.in_submit_request = false; 1307 bdev_io->internal.buf = NULL; 1308 bdev_io->internal.io_submit_ch = NULL; 1309 } 1310 1311 static bool 1312 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1313 { 1314 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1315 } 1316 1317 bool 1318 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1319 { 1320 bool supported; 1321 1322 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1323 1324 if (!supported) { 1325 switch (io_type) { 1326 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1327 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1328 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1329 break; 1330 default: 1331 break; 1332 } 1333 } 1334 1335 return supported; 1336 } 1337 1338 int 1339 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1340 { 1341 if (bdev->fn_table->dump_info_json) { 1342 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1343 } 1344 1345 return 0; 1346 } 1347 1348 void 1349 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1350 { 1351 assert(bdev != NULL); 1352 assert(w != NULL); 1353 1354 if (bdev->fn_table->write_config_json) { 1355 bdev->fn_table->write_config_json(bdev, w); 1356 } else { 1357 spdk_json_write_object_begin(w); 1358 spdk_json_write_named_string(w, "name", bdev->name); 1359 spdk_json_write_object_end(w); 1360 } 1361 } 1362 1363 static void 1364 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1365 { 1366 uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0; 1367 1368 if (qos->iops_rate_limit > 0) { 1369 max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / 1370 SPDK_SEC_TO_USEC; 1371 qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice, 1372 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE); 1373 } 1374 1375 if (qos->byte_rate_limit > 0) { 1376 max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / 1377 SPDK_SEC_TO_USEC; 1378 qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice, 1379 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE); 1380 } 1381 } 1382 1383 static int 1384 spdk_bdev_channel_poll_qos(void *arg) 1385 { 1386 struct spdk_bdev_qos *qos = arg; 1387 1388 /* Reset for next round of rate limiting */ 1389 qos->io_remaining_this_timeslice = qos->max_ios_per_timeslice; 1390 1391 /* We may have allowed the bytes to slightly overrun in the last timeslice. 1392 * byte_remaining_this_timeslice is signed, so if it's negative here, we'll 1393 * account for the overrun so that the next timeslice will be appropriately 1394 * reduced. 
1395 */ 1396 if (qos->byte_remaining_this_timeslice > 0) { 1397 qos->byte_remaining_this_timeslice = 0; 1398 } 1399 qos->byte_remaining_this_timeslice += qos->max_byte_per_timeslice; 1400 1401 _spdk_bdev_qos_io_submit(qos->ch); 1402 1403 return -1; 1404 } 1405 1406 static void 1407 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1408 { 1409 struct spdk_bdev_shared_resource *shared_resource; 1410 1411 if (!ch) { 1412 return; 1413 } 1414 1415 if (ch->channel) { 1416 spdk_put_io_channel(ch->channel); 1417 } 1418 1419 assert(ch->io_outstanding == 0); 1420 1421 shared_resource = ch->shared_resource; 1422 if (shared_resource) { 1423 assert(ch->io_outstanding == 0); 1424 assert(shared_resource->ref > 0); 1425 shared_resource->ref--; 1426 if (shared_resource->ref == 0) { 1427 assert(shared_resource->io_outstanding == 0); 1428 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1429 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1430 free(shared_resource); 1431 } 1432 } 1433 } 1434 1435 /* Caller must hold bdev->internal.mutex. */ 1436 static void 1437 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1438 { 1439 struct spdk_bdev_qos *qos = bdev->internal.qos; 1440 1441 /* Rate limiting on this bdev enabled */ 1442 if (qos) { 1443 if (qos->ch == NULL) { 1444 struct spdk_io_channel *io_ch; 1445 1446 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1447 bdev->name, spdk_get_thread()); 1448 1449 /* No qos channel has been selected, so set one up */ 1450 1451 /* Take another reference to ch */ 1452 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1453 qos->ch = ch; 1454 1455 qos->thread = spdk_io_channel_get_thread(io_ch); 1456 1457 TAILQ_INIT(&qos->queued); 1458 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1459 qos->io_remaining_this_timeslice = qos->max_ios_per_timeslice; 1460 qos->byte_remaining_this_timeslice = qos->max_byte_per_timeslice; 1461 1462 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1463 qos, 1464 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1465 } 1466 1467 ch->flags |= BDEV_CH_QOS_ENABLED; 1468 } 1469 } 1470 1471 static int 1472 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1473 { 1474 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1475 struct spdk_bdev_channel *ch = ctx_buf; 1476 struct spdk_io_channel *mgmt_io_ch; 1477 struct spdk_bdev_mgmt_channel *mgmt_ch; 1478 struct spdk_bdev_shared_resource *shared_resource; 1479 1480 ch->bdev = bdev; 1481 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1482 if (!ch->channel) { 1483 return -1; 1484 } 1485 1486 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1487 if (!mgmt_io_ch) { 1488 return -1; 1489 } 1490 1491 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1492 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1493 if (shared_resource->shared_ch == ch->channel) { 1494 spdk_put_io_channel(mgmt_io_ch); 1495 shared_resource->ref++; 1496 break; 1497 } 1498 } 1499 1500 if (shared_resource == NULL) { 1501 shared_resource = calloc(1, sizeof(*shared_resource)); 1502 if (shared_resource == NULL) { 1503 spdk_put_io_channel(mgmt_io_ch); 1504 return -1; 1505 } 1506 1507 shared_resource->mgmt_ch = mgmt_ch; 1508 shared_resource->io_outstanding = 0; 1509 TAILQ_INIT(&shared_resource->nomem_io); 1510 shared_resource->nomem_threshold = 0; 1511 shared_resource->shared_ch = ch->channel; 1512 shared_resource->ref = 1; 1513 
TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1514 } 1515 1516 memset(&ch->stat, 0, sizeof(ch->stat)); 1517 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1518 ch->io_outstanding = 0; 1519 TAILQ_INIT(&ch->queued_resets); 1520 ch->flags = 0; 1521 ch->shared_resource = shared_resource; 1522 1523 #ifdef SPDK_CONFIG_VTUNE 1524 { 1525 char *name; 1526 __itt_init_ittlib(NULL, 0); 1527 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1528 if (!name) { 1529 _spdk_bdev_channel_destroy_resource(ch); 1530 return -1; 1531 } 1532 ch->handle = __itt_string_handle_create(name); 1533 free(name); 1534 ch->start_tsc = spdk_get_ticks(); 1535 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1536 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1537 } 1538 #endif 1539 1540 pthread_mutex_lock(&bdev->internal.mutex); 1541 _spdk_bdev_enable_qos(bdev, ch); 1542 pthread_mutex_unlock(&bdev->internal.mutex); 1543 1544 return 0; 1545 } 1546 1547 /* 1548 * Abort I/O that are waiting on a data buffer. These types of I/O are 1549 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1550 */ 1551 static void 1552 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1553 { 1554 bdev_io_stailq_t tmp; 1555 struct spdk_bdev_io *bdev_io; 1556 1557 STAILQ_INIT(&tmp); 1558 1559 while (!STAILQ_EMPTY(queue)) { 1560 bdev_io = STAILQ_FIRST(queue); 1561 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1562 if (bdev_io->internal.ch == ch) { 1563 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1564 } else { 1565 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1566 } 1567 } 1568 1569 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1570 } 1571 1572 /* 1573 * Abort I/O that are queued waiting for submission. These types of I/O are 1574 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1575 */ 1576 static void 1577 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1578 { 1579 struct spdk_bdev_io *bdev_io, *tmp; 1580 1581 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1582 if (bdev_io->internal.ch == ch) { 1583 TAILQ_REMOVE(queue, bdev_io, internal.link); 1584 /* 1585 * spdk_bdev_io_complete() assumes that the completed I/O had 1586 * been submitted to the bdev module. Since in this case it 1587 * hadn't, bump io_outstanding to account for the decrement 1588 * that spdk_bdev_io_complete() will do. 1589 */ 1590 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1591 ch->io_outstanding++; 1592 ch->shared_resource->io_outstanding++; 1593 } 1594 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1595 } 1596 } 1597 } 1598 1599 static void 1600 spdk_bdev_qos_channel_destroy(void *cb_arg) 1601 { 1602 struct spdk_bdev_qos *qos = cb_arg; 1603 1604 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1605 spdk_poller_unregister(&qos->poller); 1606 1607 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1608 1609 free(qos); 1610 } 1611 1612 static int 1613 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1614 { 1615 /* 1616 * Cleanly shutting down the QoS poller is tricky, because 1617 * during the asynchronous operation the user could open 1618 * a new descriptor and create a new channel, spawning 1619 * a new QoS poller. 1620 * 1621 * The strategy is to create a new QoS structure here and swap it 1622 * in. The shutdown path then continues to refer to the old one 1623 * until it completes and then releases it. 
1624 */ 1625 struct spdk_bdev_qos *new_qos, *old_qos; 1626 1627 old_qos = bdev->internal.qos; 1628 1629 new_qos = calloc(1, sizeof(*new_qos)); 1630 if (!new_qos) { 1631 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1632 return -ENOMEM; 1633 } 1634 1635 /* Copy the old QoS data into the newly allocated structure */ 1636 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1637 1638 /* Zero out the key parts of the QoS structure */ 1639 new_qos->ch = NULL; 1640 new_qos->thread = NULL; 1641 new_qos->max_ios_per_timeslice = 0; 1642 new_qos->max_byte_per_timeslice = 0; 1643 new_qos->io_remaining_this_timeslice = 0; 1644 new_qos->byte_remaining_this_timeslice = 0; 1645 new_qos->poller = NULL; 1646 TAILQ_INIT(&new_qos->queued); 1647 1648 bdev->internal.qos = new_qos; 1649 1650 if (old_qos->thread == NULL) { 1651 free(old_qos); 1652 } else { 1653 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1654 old_qos); 1655 } 1656 1657 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 1658 * been destroyed yet. The destruction path will end up waiting for the final 1659 * channel to be put before it releases resources. */ 1660 1661 return 0; 1662 } 1663 1664 static void 1665 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 1666 { 1667 total->bytes_read += add->bytes_read; 1668 total->num_read_ops += add->num_read_ops; 1669 total->bytes_written += add->bytes_written; 1670 total->num_write_ops += add->num_write_ops; 1671 total->read_latency_ticks += add->read_latency_ticks; 1672 total->write_latency_ticks += add->write_latency_ticks; 1673 } 1674 1675 static void 1676 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 1677 { 1678 struct spdk_bdev_channel *ch = ctx_buf; 1679 struct spdk_bdev_mgmt_channel *mgmt_ch; 1680 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1681 1682 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 1683 spdk_get_thread()); 1684 1685 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
*/ 1686 pthread_mutex_lock(&ch->bdev->internal.mutex); 1687 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 1688 pthread_mutex_unlock(&ch->bdev->internal.mutex); 1689 1690 mgmt_ch = shared_resource->mgmt_ch; 1691 1692 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 1693 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 1694 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 1695 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 1696 1697 _spdk_bdev_channel_destroy_resource(ch); 1698 } 1699 1700 int 1701 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 1702 { 1703 struct spdk_bdev_alias *tmp; 1704 1705 if (alias == NULL) { 1706 SPDK_ERRLOG("Empty alias passed\n"); 1707 return -EINVAL; 1708 } 1709 1710 if (spdk_bdev_get_by_name(alias)) { 1711 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 1712 return -EEXIST; 1713 } 1714 1715 tmp = calloc(1, sizeof(*tmp)); 1716 if (tmp == NULL) { 1717 SPDK_ERRLOG("Unable to allocate alias\n"); 1718 return -ENOMEM; 1719 } 1720 1721 tmp->alias = strdup(alias); 1722 if (tmp->alias == NULL) { 1723 free(tmp); 1724 SPDK_ERRLOG("Unable to allocate alias\n"); 1725 return -ENOMEM; 1726 } 1727 1728 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 1729 1730 return 0; 1731 } 1732 1733 int 1734 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 1735 { 1736 struct spdk_bdev_alias *tmp; 1737 1738 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 1739 if (strcmp(alias, tmp->alias) == 0) { 1740 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 1741 free(tmp->alias); 1742 free(tmp); 1743 return 0; 1744 } 1745 } 1746 1747 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 1748 1749 return -ENOENT; 1750 } 1751 1752 void 1753 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 1754 { 1755 struct spdk_bdev_alias *p, *tmp; 1756 1757 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 1758 TAILQ_REMOVE(&bdev->aliases, p, tailq); 1759 free(p->alias); 1760 free(p); 1761 } 1762 } 1763 1764 struct spdk_io_channel * 1765 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 1766 { 1767 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 1768 } 1769 1770 const char * 1771 spdk_bdev_get_name(const struct spdk_bdev *bdev) 1772 { 1773 return bdev->name; 1774 } 1775 1776 const char * 1777 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 1778 { 1779 return bdev->product_name; 1780 } 1781 1782 const struct spdk_bdev_aliases_list * 1783 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 1784 { 1785 return &bdev->aliases; 1786 } 1787 1788 uint32_t 1789 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 1790 { 1791 return bdev->blocklen; 1792 } 1793 1794 uint64_t 1795 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 1796 { 1797 return bdev->blockcnt; 1798 } 1799 1800 uint64_t 1801 spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev) 1802 { 1803 uint64_t iops_rate_limit = 0; 1804 1805 pthread_mutex_lock(&bdev->internal.mutex); 1806 if (bdev->internal.qos) { 1807 iops_rate_limit = bdev->internal.qos->iops_rate_limit; 1808 } 1809 pthread_mutex_unlock(&bdev->internal.mutex); 1810 1811 return iops_rate_limit; 1812 } 1813 1814 size_t 1815 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 1816 { 1817 /* TODO: push this logic down to the bdev modules */ 1818 if (bdev->need_aligned_buffer) { 1819 return bdev->blocklen; 1820 } 1821 1822 return 1; 1823 } 1824 1825 uint32_t 1826 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 1827 { 1828 return bdev->optimal_io_boundary; 1829 } 1830 1831 bool 
1832 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 1833 { 1834 return bdev->write_cache; 1835 } 1836 1837 const struct spdk_uuid * 1838 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 1839 { 1840 return &bdev->uuid; 1841 } 1842 1843 uint64_t 1844 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 1845 { 1846 return bdev->internal.measured_queue_depth; 1847 } 1848 1849 uint64_t 1850 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 1851 { 1852 return bdev->internal.period; 1853 } 1854 1855 uint64_t 1856 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 1857 { 1858 return bdev->internal.weighted_io_time; 1859 } 1860 1861 uint64_t 1862 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 1863 { 1864 return bdev->internal.io_time; 1865 } 1866 1867 static void 1868 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 1869 { 1870 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 1871 1872 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 1873 1874 if (bdev->internal.measured_queue_depth) { 1875 bdev->internal.io_time += bdev->internal.period; 1876 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 1877 } 1878 } 1879 1880 static void 1881 _calculate_measured_qd(struct spdk_io_channel_iter *i) 1882 { 1883 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 1884 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 1885 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 1886 1887 bdev->internal.temporary_queue_depth += ch->io_outstanding; 1888 spdk_for_each_channel_continue(i, 0); 1889 } 1890 1891 static int 1892 spdk_bdev_calculate_measured_queue_depth(void *ctx) 1893 { 1894 struct spdk_bdev *bdev = ctx; 1895 bdev->internal.temporary_queue_depth = 0; 1896 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 1897 _calculate_measured_qd_cpl); 1898 return 0; 1899 } 1900 1901 void 1902 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 1903 { 1904 bdev->internal.period = period; 1905 1906 if (bdev->internal.qd_poller != NULL) { 1907 spdk_poller_unregister(&bdev->internal.qd_poller); 1908 bdev->internal.measured_queue_depth = UINT64_MAX; 1909 } 1910 1911 if (period != 0) { 1912 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 1913 period); 1914 } 1915 } 1916 1917 int 1918 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 1919 { 1920 int ret; 1921 1922 pthread_mutex_lock(&bdev->internal.mutex); 1923 1924 /* bdev has open descriptors */ 1925 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 1926 bdev->blockcnt > size) { 1927 ret = -EBUSY; 1928 } else { 1929 bdev->blockcnt = size; 1930 ret = 0; 1931 } 1932 1933 pthread_mutex_unlock(&bdev->internal.mutex); 1934 1935 return ret; 1936 } 1937 1938 /* 1939 * Convert I/O offset and length from bytes to blocks. 1940 * 1941 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
1942 */ 1943 static uint64_t 1944 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 1945 uint64_t num_bytes, uint64_t *num_blocks) 1946 { 1947 uint32_t block_size = bdev->blocklen; 1948 1949 *offset_blocks = offset_bytes / block_size; 1950 *num_blocks = num_bytes / block_size; 1951 1952 return (offset_bytes % block_size) | (num_bytes % block_size); 1953 } 1954 1955 static bool 1956 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 1957 { 1958 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 1959 * has been an overflow and hence the offset has been wrapped around */ 1960 if (offset_blocks + num_blocks < offset_blocks) { 1961 return false; 1962 } 1963 1964 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 1965 if (offset_blocks + num_blocks > bdev->blockcnt) { 1966 return false; 1967 } 1968 1969 return true; 1970 } 1971 1972 int 1973 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1974 void *buf, uint64_t offset, uint64_t nbytes, 1975 spdk_bdev_io_completion_cb cb, void *cb_arg) 1976 { 1977 uint64_t offset_blocks, num_blocks; 1978 1979 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1980 return -EINVAL; 1981 } 1982 1983 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 1984 } 1985 1986 int 1987 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1988 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 1989 spdk_bdev_io_completion_cb cb, void *cb_arg) 1990 { 1991 struct spdk_bdev *bdev = desc->bdev; 1992 struct spdk_bdev_io *bdev_io; 1993 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1994 1995 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1996 return -EINVAL; 1997 } 1998 1999 bdev_io = spdk_bdev_get_io(channel); 2000 if (!bdev_io) { 2001 return -ENOMEM; 2002 } 2003 2004 bdev_io->internal.ch = channel; 2005 bdev_io->internal.desc = desc; 2006 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2007 bdev_io->u.bdev.iovs = &bdev_io->iov; 2008 bdev_io->u.bdev.iovs[0].iov_base = buf; 2009 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2010 bdev_io->u.bdev.iovcnt = 1; 2011 bdev_io->u.bdev.num_blocks = num_blocks; 2012 bdev_io->u.bdev.offset_blocks = offset_blocks; 2013 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2014 2015 spdk_bdev_io_submit(bdev_io); 2016 return 0; 2017 } 2018 2019 int 2020 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2021 struct iovec *iov, int iovcnt, 2022 uint64_t offset, uint64_t nbytes, 2023 spdk_bdev_io_completion_cb cb, void *cb_arg) 2024 { 2025 uint64_t offset_blocks, num_blocks; 2026 2027 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2028 return -EINVAL; 2029 } 2030 2031 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2032 } 2033 2034 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2035 struct iovec *iov, int iovcnt, 2036 uint64_t offset_blocks, uint64_t num_blocks, 2037 spdk_bdev_io_completion_cb cb, void *cb_arg) 2038 { 2039 struct spdk_bdev *bdev = desc->bdev; 2040 struct spdk_bdev_io *bdev_io; 2041 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2042 2043 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2044 return -EINVAL; 2045 } 2046 
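	/*
	 * If no spdk_bdev_io can be obtained here, -ENOMEM is returned to the
	 * caller. A caller that hits -ENOMEM can register an
	 * spdk_bdev_io_wait_entry with spdk_bdev_queue_io_wait() to be called
	 * back once a bdev_io is available again, e.g. (sketch, with a
	 * caller-defined resubmit_cb):
	 *
	 *   entry->bdev = bdev;
	 *   entry->cb_fn = resubmit_cb;
	 *   entry->cb_arg = ctx;
	 *   spdk_bdev_queue_io_wait(bdev, ch, entry);
	 */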
2047 bdev_io = spdk_bdev_get_io(channel); 2048 if (!bdev_io) { 2049 return -ENOMEM; 2050 } 2051 2052 bdev_io->internal.ch = channel; 2053 bdev_io->internal.desc = desc; 2054 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2055 bdev_io->u.bdev.iovs = iov; 2056 bdev_io->u.bdev.iovcnt = iovcnt; 2057 bdev_io->u.bdev.num_blocks = num_blocks; 2058 bdev_io->u.bdev.offset_blocks = offset_blocks; 2059 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2060 2061 spdk_bdev_io_submit(bdev_io); 2062 return 0; 2063 } 2064 2065 int 2066 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2067 void *buf, uint64_t offset, uint64_t nbytes, 2068 spdk_bdev_io_completion_cb cb, void *cb_arg) 2069 { 2070 uint64_t offset_blocks, num_blocks; 2071 2072 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2073 return -EINVAL; 2074 } 2075 2076 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2077 } 2078 2079 int 2080 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2081 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2082 spdk_bdev_io_completion_cb cb, void *cb_arg) 2083 { 2084 struct spdk_bdev *bdev = desc->bdev; 2085 struct spdk_bdev_io *bdev_io; 2086 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2087 2088 if (!desc->write) { 2089 return -EBADF; 2090 } 2091 2092 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2093 return -EINVAL; 2094 } 2095 2096 bdev_io = spdk_bdev_get_io(channel); 2097 if (!bdev_io) { 2098 return -ENOMEM; 2099 } 2100 2101 bdev_io->internal.ch = channel; 2102 bdev_io->internal.desc = desc; 2103 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2104 bdev_io->u.bdev.iovs = &bdev_io->iov; 2105 bdev_io->u.bdev.iovs[0].iov_base = buf; 2106 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2107 bdev_io->u.bdev.iovcnt = 1; 2108 bdev_io->u.bdev.num_blocks = num_blocks; 2109 bdev_io->u.bdev.offset_blocks = offset_blocks; 2110 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2111 2112 spdk_bdev_io_submit(bdev_io); 2113 return 0; 2114 } 2115 2116 int 2117 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2118 struct iovec *iov, int iovcnt, 2119 uint64_t offset, uint64_t len, 2120 spdk_bdev_io_completion_cb cb, void *cb_arg) 2121 { 2122 uint64_t offset_blocks, num_blocks; 2123 2124 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2125 return -EINVAL; 2126 } 2127 2128 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2129 } 2130 2131 int 2132 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2133 struct iovec *iov, int iovcnt, 2134 uint64_t offset_blocks, uint64_t num_blocks, 2135 spdk_bdev_io_completion_cb cb, void *cb_arg) 2136 { 2137 struct spdk_bdev *bdev = desc->bdev; 2138 struct spdk_bdev_io *bdev_io; 2139 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2140 2141 if (!desc->write) { 2142 return -EBADF; 2143 } 2144 2145 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2146 return -EINVAL; 2147 } 2148 2149 bdev_io = spdk_bdev_get_io(channel); 2150 if (!bdev_io) { 2151 return -ENOMEM; 2152 } 2153 2154 bdev_io->internal.ch = channel; 2155 bdev_io->internal.desc = desc; 2156 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2157 bdev_io->u.bdev.iovs = iov; 2158 bdev_io->u.bdev.iovcnt = iovcnt; 2159 bdev_io->u.bdev.num_blocks = num_blocks; 2160 bdev_io->u.bdev.offset_blocks = 
offset_blocks; 2161 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2162 2163 spdk_bdev_io_submit(bdev_io); 2164 return 0; 2165 } 2166 2167 int 2168 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2169 uint64_t offset, uint64_t len, 2170 spdk_bdev_io_completion_cb cb, void *cb_arg) 2171 { 2172 uint64_t offset_blocks, num_blocks; 2173 2174 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2175 return -EINVAL; 2176 } 2177 2178 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2179 } 2180 2181 int 2182 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2183 uint64_t offset_blocks, uint64_t num_blocks, 2184 spdk_bdev_io_completion_cb cb, void *cb_arg) 2185 { 2186 struct spdk_bdev *bdev = desc->bdev; 2187 struct spdk_bdev_io *bdev_io; 2188 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2189 2190 if (!desc->write) { 2191 return -EBADF; 2192 } 2193 2194 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2195 return -EINVAL; 2196 } 2197 2198 bdev_io = spdk_bdev_get_io(channel); 2199 2200 if (!bdev_io) { 2201 return -ENOMEM; 2202 } 2203 2204 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2205 bdev_io->internal.ch = channel; 2206 bdev_io->internal.desc = desc; 2207 bdev_io->u.bdev.offset_blocks = offset_blocks; 2208 bdev_io->u.bdev.num_blocks = num_blocks; 2209 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2210 2211 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2212 spdk_bdev_io_submit(bdev_io); 2213 return 0; 2214 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2215 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2216 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2217 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2218 _spdk_bdev_write_zero_buffer_next(bdev_io); 2219 return 0; 2220 } else { 2221 spdk_bdev_free_io(bdev_io); 2222 return -ENOTSUP; 2223 } 2224 } 2225 2226 int 2227 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2228 uint64_t offset, uint64_t nbytes, 2229 spdk_bdev_io_completion_cb cb, void *cb_arg) 2230 { 2231 uint64_t offset_blocks, num_blocks; 2232 2233 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2234 return -EINVAL; 2235 } 2236 2237 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2238 } 2239 2240 int 2241 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2242 uint64_t offset_blocks, uint64_t num_blocks, 2243 spdk_bdev_io_completion_cb cb, void *cb_arg) 2244 { 2245 struct spdk_bdev *bdev = desc->bdev; 2246 struct spdk_bdev_io *bdev_io; 2247 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2248 2249 if (!desc->write) { 2250 return -EBADF; 2251 } 2252 2253 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2254 return -EINVAL; 2255 } 2256 2257 if (num_blocks == 0) { 2258 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2259 return -EINVAL; 2260 } 2261 2262 bdev_io = spdk_bdev_get_io(channel); 2263 if (!bdev_io) { 2264 return -ENOMEM; 2265 } 2266 2267 bdev_io->internal.ch = channel; 2268 bdev_io->internal.desc = desc; 2269 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2270 2271 bdev_io->u.bdev.iovs = &bdev_io->iov; 2272 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2273 bdev_io->u.bdev.iovs[0].iov_len = 0; 2274 bdev_io->u.bdev.iovcnt = 1; 2275 2276 
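	/* An unmap carries no data payload; the empty iovec above is a placeholder
	 * and the request is described solely by the block range recorded below. */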
bdev_io->u.bdev.offset_blocks = offset_blocks; 2277 bdev_io->u.bdev.num_blocks = num_blocks; 2278 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2279 2280 spdk_bdev_io_submit(bdev_io); 2281 return 0; 2282 } 2283 2284 int 2285 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2286 uint64_t offset, uint64_t length, 2287 spdk_bdev_io_completion_cb cb, void *cb_arg) 2288 { 2289 uint64_t offset_blocks, num_blocks; 2290 2291 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2292 return -EINVAL; 2293 } 2294 2295 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2296 } 2297 2298 int 2299 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2300 uint64_t offset_blocks, uint64_t num_blocks, 2301 spdk_bdev_io_completion_cb cb, void *cb_arg) 2302 { 2303 struct spdk_bdev *bdev = desc->bdev; 2304 struct spdk_bdev_io *bdev_io; 2305 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2306 2307 if (!desc->write) { 2308 return -EBADF; 2309 } 2310 2311 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2312 return -EINVAL; 2313 } 2314 2315 bdev_io = spdk_bdev_get_io(channel); 2316 if (!bdev_io) { 2317 return -ENOMEM; 2318 } 2319 2320 bdev_io->internal.ch = channel; 2321 bdev_io->internal.desc = desc; 2322 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2323 bdev_io->u.bdev.iovs = NULL; 2324 bdev_io->u.bdev.iovcnt = 0; 2325 bdev_io->u.bdev.offset_blocks = offset_blocks; 2326 bdev_io->u.bdev.num_blocks = num_blocks; 2327 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2328 2329 spdk_bdev_io_submit(bdev_io); 2330 return 0; 2331 } 2332 2333 static void 2334 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2335 { 2336 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2337 struct spdk_bdev_io *bdev_io; 2338 2339 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2340 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2341 spdk_bdev_io_submit_reset(bdev_io); 2342 } 2343 2344 static void 2345 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2346 { 2347 struct spdk_io_channel *ch; 2348 struct spdk_bdev_channel *channel; 2349 struct spdk_bdev_mgmt_channel *mgmt_channel; 2350 struct spdk_bdev_shared_resource *shared_resource; 2351 bdev_io_tailq_t tmp_queued; 2352 2353 TAILQ_INIT(&tmp_queued); 2354 2355 ch = spdk_io_channel_iter_get_channel(i); 2356 channel = spdk_io_channel_get_ctx(ch); 2357 shared_resource = channel->shared_resource; 2358 mgmt_channel = shared_resource->mgmt_ch; 2359 2360 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2361 2362 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2363 /* The QoS object is always valid and readable while 2364 * the channel flag is set, so the lock here should not 2365 * be necessary. We're not in the fast path though, so 2366 * just take it anyway. 
*/ 2367 pthread_mutex_lock(&channel->bdev->internal.mutex); 2368 if (channel->bdev->internal.qos->ch == channel) { 2369 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2370 } 2371 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2372 } 2373 2374 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2375 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2376 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2377 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2378 2379 spdk_for_each_channel_continue(i, 0); 2380 } 2381 2382 static void 2383 _spdk_bdev_start_reset(void *ctx) 2384 { 2385 struct spdk_bdev_channel *ch = ctx; 2386 2387 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2388 ch, _spdk_bdev_reset_dev); 2389 } 2390 2391 static void 2392 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2393 { 2394 struct spdk_bdev *bdev = ch->bdev; 2395 2396 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2397 2398 pthread_mutex_lock(&bdev->internal.mutex); 2399 if (bdev->internal.reset_in_progress == NULL) { 2400 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2401 /* 2402 * Take a channel reference for the target bdev for the life of this 2403 * reset. This guards against the channel getting destroyed while 2404 * spdk_for_each_channel() calls related to this reset IO are in 2405 * progress. We will release the reference when this reset is 2406 * completed. 2407 */ 2408 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2409 _spdk_bdev_start_reset(ch); 2410 } 2411 pthread_mutex_unlock(&bdev->internal.mutex); 2412 } 2413 2414 int 2415 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2416 spdk_bdev_io_completion_cb cb, void *cb_arg) 2417 { 2418 struct spdk_bdev *bdev = desc->bdev; 2419 struct spdk_bdev_io *bdev_io; 2420 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2421 2422 bdev_io = spdk_bdev_get_io(channel); 2423 if (!bdev_io) { 2424 return -ENOMEM; 2425 } 2426 2427 bdev_io->internal.ch = channel; 2428 bdev_io->internal.desc = desc; 2429 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2430 bdev_io->u.reset.ch_ref = NULL; 2431 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2432 2433 pthread_mutex_lock(&bdev->internal.mutex); 2434 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2435 pthread_mutex_unlock(&bdev->internal.mutex); 2436 2437 _spdk_bdev_channel_start_reset(channel); 2438 2439 return 0; 2440 } 2441 2442 void 2443 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2444 struct spdk_bdev_io_stat *stat) 2445 { 2446 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2447 2448 *stat = channel->stat; 2449 } 2450 2451 static void 2452 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2453 { 2454 void *io_device = spdk_io_channel_iter_get_io_device(i); 2455 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2456 2457 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2458 bdev_iostat_ctx->cb_arg, 0); 2459 free(bdev_iostat_ctx); 2460 } 2461 2462 static void 2463 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2464 { 2465 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2466 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2467 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2468 2469 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2470 spdk_for_each_channel_continue(i, 0); 2471 } 2472 2473 void 2474 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2475 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2476 { 2477 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2478 2479 assert(bdev != NULL); 2480 assert(stat != NULL); 2481 assert(cb != NULL); 2482 2483 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2484 if (bdev_iostat_ctx == NULL) { 2485 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2486 cb(bdev, stat, cb_arg, -ENOMEM); 2487 return; 2488 } 2489 2490 bdev_iostat_ctx->stat = stat; 2491 bdev_iostat_ctx->cb = cb; 2492 bdev_iostat_ctx->cb_arg = cb_arg; 2493 2494 /* Start with the statistics from previously deleted channels. */ 2495 pthread_mutex_lock(&bdev->internal.mutex); 2496 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2497 pthread_mutex_unlock(&bdev->internal.mutex); 2498 2499 /* Then iterate and add the statistics from each existing channel. */ 2500 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2501 _spdk_bdev_get_each_channel_stat, 2502 bdev_iostat_ctx, 2503 _spdk_bdev_get_device_stat_done); 2504 } 2505 2506 int 2507 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2508 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2509 spdk_bdev_io_completion_cb cb, void *cb_arg) 2510 { 2511 struct spdk_bdev *bdev = desc->bdev; 2512 struct spdk_bdev_io *bdev_io; 2513 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2514 2515 if (!desc->write) { 2516 return -EBADF; 2517 } 2518 2519 bdev_io = spdk_bdev_get_io(channel); 2520 if (!bdev_io) { 2521 return -ENOMEM; 2522 } 2523 2524 bdev_io->internal.ch = channel; 2525 bdev_io->internal.desc = desc; 2526 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2527 bdev_io->u.nvme_passthru.cmd = *cmd; 2528 bdev_io->u.nvme_passthru.buf = buf; 2529 bdev_io->u.nvme_passthru.nbytes = nbytes; 2530 bdev_io->u.nvme_passthru.md_buf = NULL; 2531 bdev_io->u.nvme_passthru.md_len = 0; 2532 2533 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2534 2535 spdk_bdev_io_submit(bdev_io); 2536 return 0; 2537 } 2538 2539 int 2540 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2541 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2542 spdk_bdev_io_completion_cb cb, void *cb_arg) 2543 { 2544 struct spdk_bdev *bdev = desc->bdev; 2545 struct spdk_bdev_io *bdev_io; 2546 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2547 2548 if (!desc->write) { 2549 /* 2550 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2551 * to easily determine if the command is a read or write, but for now just 2552 * do not allow io_passthru with a read-only descriptor. 
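	 * A descriptor opened with write == true (or promoted to writable by a
	 * module through spdk_bdev_module_claim_bdev()) is required instead.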
2553 */ 2554 return -EBADF; 2555 } 2556 2557 bdev_io = spdk_bdev_get_io(channel); 2558 if (!bdev_io) { 2559 return -ENOMEM; 2560 } 2561 2562 bdev_io->internal.ch = channel; 2563 bdev_io->internal.desc = desc; 2564 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2565 bdev_io->u.nvme_passthru.cmd = *cmd; 2566 bdev_io->u.nvme_passthru.buf = buf; 2567 bdev_io->u.nvme_passthru.nbytes = nbytes; 2568 bdev_io->u.nvme_passthru.md_buf = NULL; 2569 bdev_io->u.nvme_passthru.md_len = 0; 2570 2571 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2572 2573 spdk_bdev_io_submit(bdev_io); 2574 return 0; 2575 } 2576 2577 int 2578 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2579 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2580 spdk_bdev_io_completion_cb cb, void *cb_arg) 2581 { 2582 struct spdk_bdev *bdev = desc->bdev; 2583 struct spdk_bdev_io *bdev_io; 2584 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2585 2586 if (!desc->write) { 2587 /* 2588 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2589 * to easily determine if the command is a read or write, but for now just 2590 * do not allow io_passthru with a read-only descriptor. 2591 */ 2592 return -EBADF; 2593 } 2594 2595 bdev_io = spdk_bdev_get_io(channel); 2596 if (!bdev_io) { 2597 return -ENOMEM; 2598 } 2599 2600 bdev_io->internal.ch = channel; 2601 bdev_io->internal.desc = desc; 2602 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2603 bdev_io->u.nvme_passthru.cmd = *cmd; 2604 bdev_io->u.nvme_passthru.buf = buf; 2605 bdev_io->u.nvme_passthru.nbytes = nbytes; 2606 bdev_io->u.nvme_passthru.md_buf = md_buf; 2607 bdev_io->u.nvme_passthru.md_len = md_len; 2608 2609 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2610 2611 spdk_bdev_io_submit(bdev_io); 2612 return 0; 2613 } 2614 2615 int 2616 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2617 struct spdk_bdev_io_wait_entry *entry) 2618 { 2619 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2620 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2621 2622 if (bdev != entry->bdev) { 2623 SPDK_ERRLOG("bdevs do not match\n"); 2624 return -EINVAL; 2625 } 2626 2627 if (mgmt_ch->per_thread_cache_count > 0) { 2628 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2629 return -EINVAL; 2630 } 2631 2632 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2633 return 0; 2634 } 2635 2636 static void 2637 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2638 { 2639 struct spdk_bdev *bdev = bdev_ch->bdev; 2640 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2641 struct spdk_bdev_io *bdev_io; 2642 2643 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2644 /* 2645 * Allow some more I/O to complete before retrying the nomem_io queue. 2646 * Some drivers (such as nvme) cannot immediately take a new I/O in 2647 * the context of a completion, because the resources for the I/O are 2648 * not released until control returns to the bdev poller. Also, we 2649 * may require several small I/O to complete before a larger I/O 2650 * (that requires splitting) can be submitted. 
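	 * The nomem_threshold checked above is recomputed in
	 * spdk_bdev_io_complete() each time an I/O completes with
	 * SPDK_BDEV_IO_STATUS_NOMEM.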
2651 */ 2652 return; 2653 } 2654 2655 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2656 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2657 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2658 bdev_io->internal.ch->io_outstanding++; 2659 shared_resource->io_outstanding++; 2660 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2661 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2662 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2663 break; 2664 } 2665 } 2666 } 2667 2668 static inline void 2669 _spdk_bdev_io_complete(void *ctx) 2670 { 2671 struct spdk_bdev_io *bdev_io = ctx; 2672 2673 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2674 /* 2675 * Send the completion to the thread that originally submitted the I/O, 2676 * which may not be the current thread in the case of QoS. 2677 */ 2678 if (bdev_io->internal.io_submit_ch) { 2679 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2680 bdev_io->internal.io_submit_ch = NULL; 2681 } 2682 2683 /* 2684 * Defer completion to avoid potential infinite recursion if the 2685 * user's completion callback issues a new I/O. 2686 */ 2687 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2688 _spdk_bdev_io_complete, bdev_io); 2689 return; 2690 } 2691 2692 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2693 switch (bdev_io->type) { 2694 case SPDK_BDEV_IO_TYPE_READ: 2695 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2696 bdev_io->internal.ch->stat.num_read_ops++; 2697 bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2698 break; 2699 case SPDK_BDEV_IO_TYPE_WRITE: 2700 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2701 bdev_io->internal.ch->stat.num_write_ops++; 2702 bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2703 break; 2704 default: 2705 break; 2706 } 2707 } 2708 2709 #ifdef SPDK_CONFIG_VTUNE 2710 uint64_t now_tsc = spdk_get_ticks(); 2711 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2712 uint64_t data[5]; 2713 2714 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2715 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2716 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2717 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2718 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
2719 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2720 2721 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2722 __itt_metadata_u64, 5, data); 2723 2724 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2725 bdev_io->internal.ch->start_tsc = now_tsc; 2726 } 2727 #endif 2728 2729 assert(bdev_io->internal.cb != NULL); 2730 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2731 2732 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2733 bdev_io->internal.caller_ctx); 2734 } 2735 2736 static void 2737 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2738 { 2739 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2740 2741 if (bdev_io->u.reset.ch_ref != NULL) { 2742 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2743 bdev_io->u.reset.ch_ref = NULL; 2744 } 2745 2746 _spdk_bdev_io_complete(bdev_io); 2747 } 2748 2749 static void 2750 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2751 { 2752 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2753 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2754 2755 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2756 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2757 _spdk_bdev_channel_start_reset(ch); 2758 } 2759 2760 spdk_for_each_channel_continue(i, 0); 2761 } 2762 2763 void 2764 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2765 { 2766 struct spdk_bdev *bdev = bdev_io->bdev; 2767 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2768 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2769 2770 bdev_io->internal.status = status; 2771 2772 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2773 bool unlock_channels = false; 2774 2775 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2776 SPDK_ERRLOG("NOMEM returned for reset\n"); 2777 } 2778 pthread_mutex_lock(&bdev->internal.mutex); 2779 if (bdev_io == bdev->internal.reset_in_progress) { 2780 bdev->internal.reset_in_progress = NULL; 2781 unlock_channels = true; 2782 } 2783 pthread_mutex_unlock(&bdev->internal.mutex); 2784 2785 if (unlock_channels) { 2786 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 2787 bdev_io, _spdk_bdev_reset_complete); 2788 return; 2789 } 2790 } else { 2791 assert(bdev_ch->io_outstanding > 0); 2792 assert(shared_resource->io_outstanding > 0); 2793 bdev_ch->io_outstanding--; 2794 shared_resource->io_outstanding--; 2795 2796 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 2797 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 2798 /* 2799 * Wait for some of the outstanding I/O to complete before we 2800 * retry any of the nomem_io. Normally we will wait for 2801 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 2802 * depth channels we will instead wait for half to complete. 
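			 * For example, with NOMEM_THRESHOLD_COUNT == 8 and 100 I/O
			 * outstanding, the threshold becomes 92; with only 10
			 * outstanding, it becomes 5 (half of the outstanding count).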
2803 */ 2804 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 2805 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 2806 return; 2807 } 2808 2809 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 2810 _spdk_bdev_ch_retry_io(bdev_ch); 2811 } 2812 } 2813 2814 _spdk_bdev_io_complete(bdev_io); 2815 } 2816 2817 void 2818 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 2819 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 2820 { 2821 if (sc == SPDK_SCSI_STATUS_GOOD) { 2822 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2823 } else { 2824 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 2825 bdev_io->internal.error.scsi.sc = sc; 2826 bdev_io->internal.error.scsi.sk = sk; 2827 bdev_io->internal.error.scsi.asc = asc; 2828 bdev_io->internal.error.scsi.ascq = ascq; 2829 } 2830 2831 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2832 } 2833 2834 void 2835 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 2836 int *sc, int *sk, int *asc, int *ascq) 2837 { 2838 assert(sc != NULL); 2839 assert(sk != NULL); 2840 assert(asc != NULL); 2841 assert(ascq != NULL); 2842 2843 switch (bdev_io->internal.status) { 2844 case SPDK_BDEV_IO_STATUS_SUCCESS: 2845 *sc = SPDK_SCSI_STATUS_GOOD; 2846 *sk = SPDK_SCSI_SENSE_NO_SENSE; 2847 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2848 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2849 break; 2850 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 2851 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 2852 break; 2853 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 2854 *sc = bdev_io->internal.error.scsi.sc; 2855 *sk = bdev_io->internal.error.scsi.sk; 2856 *asc = bdev_io->internal.error.scsi.asc; 2857 *ascq = bdev_io->internal.error.scsi.ascq; 2858 break; 2859 default: 2860 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 2861 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 2862 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2863 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2864 break; 2865 } 2866 } 2867 2868 void 2869 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 2870 { 2871 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 2872 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2873 } else { 2874 bdev_io->internal.error.nvme.sct = sct; 2875 bdev_io->internal.error.nvme.sc = sc; 2876 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 2877 } 2878 2879 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2880 } 2881 2882 void 2883 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 2884 { 2885 assert(sct != NULL); 2886 assert(sc != NULL); 2887 2888 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 2889 *sct = bdev_io->internal.error.nvme.sct; 2890 *sc = bdev_io->internal.error.nvme.sc; 2891 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2892 *sct = SPDK_NVME_SCT_GENERIC; 2893 *sc = SPDK_NVME_SC_SUCCESS; 2894 } else { 2895 *sct = SPDK_NVME_SCT_GENERIC; 2896 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 2897 } 2898 } 2899 2900 struct spdk_thread * 2901 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 2902 { 2903 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 2904 } 2905 2906 static void 2907 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set, 2908 enum spdk_bdev_qos_type qos_type) 2909 { 2910 uint64_t min_qos_set = 0; 2911 2912 switch (qos_type) { 2913 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 
2914 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 2915 break; 2916 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2917 min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC; 2918 break; 2919 default: 2920 SPDK_ERRLOG("Unsupported QoS type.\n"); 2921 return; 2922 } 2923 2924 if (qos_set % min_qos_set) { 2925 SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not multiple of %lu\n", 2926 qos_set, bdev->name, min_qos_set); 2927 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 2928 return; 2929 } 2930 2931 if (!bdev->internal.qos) { 2932 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 2933 if (!bdev->internal.qos) { 2934 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 2935 return; 2936 } 2937 } 2938 2939 switch (qos_type) { 2940 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 2941 bdev->internal.qos->iops_rate_limit = qos_set; 2942 break; 2943 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2944 bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024; 2945 break; 2946 default: 2947 break; 2948 } 2949 2950 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 2951 bdev->name, qos_type, qos_set); 2952 2953 return; 2954 } 2955 2956 static void 2957 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 2958 { 2959 struct spdk_conf_section *sp = NULL; 2960 const char *val = NULL; 2961 uint64_t qos_set = 0; 2962 int i = 0, j = 0; 2963 2964 sp = spdk_conf_find_section(NULL, "QoS"); 2965 if (!sp) { 2966 return; 2967 } 2968 2969 while (j < SPDK_BDEV_QOS_NUM_TYPES) { 2970 i = 0; 2971 while (true) { 2972 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0); 2973 if (!val) { 2974 break; 2975 } 2976 2977 if (strcmp(bdev->name, val) != 0) { 2978 i++; 2979 continue; 2980 } 2981 2982 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1); 2983 if (val) { 2984 qos_set = strtoull(val, NULL, 10); 2985 _spdk_bdev_qos_config_type(bdev, qos_set, j); 2986 } 2987 2988 break; 2989 } 2990 2991 j++; 2992 } 2993 2994 return; 2995 } 2996 2997 static int 2998 spdk_bdev_init(struct spdk_bdev *bdev) 2999 { 3000 assert(bdev->module != NULL); 3001 3002 if (!bdev->name) { 3003 SPDK_ERRLOG("Bdev name is NULL\n"); 3004 return -EINVAL; 3005 } 3006 3007 if (spdk_bdev_get_by_name(bdev->name)) { 3008 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3009 return -EEXIST; 3010 } 3011 3012 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3013 bdev->internal.measured_queue_depth = UINT64_MAX; 3014 3015 TAILQ_INIT(&bdev->internal.open_descs); 3016 3017 TAILQ_INIT(&bdev->aliases); 3018 3019 bdev->internal.reset_in_progress = NULL; 3020 3021 _spdk_bdev_qos_config(bdev); 3022 3023 spdk_io_device_register(__bdev_to_io_dev(bdev), 3024 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3025 sizeof(struct spdk_bdev_channel)); 3026 3027 pthread_mutex_init(&bdev->internal.mutex, NULL); 3028 return 0; 3029 } 3030 3031 static void 3032 spdk_bdev_destroy_cb(void *io_device) 3033 { 3034 int rc; 3035 struct spdk_bdev *bdev; 3036 spdk_bdev_unregister_cb cb_fn; 3037 void *cb_arg; 3038 3039 bdev = __bdev_from_io_dev(io_device); 3040 cb_fn = bdev->internal.unregister_cb; 3041 cb_arg = bdev->internal.unregister_ctx; 3042 3043 rc = bdev->fn_table->destruct(bdev->ctxt); 3044 if (rc < 0) { 3045 SPDK_ERRLOG("destruct failed\n"); 3046 } 3047 if (rc <= 0 && cb_fn != NULL) { 3048 cb_fn(cb_arg, rc); 3049 } 3050 } 3051 3052 3053 static void 3054 spdk_bdev_fini(struct spdk_bdev *bdev) 3055 { 3056 pthread_mutex_destroy(&bdev->internal.mutex); 3057 3058 free(bdev->internal.qos); 3059 3060 
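	/*
	 * The unregister below is asynchronous: spdk_bdev_destroy_cb() runs once
	 * the io_device teardown has finished, calls the module's destruct()
	 * callback and then, if one was provided, the caller's unregister callback.
	 */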
spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3061 } 3062 3063 static void 3064 spdk_bdev_start(struct spdk_bdev *bdev) 3065 { 3066 struct spdk_bdev_module *module; 3067 uint32_t action; 3068 3069 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3070 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3071 3072 /* Examine configuration before initializing I/O */ 3073 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3074 if (module->examine_config) { 3075 action = module->internal.action_in_progress; 3076 module->internal.action_in_progress++; 3077 module->examine_config(bdev); 3078 if (action != module->internal.action_in_progress) { 3079 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3080 module->name); 3081 } 3082 } 3083 } 3084 3085 if (bdev->internal.claim_module) { 3086 return; 3087 } 3088 3089 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3090 if (module->examine_disk) { 3091 module->internal.action_in_progress++; 3092 module->examine_disk(bdev); 3093 } 3094 } 3095 } 3096 3097 int 3098 spdk_bdev_register(struct spdk_bdev *bdev) 3099 { 3100 int rc = spdk_bdev_init(bdev); 3101 3102 if (rc == 0) { 3103 spdk_bdev_start(bdev); 3104 } 3105 3106 return rc; 3107 } 3108 3109 int 3110 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3111 { 3112 int rc; 3113 3114 rc = spdk_bdev_init(vbdev); 3115 if (rc) { 3116 return rc; 3117 } 3118 3119 spdk_bdev_start(vbdev); 3120 return 0; 3121 } 3122 3123 void 3124 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3125 { 3126 if (bdev->internal.unregister_cb != NULL) { 3127 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3128 } 3129 } 3130 3131 static void 3132 _remove_notify(void *arg) 3133 { 3134 struct spdk_bdev_desc *desc = arg; 3135 3136 desc->remove_cb(desc->remove_ctx); 3137 } 3138 3139 void 3140 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3141 { 3142 struct spdk_bdev_desc *desc, *tmp; 3143 bool do_destruct = true; 3144 struct spdk_thread *thread; 3145 3146 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3147 3148 thread = spdk_get_thread(); 3149 if (!thread) { 3150 /* The user called this from a non-SPDK thread. */ 3151 if (cb_fn != NULL) { 3152 cb_fn(cb_arg, -ENOTSUP); 3153 } 3154 return; 3155 } 3156 3157 pthread_mutex_lock(&bdev->internal.mutex); 3158 3159 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3160 bdev->internal.unregister_cb = cb_fn; 3161 bdev->internal.unregister_ctx = cb_arg; 3162 3163 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3164 if (desc->remove_cb) { 3165 do_destruct = false; 3166 /* 3167 * Defer invocation of the remove_cb to a separate message that will 3168 * run later on this thread. This ensures this context unwinds and 3169 * we don't recursively unregister this bdev again if the remove_cb 3170 * immediately closes its descriptor. 3171 */ 3172 if (!desc->remove_scheduled) { 3173 /* Avoid scheduling removal of the same descriptor multiple times. 
*/ 3174 desc->remove_scheduled = true; 3175 spdk_thread_send_msg(thread, _remove_notify, desc); 3176 } 3177 } 3178 } 3179 3180 if (!do_destruct) { 3181 pthread_mutex_unlock(&bdev->internal.mutex); 3182 return; 3183 } 3184 3185 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3186 pthread_mutex_unlock(&bdev->internal.mutex); 3187 3188 spdk_bdev_fini(bdev); 3189 } 3190 3191 int 3192 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3193 void *remove_ctx, struct spdk_bdev_desc **_desc) 3194 { 3195 struct spdk_bdev_desc *desc; 3196 3197 desc = calloc(1, sizeof(*desc)); 3198 if (desc == NULL) { 3199 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3200 return -ENOMEM; 3201 } 3202 3203 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3204 spdk_get_thread()); 3205 3206 pthread_mutex_lock(&bdev->internal.mutex); 3207 3208 if (write && bdev->internal.claim_module) { 3209 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3210 bdev->name, bdev->internal.claim_module->name); 3211 free(desc); 3212 pthread_mutex_unlock(&bdev->internal.mutex); 3213 return -EPERM; 3214 } 3215 3216 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3217 3218 desc->bdev = bdev; 3219 desc->remove_cb = remove_cb; 3220 desc->remove_ctx = remove_ctx; 3221 desc->write = write; 3222 *_desc = desc; 3223 3224 pthread_mutex_unlock(&bdev->internal.mutex); 3225 3226 return 0; 3227 } 3228 3229 void 3230 spdk_bdev_close(struct spdk_bdev_desc *desc) 3231 { 3232 struct spdk_bdev *bdev = desc->bdev; 3233 bool do_unregister = false; 3234 3235 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3236 spdk_get_thread()); 3237 3238 pthread_mutex_lock(&bdev->internal.mutex); 3239 3240 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3241 free(desc); 3242 3243 /* If no more descriptors, kill QoS channel */ 3244 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3245 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3246 bdev->name, spdk_get_thread()); 3247 3248 if (spdk_bdev_qos_destroy(bdev)) { 3249 /* There isn't anything we can do to recover here. Just let the 3250 * old QoS poller keep running. The QoS handling won't change 3251 * cores when the user allocates a new channel, but it won't break. */ 3252 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 3253 } 3254 } 3255 3256 spdk_bdev_set_qd_sampling_period(bdev, 0); 3257 3258 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3259 do_unregister = true; 3260 } 3261 pthread_mutex_unlock(&bdev->internal.mutex); 3262 3263 if (do_unregister == true) { 3264 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3265 } 3266 } 3267 3268 int 3269 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3270 struct spdk_bdev_module *module) 3271 { 3272 if (bdev->internal.claim_module != NULL) { 3273 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3274 bdev->internal.claim_module->name); 3275 return -EPERM; 3276 } 3277 3278 if (desc && !desc->write) { 3279 desc->write = true; 3280 } 3281 3282 bdev->internal.claim_module = module; 3283 return 0; 3284 } 3285 3286 void 3287 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3288 { 3289 assert(bdev->internal.claim_module != NULL); 3290 bdev->internal.claim_module = NULL; 3291 } 3292 3293 struct spdk_bdev * 3294 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3295 { 3296 return desc->bdev; 3297 } 3298 3299 void 3300 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3301 { 3302 struct iovec *iovs; 3303 int iovcnt; 3304 3305 if (bdev_io == NULL) { 3306 return; 3307 } 3308 3309 switch (bdev_io->type) { 3310 case SPDK_BDEV_IO_TYPE_READ: 3311 iovs = bdev_io->u.bdev.iovs; 3312 iovcnt = bdev_io->u.bdev.iovcnt; 3313 break; 3314 case SPDK_BDEV_IO_TYPE_WRITE: 3315 iovs = bdev_io->u.bdev.iovs; 3316 iovcnt = bdev_io->u.bdev.iovcnt; 3317 break; 3318 default: 3319 iovs = NULL; 3320 iovcnt = 0; 3321 break; 3322 } 3323 3324 if (iovp) { 3325 *iovp = iovs; 3326 } 3327 if (iovcntp) { 3328 *iovcntp = iovcnt; 3329 } 3330 } 3331 3332 void 3333 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3334 { 3335 3336 if (spdk_bdev_module_list_find(bdev_module->name)) { 3337 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3338 assert(false); 3339 } 3340 3341 if (bdev_module->async_init) { 3342 bdev_module->internal.action_in_progress = 1; 3343 } 3344 3345 /* 3346 * Modules with examine callbacks must be initialized first, so they are 3347 * ready to handle examine callbacks from later modules that will 3348 * register physical bdevs. 
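	 * For example, a virtual bdev module that builds volumes on top of other
	 * bdevs is inserted at the head of the list so that its examine callbacks
	 * can see (and possibly claim) base bdevs registered by later modules.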
3349 */ 3350 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3351 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3352 } else { 3353 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3354 } 3355 } 3356 3357 struct spdk_bdev_module * 3358 spdk_bdev_module_list_find(const char *name) 3359 { 3360 struct spdk_bdev_module *bdev_module; 3361 3362 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3363 if (strcmp(name, bdev_module->name) == 0) { 3364 break; 3365 } 3366 } 3367 3368 return bdev_module; 3369 } 3370 3371 static void 3372 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3373 { 3374 struct spdk_bdev_io *bdev_io = _bdev_io; 3375 uint64_t num_bytes, num_blocks; 3376 int rc; 3377 3378 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3379 bdev_io->u.bdev.split_remaining_num_blocks, 3380 ZERO_BUFFER_SIZE); 3381 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3382 3383 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3384 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3385 g_bdev_mgr.zero_buffer, 3386 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3387 _spdk_bdev_write_zero_buffer_done, bdev_io); 3388 if (rc == 0) { 3389 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3390 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3391 } else if (rc == -ENOMEM) { 3392 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 3393 bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_write_zero_buffer_next; 3394 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 3395 spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3396 &bdev_io->internal.waitq_entry); 3397 } else { 3398 /* This should never happen. 
*/ 3399 assert(false); 3400 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3401 bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED, bdev_io->internal.caller_ctx); 3402 } 3403 } 3404 3405 static void 3406 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3407 { 3408 struct spdk_bdev_io *parent_io = cb_arg; 3409 3410 if (!success) { 3411 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3412 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx); 3413 return; 3414 } 3415 3416 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3417 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3418 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx); 3419 return; 3420 } 3421 3422 _spdk_bdev_write_zero_buffer_next(parent_io); 3423 } 3424 3425 struct set_qos_limit_ctx { 3426 void (*cb_fn)(void *cb_arg, int status); 3427 void *cb_arg; 3428 struct spdk_bdev *bdev; 3429 }; 3430 3431 static void 3432 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3433 { 3434 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3435 ctx->bdev->internal.qos_mod_in_progress = false; 3436 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3437 3438 ctx->cb_fn(ctx->cb_arg, status); 3439 free(ctx); 3440 } 3441 3442 static void 3443 _spdk_bdev_disable_qos_done(void *cb_arg) 3444 { 3445 struct set_qos_limit_ctx *ctx = cb_arg; 3446 struct spdk_bdev *bdev = ctx->bdev; 3447 struct spdk_bdev_io *bdev_io; 3448 struct spdk_bdev_qos *qos; 3449 3450 pthread_mutex_lock(&bdev->internal.mutex); 3451 qos = bdev->internal.qos; 3452 bdev->internal.qos = NULL; 3453 pthread_mutex_unlock(&bdev->internal.mutex); 3454 3455 while (!TAILQ_EMPTY(&qos->queued)) { 3456 /* Send queued I/O back to their original thread for resubmission. */ 3457 bdev_io = TAILQ_FIRST(&qos->queued); 3458 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3459 3460 if (bdev_io->internal.io_submit_ch) { 3461 /* 3462 * Channel was changed when sending it to the QoS thread - change it back 3463 * before sending it back to the original thread. 
3464 */ 3465 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3466 bdev_io->internal.io_submit_ch = NULL; 3467 } 3468 3469 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3470 _spdk_bdev_io_submit, bdev_io); 3471 } 3472 3473 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3474 spdk_poller_unregister(&qos->poller); 3475 3476 free(qos); 3477 3478 _spdk_bdev_set_qos_limit_done(ctx, 0); 3479 } 3480 3481 static void 3482 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3483 { 3484 void *io_device = spdk_io_channel_iter_get_io_device(i); 3485 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3486 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3487 struct spdk_thread *thread; 3488 3489 pthread_mutex_lock(&bdev->internal.mutex); 3490 thread = bdev->internal.qos->thread; 3491 pthread_mutex_unlock(&bdev->internal.mutex); 3492 3493 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3494 } 3495 3496 static void 3497 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3498 { 3499 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3500 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3501 3502 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3503 3504 spdk_for_each_channel_continue(i, 0); 3505 } 3506 3507 static void 3508 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg) 3509 { 3510 struct set_qos_limit_ctx *ctx = cb_arg; 3511 struct spdk_bdev *bdev = ctx->bdev; 3512 3513 pthread_mutex_lock(&bdev->internal.mutex); 3514 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3515 pthread_mutex_unlock(&bdev->internal.mutex); 3516 3517 _spdk_bdev_set_qos_limit_done(ctx, 0); 3518 } 3519 3520 static void 3521 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3522 { 3523 void *io_device = spdk_io_channel_iter_get_io_device(i); 3524 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3525 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3526 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3527 3528 pthread_mutex_lock(&bdev->internal.mutex); 3529 _spdk_bdev_enable_qos(bdev, bdev_ch); 3530 pthread_mutex_unlock(&bdev->internal.mutex); 3531 spdk_for_each_channel_continue(i, 0); 3532 } 3533 3534 static void 3535 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3536 { 3537 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3538 3539 _spdk_bdev_set_qos_limit_done(ctx, status); 3540 } 3541 3542 void 3543 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec, 3544 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3545 { 3546 struct set_qos_limit_ctx *ctx; 3547 3548 if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) { 3549 SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n", 3550 ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC); 3551 cb_fn(cb_arg, -EINVAL); 3552 return; 3553 } 3554 3555 ctx = calloc(1, sizeof(*ctx)); 3556 if (ctx == NULL) { 3557 cb_fn(cb_arg, -ENOMEM); 3558 return; 3559 } 3560 3561 ctx->cb_fn = cb_fn; 3562 ctx->cb_arg = cb_arg; 3563 ctx->bdev = bdev; 3564 3565 pthread_mutex_lock(&bdev->internal.mutex); 3566 if (bdev->internal.qos_mod_in_progress) { 3567 pthread_mutex_unlock(&bdev->internal.mutex); 3568 free(ctx); 3569 cb_fn(cb_arg, -EAGAIN); 3570 return; 3571 } 3572 bdev->internal.qos_mod_in_progress = true; 3573 3574 if (ios_per_sec > 0) { 3575 if (bdev->internal.qos == NULL) { 3576 /* Enabling */ 3577 
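			/*
			 * Enabling QoS for the first time: allocate the QoS state,
			 * record the limit, then walk every existing channel with
			 * spdk_for_each_channel() so that _spdk_bdev_enable_qos_msg()
			 * can enable QoS on each of them; _spdk_bdev_enable_qos_done()
			 * finally reports completion via _spdk_bdev_set_qos_limit_done().
			 */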
bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3578 if (!bdev->internal.qos) { 3579 pthread_mutex_unlock(&bdev->internal.mutex); 3580 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3581 free(ctx); 3582 cb_fn(cb_arg, -ENOMEM); 3583 return; 3584 } 3585 3586 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3587 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3588 _spdk_bdev_enable_qos_msg, ctx, 3589 _spdk_bdev_enable_qos_done); 3590 } else { 3591 /* Updating */ 3592 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3593 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx); 3594 } 3595 } else { 3596 if (bdev->internal.qos != NULL) { 3597 /* Disabling */ 3598 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3599 _spdk_bdev_disable_qos_msg, ctx, 3600 _spdk_bdev_disable_qos_msg_done); 3601 } else { 3602 pthread_mutex_unlock(&bdev->internal.mutex); 3603 _spdk_bdev_set_qos_limit_done(ctx, 0); 3604 return; 3605 } 3606 } 3607 3608 pthread_mutex_unlock(&bdev->internal.mutex); 3609 } 3610 3611 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3612
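
/*
 * Usage sketch (illustrative only; error handling abbreviated, and the names
 * hello_read_done(), hello_bdev_remove_cb() and "Malloc0" are hypothetical):
 *
 *   static void
 *   hello_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *   {
 *           spdk_bdev_free_io(bdev_io);
 *   }
 *
 *   static void
 *   hello_read(void)
 *   {
 *           struct spdk_bdev *bdev = spdk_bdev_get_by_name("Malloc0");
 *           struct spdk_bdev_desc *desc;
 *           struct spdk_io_channel *ch;
 *           void *buf = spdk_dma_zmalloc(4096, spdk_bdev_get_buf_align(bdev), NULL);
 *           int rc;
 *
 *           rc = spdk_bdev_open(bdev, false, hello_bdev_remove_cb, NULL, &desc);
 *           ch = spdk_bdev_get_io_channel(desc);
 *
 *           rc = spdk_bdev_read(desc, ch, buf, 0, 4096, hello_read_done, NULL);
 *           if (rc == -ENOMEM) {
 *                   // No spdk_bdev_io was available; queue an
 *                   // spdk_bdev_io_wait_entry and retry from its callback.
 *           }
 *
 *           // Once the completion callback has run, release the channel with
 *           // spdk_put_io_channel() and close the descriptor with
 *           // spdk_bdev_close().
 *   }
 */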