/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/util.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define BUF_SMALL_POOL_SIZE			8192
#define BUF_LARGE_POOL_SIZE			1024
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000
#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
#define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC	10

enum spdk_bdev_qos_type {
	SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0,
	SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT,
	SPDK_BDEV_QOS_NUM_TYPES /* Keep last */
};

static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"};

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain	*domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos {
	/** Rate limit, in I/O per second */
	uint64_t iops_rate_limit;

	/** Rate limit, in bytes per second */
	uint64_t byte_rate_limit;

	/** The channel that all I/O are funneled through */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and
	 *  only valid for the master channel which manages the outstanding IOs. */
	uint64_t max_ios_per_timeslice;

	/** Maximum allowed bytes to be issued in one timeslice (e.g., 1ms) and
	 *  only valid for the master channel which manages the outstanding IOs. */
	uint64_t max_byte_per_timeslice;

	/** Remaining IO allowed in current timeslice (e.g., 1ms) */
	uint64_t io_remaining_this_timeslice;

	/** Remaining bytes allowed in current timeslice (e.g., 1ms).
	 *  Allowed to run negative if an I/O is submitted when some bytes are remaining,
	 *  but the I/O is bigger than that amount. The excess will be deducted from the
	 *  next timeslice.
	 */
	int64_t byte_remaining_this_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * will queue here their IO that awaits retry. It makes it possible to retry sending
 * IO to one bdev after IO from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t		nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel	*shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t		ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted through this channel and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	bdev_io_tailq_t		queued_resets;

	uint32_t		flags;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

};

struct spdk_bdev_desc {
	struct spdk_bdev	*bdev;
	spdk_bdev_remove_cb_t	remove_cb;
	void			*remove_ctx;
	bool			remove_scheduled;
	bool			write;
	TAILQ_ENTRY(spdk_bdev_desc) link;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

/*
 * The io_device registered for a bdev is deliberately offset by one byte from the
 * bdev pointer itself, so it can never collide with an io_device a bdev module may
 * have registered using the bdev pointer directly.
 */
#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
		void *cb_arg);
static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization. A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}
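
/*
 * Worked example of the check above (illustrative numbers only): with the default
 * bdev_io_cache_size of 256 and an application running 4 threads, the pool must hold
 * at least 256 * (4 + 1) = 1280 bdev_io, which the default bdev_io_pool_size of
 * 64 * 1024 easily satisfies. Shrinking bdev_io_pool_size below that bound makes
 * spdk_bdev_set_opts() fail.
 */
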
struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf, *aligned_buf;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io->u.bdev.iovcnt == 1);

	buf = bdev_io->internal.buf;
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		/* Hand the freed buffer to the first waiter, not to the bdev_io being freed. */
		tmp = STAILQ_FIRST(stailq);

		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
		spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);

		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		tmp->internal.buf = buf;
		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	void *buf, *aligned_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	assert(cb != NULL);
	assert(bdev_io->u.bdev.iovs != NULL);

	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
		/* Buffer already present */
		cb(bdev_io->internal.ch->channel, bdev_io);
		return;
	}

	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;
	bdev_io->internal.get_buf_cb = cb;
	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);

	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);

		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
	}
}
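
/*
 * Illustrative usage sketch (not part of this file; my_read_get_buf_cb and
 * my_bdev_do_read are hypothetical names in a bdev module). A module that relies on
 * the generic buffer pools calls spdk_bdev_io_get_buf() from its read submission
 * path; the callback runs either immediately or later, once iovs[0] points at a
 * buffer of at least 'len' bytes:
 *
 *	static void
 *	my_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		my_bdev_do_read(ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
 *				bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks);
 *	}
 *
 *	spdk_bdev_io_get_buf(bdev_io, my_read_get_buf_cb,
 *			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 */
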
static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_options");
	spdk_json_write_name(w, "params");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		spdk_bdev_config_json(bdev, w);
	}

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}
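
/*
 * Illustrative sketch (hypothetical module code, not part of this file): a module
 * whose init or examine work finishes asynchronously is counted in
 * internal.action_in_progress and must eventually call one of the *_done()
 * functions above, which re-runs spdk_bdev_module_action_complete(). The names
 * my_bdev_module, my_probe_cpl and start_async_probe below are assumptions.
 *
 *	static void
 *	my_probe_cpl(void *cb_arg)
 *	{
 *		spdk_bdev_module_examine_done(&my_bdev_module);
 *	}
 *
 *	static void
 *	my_examine(struct spdk_bdev *bdev)
 *	{
 *		start_async_probe(bdev, my_probe_cpl, NULL);
 *	}
 */
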
static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		rc = module->module_init();
		if (rc != 0) {
			break;
		}
	}

	g_bdev_mgr.module_init_complete = true;
	return rc;
}


static void
spdk_bdev_init_failed_complete(void *cb_arg)
{
	spdk_bdev_init_complete(-1);
}

static void
spdk_bdev_init_failed(void *cb_arg)
{
	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
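	/*
	 * Worked example of the sizing above (illustrative numbers): with
	 * BUF_SMALL_POOL_SIZE of 8192 and 4 threads, cache_size is
	 * 8192 / (2 * 4) = 1024 per thread, so at most 4 * 1024 = 4096 buffers
	 * (half the pool) can sit in per-thread caches at any time.
	 */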
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
				 NULL);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel));

	rc = spdk_bdev_modules_init();
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
		return;
	}

	spdk_bdev_module_action_complete();
}

static void
spdk_bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    g_bdev_opts.bdev_io_pool_size);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	spdk_dma_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
}

static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
spdk_bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
	} else {
		bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last bdev in the list. The last bdev in the list should be a bdev
	 * that has no bdevs that depend on it.
	 */
	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev_io->internal.buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}
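
/*
 * Illustrative sketch of the -ENOMEM contract (hypothetical caller names my_ctx,
 * my_retry, my_done; not part of this file): when a submission call fails with
 * -ENOMEM because no spdk_bdev_io could be allocated, the caller may register an
 * spdk_bdev_io_wait_entry. Its cb_fn runs on this thread as soon as
 * spdk_bdev_free_io() returns a bdev_io to the per-thread cache (see the loop above).
 *
 *	static void
 *	my_retry(void *arg)
 *	{
 *		struct my_ctx *ctx = arg;
 *
 *		my_submit_read(ctx);	// calls spdk_bdev_read_blocks() again
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, io_ch, buf, offset_blocks, num_blocks, my_done, ctx);
 *	if (rc == -ENOMEM) {
 *		ctx->wait_entry.bdev = bdev;
 *		ctx->wait_entry.cb_fn = my_retry;
 *		ctx->wait_entry.cb_arg = ctx;
 *		spdk_bdev_queue_io_wait(bdev, io_ch, &ctx->wait_entry);
 *	}
 */
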
static uint64_t
_spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_ADMIN:
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	default:
		return 0;
	}
}

static void
_spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io		*bdev_io = NULL;
	struct spdk_bdev		*bdev = ch->bdev;
	struct spdk_bdev_qos		*qos = bdev->internal.qos;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	while (!TAILQ_EMPTY(&qos->queued)) {
		if (qos->max_ios_per_timeslice > 0 && qos->io_remaining_this_timeslice == 0) {
			break;
		}

		if (qos->max_byte_per_timeslice > 0 && qos->byte_remaining_this_timeslice <= 0) {
			break;
		}

		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		qos->io_remaining_this_timeslice--;
		qos->byte_remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(bdev_io);
		ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev->fn_table->submit_request(ch->channel, bdev_io);
	}
}

static bool
_spdk_bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	switch (type) {
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_NVME_ADMIN:
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		/* These types of bdev_io do not specify an LBA offset/length. */
		return false;
	default:
		return true;
	}
}

static bool
_spdk_bdev_io_spans_boundary(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
	/* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}
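
/*
 * Worked example for the check above (illustrative numbers): with an
 * optimal_io_boundary of 8 blocks, an I/O at offset_blocks 6 with num_blocks 4
 * covers blocks 6..9, so start_stripe = 6 >> 3 = 0 and end_stripe = 9 >> 3 = 1.
 * The stripes differ, so the I/O spans a boundary and is split below.
 */
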
static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_spdk_bdev_io_split_with_payload(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining, bytes_handled;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
	struct iovec *parent_iov;
	uint64_t parent_iov_offset, child_iov_len;
	uint32_t child_iovcnt;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	bytes_handled = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
	parent_iov = &bdev_io->u.bdev.iovs[0];
	parent_iov_offset = 0;

	while (bytes_handled > 0) {
		if (bytes_handled >= parent_iov->iov_len) {
			bytes_handled -= parent_iov->iov_len;
			parent_iov++;
			continue;
		}
		parent_iov_offset += bytes_handled;
		break;
	}

	to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
	to_next_boundary = spdk_min(remaining, to_next_boundary);
	to_next_boundary_bytes = to_next_boundary * blocklen;
	child_iovcnt = 0;
	while (to_next_boundary_bytes > 0) {
		child_iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
		to_next_boundary_bytes -= child_iov_len;

		bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
		bdev_io->child_iov[child_iovcnt].iov_len = child_iov_len;

		parent_iov++;
		parent_iov_offset = 0;
		child_iovcnt++;
		if (child_iovcnt == BDEV_IO_NUM_CHILD_IOV && to_next_boundary_bytes > 0) {
			/* We've run out of child iovs - we need to fail this I/O. */
			bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
			bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED,
					     bdev_io->internal.caller_ctx);
			return;
		}
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
					    spdk_io_channel_from_ctx(bdev_io->internal.ch),
					    bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
					    _spdk_bdev_io_split_done, bdev_io);
	} else {
		rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
					     spdk_io_channel_from_ctx(bdev_io->internal.ch),
					     bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
					     _spdk_bdev_io_split_done, bdev_io);
	}

	if (rc == 0) {
		bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary;
		bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary;
	} else {
		assert(rc == -ENOMEM);
		bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
		bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_io_split_with_payload;
		bdev_io->internal.waitq_entry.cb_arg = bdev_io;
		spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
					&bdev_io->internal.waitq_entry);
	}
}

static void
_spdk_bdev_io_split_no_payload(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining;
	uint32_t to_next_boundary;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;

	to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
	to_next_boundary = spdk_min(remaining, to_next_boundary);

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_UNMAP) {
		rc = spdk_bdev_unmap_blocks(bdev_io->internal.desc,
					    spdk_io_channel_from_ctx(bdev_io->internal.ch),
					    current_offset, to_next_boundary,
					    _spdk_bdev_io_split_done, bdev_io);
	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE_ZEROES) {
		rc = spdk_bdev_write_zeroes_blocks(bdev_io->internal.desc,
						   spdk_io_channel_from_ctx(bdev_io->internal.ch),
						   current_offset, to_next_boundary,
						   _spdk_bdev_io_split_done, bdev_io);
	} else {
		assert(bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH);
		rc = spdk_bdev_flush_blocks(bdev_io->internal.desc,
					    spdk_io_channel_from_ctx(bdev_io->internal.ch),
					    current_offset, to_next_boundary,
					    _spdk_bdev_io_split_done, bdev_io);
	}

	if (rc == 0) {
		bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary;
		bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary;
	} else {
		assert(rc == -ENOMEM);
		bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
		bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_io_split_no_payload;
		bdev_io->internal.waitq_entry.cb_arg = bdev_io;
		spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
					&bdev_io->internal.waitq_entry);
	}
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx);
		return;
	}

	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
		parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process. This function will complete the parent I/O if the
	 * splitting is done.
	 */
	if (parent_io->type == SPDK_BDEV_IO_TYPE_READ || parent_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_spdk_bdev_io_split_with_payload(parent_io);
	} else {
		_spdk_bdev_io_split_no_payload(parent_io);
	}
}

static void
_spdk_bdev_io_split(struct spdk_bdev_io *bdev_io)
{
	assert(_spdk_bdev_io_type_can_split(bdev_io->type));

	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ || bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
		_spdk_bdev_io_split_with_payload(bdev_io);
	} else {
		_spdk_bdev_io_split_no_payload(bdev_io);
	}
}

static void
_spdk_bdev_io_submit(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	bdev_io->internal.submit_tsc = spdk_get_ticks();
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	bdev_io->internal.in_submit_request = true;
	if (spdk_likely(bdev_ch->flags == 0)) {
		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
			bdev->fn_table->submit_request(ch, bdev_io);
		} else {
			bdev_ch->io_outstanding--;
			shared_resource->io_outstanding--;
			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
		}
	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
		bdev_ch->io_outstanding--;
		shared_resource->io_outstanding--;
		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
		_spdk_bdev_qos_io_submit(bdev_ch);
	} else {
		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);

	assert(thread != NULL);
	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_spans_boundary(bdev_io)) {
		_spdk_bdev_io_split(bdev_io);
		return;
	}

	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
			_spdk_bdev_io_submit(bdev_io);
		} else {
			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
			bdev_io->internal.ch = bdev->internal.qos->ch;
			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
		}
	} else {
		_spdk_bdev_io_submit(bdev_io);
	}
}

static void
spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;

	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	bdev_io->internal.in_submit_request = true;
	bdev->fn_table->submit_request(ch, bdev_io);
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
		  struct spdk_bdev *bdev, void *cb_arg,
		  spdk_bdev_io_completion_cb cb)
{
	bdev_io->bdev = bdev;
	bdev_io->internal.caller_ctx = cb_arg;
	bdev_io->internal.cb = cb;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
	bdev_io->internal.in_submit_request = false;
	bdev_io->internal.buf = NULL;
	bdev_io->internal.io_submit_ch = NULL;
}

static bool
_spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
}

bool
spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	bool supported;

	supported = _spdk_bdev_io_type_supported(bdev, io_type);

	if (!supported) {
		switch (io_type) {
		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
			/* The bdev layer will emulate write zeroes as long as write is supported. */
			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
			break;
		default:
			break;
		}
	}

	return supported;
}

int
spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	if (bdev->fn_table->dump_info_json) {
		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
	}

	return 0;
}

void
spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	assert(bdev != NULL);
	assert(w != NULL);

	if (bdev->fn_table->write_config_json) {
		bdev->fn_table->write_config_json(bdev, w);
	} else {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "name", bdev->name);
		spdk_json_write_object_end(w);
	}
}

static void
spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
{
	uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0;

	if (qos->iops_rate_limit > 0) {
		max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
					SPDK_SEC_TO_USEC;
		qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice,
						      SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
	}

	if (qos->byte_rate_limit > 0) {
		max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
					 SPDK_SEC_TO_USEC;
		qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice,
						       SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE);
	}
}
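
/*
 * Worked example for the conversion above (illustrative numbers): with an
 * iops_rate_limit of 20000 and the 1000 usec timeslice, max_ios_per_timeslice is
 * 20000 * 1000 / 1000000 = 20 I/O per timeslice. A byte_rate_limit of 10 MiB/s
 * (10485760 bytes/s) gives 10485760 * 1000 / 1000000 = 10485 bytes per timeslice;
 * the spdk_max() calls only matter for limits small enough to fall below the
 * per-timeslice minimums.
 */
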
static int
spdk_bdev_channel_poll_qos(void *arg)
{
	struct spdk_bdev_qos *qos = arg;

	/* Reset for next round of rate limiting */
	qos->io_remaining_this_timeslice = qos->max_ios_per_timeslice;

	/* We may have allowed the bytes to slightly overrun in the last timeslice.
	 * byte_remaining_this_timeslice is signed, so if it's negative here, we'll
	 * account for the overrun so that the next timeslice will be appropriately
	 * reduced.
	 */
	if (qos->byte_remaining_this_timeslice > 0) {
		qos->byte_remaining_this_timeslice = 0;
	}
	qos->byte_remaining_this_timeslice += qos->max_byte_per_timeslice;

	_spdk_bdev_qos_io_submit(qos->ch);

	return -1;
}

static void
_spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_shared_resource *shared_resource;

	if (!ch) {
		return;
	}

	if (ch->channel) {
		spdk_put_io_channel(ch->channel);
	}

	assert(ch->io_outstanding == 0);

	shared_resource = ch->shared_resource;
	if (shared_resource) {
		assert(ch->io_outstanding == 0);
		assert(shared_resource->ref > 0);
		shared_resource->ref--;
		if (shared_resource->ref == 0) {
			assert(shared_resource->io_outstanding == 0);
			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
			free(shared_resource);
		}
	}
}

/* Caller must hold bdev->internal.mutex. */
static void
_spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_qos *qos = bdev->internal.qos;

	/* Rate limiting on this bdev enabled */
	if (qos) {
		if (qos->ch == NULL) {
			struct spdk_io_channel *io_ch;

			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
				      bdev->name, spdk_get_thread());

			/* No qos channel has been selected, so set one up */

			/* Take another reference to ch */
			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
			qos->ch = ch;

			qos->thread = spdk_io_channel_get_thread(io_ch);

			TAILQ_INIT(&qos->queued);
			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
			qos->io_remaining_this_timeslice = qos->max_ios_per_timeslice;
			qos->byte_remaining_this_timeslice = qos->max_byte_per_timeslice;

			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
							   qos,
							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
		}

		ch->flags |= BDEV_CH_QOS_ENABLED;
	}
}

static int
spdk_bdev_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
	struct spdk_bdev_channel	*ch = ctx_buf;
	struct spdk_io_channel		*mgmt_io_ch;
	struct spdk_bdev_mgmt_channel	*mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource;

	ch->bdev = bdev;
	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
	if (!ch->channel) {
		return -1;
	}

	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
	if (!mgmt_io_ch) {
		return -1;
	}

	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
		if (shared_resource->shared_ch == ch->channel) {
			spdk_put_io_channel(mgmt_io_ch);
			shared_resource->ref++;
			break;
		}
	}

	if (shared_resource == NULL) {
		shared_resource = calloc(1, sizeof(*shared_resource));
		if (shared_resource == NULL) {
			spdk_put_io_channel(mgmt_io_ch);
			return -1;
		}

		shared_resource->mgmt_ch = mgmt_ch;
		shared_resource->io_outstanding = 0;
		TAILQ_INIT(&shared_resource->nomem_io);
		shared_resource->nomem_threshold = 0;
		shared_resource->shared_ch = ch->channel;
		shared_resource->ref = 1;
		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
	}

	memset(&ch->stat, 0, sizeof(ch->stat));
	ch->stat.ticks_rate = spdk_get_ticks_hz();
	ch->io_outstanding = 0;
	TAILQ_INIT(&ch->queued_resets);
	ch->flags = 0;
	ch->shared_resource = shared_resource;

#ifdef SPDK_CONFIG_VTUNE
	{
		char *name;
		__itt_init_ittlib(NULL, 0);
		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
		if (!name) {
			_spdk_bdev_channel_destroy_resource(ch);
			return -1;
		}
		ch->handle = __itt_string_handle_create(name);
		free(name);
		ch->start_tsc = spdk_get_ticks();
		ch->interval_tsc = spdk_get_ticks_hz() / 100;
		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
	}
#endif

	pthread_mutex_lock(&bdev->internal.mutex);
	_spdk_bdev_enable_qos(bdev, ch);
	pthread_mutex_unlock(&bdev->internal.mutex);

	return 0;
}

/*
 * Abort I/O that are waiting on a data buffer.  These types of I/O are
 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
{
	bdev_io_stailq_t tmp;
	struct spdk_bdev_io *bdev_io;

	STAILQ_INIT(&tmp);

	while (!STAILQ_EMPTY(queue)) {
		bdev_io = STAILQ_FIRST(queue);
		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
		if (bdev_io->internal.ch == ch) {
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		} else {
			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
		}
	}

	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
}

/*
 * Abort I/O that are queued waiting for submission.  These types of I/O are
 * linked using the spdk_bdev_io link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io, *tmp;

	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
		if (bdev_io->internal.ch == ch) {
			TAILQ_REMOVE(queue, bdev_io, internal.link);
			/*
			 * spdk_bdev_io_complete() assumes that the completed I/O had
			 *  been submitted to the bdev module.  Since in this case it
			 *  hadn't, bump io_outstanding to account for the decrement
			 *  that spdk_bdev_io_complete() will do.
			 */
			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
				ch->io_outstanding++;
				ch->shared_resource->io_outstanding++;
			}
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		}
	}
}

static void
spdk_bdev_qos_channel_destroy(void *cb_arg)
{
	struct spdk_bdev_qos *qos = cb_arg;

	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
	spdk_poller_unregister(&qos->poller);

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);

	free(qos);
}

static int
spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
{
	/*
	 * Cleanly shutting down the QoS poller is tricky, because
	 * during the asynchronous operation the user could open
	 * a new descriptor and create a new channel, spawning
	 * a new QoS poller.
	 *
	 * The strategy is to create a new QoS structure here and swap it
	 * in. The shutdown path then continues to refer to the old one
	 * until it completes and then releases it.
	 */
	struct spdk_bdev_qos *new_qos, *old_qos;

	old_qos = bdev->internal.qos;

	new_qos = calloc(1, sizeof(*new_qos));
	if (!new_qos) {
		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
		return -ENOMEM;
	}

	/* Copy the old QoS data into the newly allocated structure */
	memcpy(new_qos, old_qos, sizeof(*new_qos));

	/* Zero out the key parts of the QoS structure */
	new_qos->ch = NULL;
	new_qos->thread = NULL;
	new_qos->max_ios_per_timeslice = 0;
	new_qos->max_byte_per_timeslice = 0;
	new_qos->io_remaining_this_timeslice = 0;
	new_qos->byte_remaining_this_timeslice = 0;
	new_qos->poller = NULL;
	TAILQ_INIT(&new_qos->queued);

	bdev->internal.qos = new_qos;

	if (old_qos->thread == NULL) {
		free(old_qos);
	} else {
		spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
				     old_qos);
	}

	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
	 * been destroyed yet. The destruction path will end up waiting for the final
	 * channel to be put before it releases resources. */

	return 0;
}

static void
_spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
{
	total->bytes_read += add->bytes_read;
	total->num_read_ops += add->num_read_ops;
	total->bytes_written += add->bytes_written;
	total->num_write_ops += add->num_write_ops;
	total->read_latency_ticks += add->read_latency_ticks;
	total->write_latency_ticks += add->write_latency_ticks;
}

static void
spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel	*ch = ctx_buf;
	struct spdk_bdev_mgmt_channel	*mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
		      spdk_get_thread());

	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
	pthread_mutex_lock(&ch->bdev->internal.mutex);
	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
	pthread_mutex_unlock(&ch->bdev->internal.mutex);

	mgmt_ch = shared_resource->mgmt_ch;

	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);

	_spdk_bdev_channel_destroy_resource(ch);
}

int
spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	if (alias == NULL) {
		SPDK_ERRLOG("Empty alias passed\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(alias)) {
		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
		return -EEXIST;
	}

	tmp = calloc(1, sizeof(*tmp));
	if (tmp == NULL) {
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	tmp->alias = strdup(alias);
	if (tmp->alias == NULL) {
		free(tmp);
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);

	return 0;
}

int
spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (strcmp(alias, tmp->alias) == 0) {
			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
			free(tmp->alias);
			free(tmp);
			return 0;
		}
	}

	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);

	return -ENOENT;
}

void
spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *p, *tmp;

	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
		TAILQ_REMOVE(&bdev->aliases, p, tailq);
		free(p->alias);
		free(p);
	}
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
{
	return bdev->product_name;
}

const struct spdk_bdev_aliases_list *
spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
{
	return &bdev->aliases;
}

uint32_t
spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
{
	return bdev->blocklen;
}

uint64_t
spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
{
	return bdev->blockcnt;
}

uint64_t
spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
{
	uint64_t iops_rate_limit = 0;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos) {
		iops_rate_limit = bdev->internal.qos->iops_rate_limit;
	}
	pthread_mutex_unlock(&bdev->internal.mutex);

	return iops_rate_limit;
}

size_t
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
{
	/* TODO: push this logic down to the bdev modules */
	if (bdev->need_aligned_buffer) {
		return bdev->blocklen;
	}

	return 1;
}

uint32_t
spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
{
	return bdev->optimal_io_boundary;
}

1884 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 1885 { 1886 return bdev->write_cache; 1887 } 1888 1889 const struct spdk_uuid * 1890 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 1891 { 1892 return &bdev->uuid; 1893 } 1894 1895 uint64_t 1896 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 1897 { 1898 return bdev->internal.measured_queue_depth; 1899 } 1900 1901 uint64_t 1902 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 1903 { 1904 return bdev->internal.period; 1905 } 1906 1907 uint64_t 1908 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 1909 { 1910 return bdev->internal.weighted_io_time; 1911 } 1912 1913 uint64_t 1914 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 1915 { 1916 return bdev->internal.io_time; 1917 } 1918 1919 static void 1920 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 1921 { 1922 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 1923 1924 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 1925 1926 if (bdev->internal.measured_queue_depth) { 1927 bdev->internal.io_time += bdev->internal.period; 1928 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 1929 } 1930 } 1931 1932 static void 1933 _calculate_measured_qd(struct spdk_io_channel_iter *i) 1934 { 1935 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 1936 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 1937 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 1938 1939 bdev->internal.temporary_queue_depth += ch->io_outstanding; 1940 spdk_for_each_channel_continue(i, 0); 1941 } 1942 1943 static int 1944 spdk_bdev_calculate_measured_queue_depth(void *ctx) 1945 { 1946 struct spdk_bdev *bdev = ctx; 1947 bdev->internal.temporary_queue_depth = 0; 1948 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 1949 _calculate_measured_qd_cpl); 1950 return 0; 1951 } 1952 1953 void 1954 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 1955 { 1956 bdev->internal.period = period; 1957 1958 if (bdev->internal.qd_poller != NULL) { 1959 spdk_poller_unregister(&bdev->internal.qd_poller); 1960 bdev->internal.measured_queue_depth = UINT64_MAX; 1961 } 1962 1963 if (period != 0) { 1964 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 1965 period); 1966 } 1967 } 1968 1969 int 1970 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 1971 { 1972 int ret; 1973 1974 pthread_mutex_lock(&bdev->internal.mutex); 1975 1976 /* bdev has open descriptors */ 1977 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 1978 bdev->blockcnt > size) { 1979 ret = -EBUSY; 1980 } else { 1981 bdev->blockcnt = size; 1982 ret = 0; 1983 } 1984 1985 pthread_mutex_unlock(&bdev->internal.mutex); 1986 1987 return ret; 1988 } 1989 1990 /* 1991 * Convert I/O offset and length from bytes to blocks. 1992 * 1993 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
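 *
 * For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 1024 map to
 * *offset_blocks = 8 and *num_blocks = 2, while offset_bytes = 4100 leaves a remainder of 4
 * and makes the function return non-zero.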
1994 */ 1995 static uint64_t 1996 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 1997 uint64_t num_bytes, uint64_t *num_blocks) 1998 { 1999 uint32_t block_size = bdev->blocklen; 2000 2001 *offset_blocks = offset_bytes / block_size; 2002 *num_blocks = num_bytes / block_size; 2003 2004 return (offset_bytes % block_size) | (num_bytes % block_size); 2005 } 2006 2007 static bool 2008 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2009 { 2010 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2011 * has been an overflow and hence the offset has been wrapped around */ 2012 if (offset_blocks + num_blocks < offset_blocks) { 2013 return false; 2014 } 2015 2016 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2017 if (offset_blocks + num_blocks > bdev->blockcnt) { 2018 return false; 2019 } 2020 2021 return true; 2022 } 2023 2024 int 2025 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2026 void *buf, uint64_t offset, uint64_t nbytes, 2027 spdk_bdev_io_completion_cb cb, void *cb_arg) 2028 { 2029 uint64_t offset_blocks, num_blocks; 2030 2031 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2032 return -EINVAL; 2033 } 2034 2035 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2036 } 2037 2038 int 2039 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2040 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2041 spdk_bdev_io_completion_cb cb, void *cb_arg) 2042 { 2043 struct spdk_bdev *bdev = desc->bdev; 2044 struct spdk_bdev_io *bdev_io; 2045 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2046 2047 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2048 return -EINVAL; 2049 } 2050 2051 bdev_io = spdk_bdev_get_io(channel); 2052 if (!bdev_io) { 2053 return -ENOMEM; 2054 } 2055 2056 bdev_io->internal.ch = channel; 2057 bdev_io->internal.desc = desc; 2058 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2059 bdev_io->u.bdev.iovs = &bdev_io->iov; 2060 bdev_io->u.bdev.iovs[0].iov_base = buf; 2061 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2062 bdev_io->u.bdev.iovcnt = 1; 2063 bdev_io->u.bdev.num_blocks = num_blocks; 2064 bdev_io->u.bdev.offset_blocks = offset_blocks; 2065 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2066 2067 spdk_bdev_io_submit(bdev_io); 2068 return 0; 2069 } 2070 2071 int 2072 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2073 struct iovec *iov, int iovcnt, 2074 uint64_t offset, uint64_t nbytes, 2075 spdk_bdev_io_completion_cb cb, void *cb_arg) 2076 { 2077 uint64_t offset_blocks, num_blocks; 2078 2079 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2080 return -EINVAL; 2081 } 2082 2083 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2084 } 2085 2086 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2087 struct iovec *iov, int iovcnt, 2088 uint64_t offset_blocks, uint64_t num_blocks, 2089 spdk_bdev_io_completion_cb cb, void *cb_arg) 2090 { 2091 struct spdk_bdev *bdev = desc->bdev; 2092 struct spdk_bdev_io *bdev_io; 2093 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2094 2095 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2096 return -EINVAL; 2097 } 2098 
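/*
 * spdk_bdev_get_io() below returns NULL once the per-thread bdev_io cache and the
 * global pool are exhausted, which surfaces to the caller as -ENOMEM. Callers that
 * want to retry can register an spdk_bdev_io_wait_entry with spdk_bdev_queue_io_wait();
 * _spdk_bdev_write_zero_buffer_next() later in this file uses that pattern.
 */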
2099 bdev_io = spdk_bdev_get_io(channel); 2100 if (!bdev_io) { 2101 return -ENOMEM; 2102 } 2103 2104 bdev_io->internal.ch = channel; 2105 bdev_io->internal.desc = desc; 2106 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2107 bdev_io->u.bdev.iovs = iov; 2108 bdev_io->u.bdev.iovcnt = iovcnt; 2109 bdev_io->u.bdev.num_blocks = num_blocks; 2110 bdev_io->u.bdev.offset_blocks = offset_blocks; 2111 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2112 2113 spdk_bdev_io_submit(bdev_io); 2114 return 0; 2115 } 2116 2117 int 2118 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2119 void *buf, uint64_t offset, uint64_t nbytes, 2120 spdk_bdev_io_completion_cb cb, void *cb_arg) 2121 { 2122 uint64_t offset_blocks, num_blocks; 2123 2124 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2125 return -EINVAL; 2126 } 2127 2128 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2129 } 2130 2131 int 2132 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2133 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2134 spdk_bdev_io_completion_cb cb, void *cb_arg) 2135 { 2136 struct spdk_bdev *bdev = desc->bdev; 2137 struct spdk_bdev_io *bdev_io; 2138 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2139 2140 if (!desc->write) { 2141 return -EBADF; 2142 } 2143 2144 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2145 return -EINVAL; 2146 } 2147 2148 bdev_io = spdk_bdev_get_io(channel); 2149 if (!bdev_io) { 2150 return -ENOMEM; 2151 } 2152 2153 bdev_io->internal.ch = channel; 2154 bdev_io->internal.desc = desc; 2155 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2156 bdev_io->u.bdev.iovs = &bdev_io->iov; 2157 bdev_io->u.bdev.iovs[0].iov_base = buf; 2158 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2159 bdev_io->u.bdev.iovcnt = 1; 2160 bdev_io->u.bdev.num_blocks = num_blocks; 2161 bdev_io->u.bdev.offset_blocks = offset_blocks; 2162 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2163 2164 spdk_bdev_io_submit(bdev_io); 2165 return 0; 2166 } 2167 2168 int 2169 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2170 struct iovec *iov, int iovcnt, 2171 uint64_t offset, uint64_t len, 2172 spdk_bdev_io_completion_cb cb, void *cb_arg) 2173 { 2174 uint64_t offset_blocks, num_blocks; 2175 2176 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2177 return -EINVAL; 2178 } 2179 2180 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2181 } 2182 2183 int 2184 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2185 struct iovec *iov, int iovcnt, 2186 uint64_t offset_blocks, uint64_t num_blocks, 2187 spdk_bdev_io_completion_cb cb, void *cb_arg) 2188 { 2189 struct spdk_bdev *bdev = desc->bdev; 2190 struct spdk_bdev_io *bdev_io; 2191 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2192 2193 if (!desc->write) { 2194 return -EBADF; 2195 } 2196 2197 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2198 return -EINVAL; 2199 } 2200 2201 bdev_io = spdk_bdev_get_io(channel); 2202 if (!bdev_io) { 2203 return -ENOMEM; 2204 } 2205 2206 bdev_io->internal.ch = channel; 2207 bdev_io->internal.desc = desc; 2208 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2209 bdev_io->u.bdev.iovs = iov; 2210 bdev_io->u.bdev.iovcnt = iovcnt; 2211 bdev_io->u.bdev.num_blocks = num_blocks; 2212 bdev_io->u.bdev.offset_blocks = 
offset_blocks; 2213 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2214 2215 spdk_bdev_io_submit(bdev_io); 2216 return 0; 2217 } 2218 2219 int 2220 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2221 uint64_t offset, uint64_t len, 2222 spdk_bdev_io_completion_cb cb, void *cb_arg) 2223 { 2224 uint64_t offset_blocks, num_blocks; 2225 2226 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2227 return -EINVAL; 2228 } 2229 2230 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2231 } 2232 2233 int 2234 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2235 uint64_t offset_blocks, uint64_t num_blocks, 2236 spdk_bdev_io_completion_cb cb, void *cb_arg) 2237 { 2238 struct spdk_bdev *bdev = desc->bdev; 2239 struct spdk_bdev_io *bdev_io; 2240 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2241 2242 if (!desc->write) { 2243 return -EBADF; 2244 } 2245 2246 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2247 return -EINVAL; 2248 } 2249 2250 bdev_io = spdk_bdev_get_io(channel); 2251 2252 if (!bdev_io) { 2253 return -ENOMEM; 2254 } 2255 2256 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2257 bdev_io->internal.ch = channel; 2258 bdev_io->internal.desc = desc; 2259 bdev_io->u.bdev.offset_blocks = offset_blocks; 2260 bdev_io->u.bdev.num_blocks = num_blocks; 2261 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2262 2263 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2264 spdk_bdev_io_submit(bdev_io); 2265 return 0; 2266 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2267 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2268 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2269 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2270 _spdk_bdev_write_zero_buffer_next(bdev_io); 2271 return 0; 2272 } else { 2273 spdk_bdev_free_io(bdev_io); 2274 return -ENOTSUP; 2275 } 2276 } 2277 2278 int 2279 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2280 uint64_t offset, uint64_t nbytes, 2281 spdk_bdev_io_completion_cb cb, void *cb_arg) 2282 { 2283 uint64_t offset_blocks, num_blocks; 2284 2285 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2286 return -EINVAL; 2287 } 2288 2289 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2290 } 2291 2292 int 2293 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2294 uint64_t offset_blocks, uint64_t num_blocks, 2295 spdk_bdev_io_completion_cb cb, void *cb_arg) 2296 { 2297 struct spdk_bdev *bdev = desc->bdev; 2298 struct spdk_bdev_io *bdev_io; 2299 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2300 2301 if (!desc->write) { 2302 return -EBADF; 2303 } 2304 2305 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2306 return -EINVAL; 2307 } 2308 2309 if (num_blocks == 0) { 2310 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2311 return -EINVAL; 2312 } 2313 2314 bdev_io = spdk_bdev_get_io(channel); 2315 if (!bdev_io) { 2316 return -ENOMEM; 2317 } 2318 2319 bdev_io->internal.ch = channel; 2320 bdev_io->internal.desc = desc; 2321 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2322 2323 bdev_io->u.bdev.iovs = &bdev_io->iov; 2324 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2325 bdev_io->u.bdev.iovs[0].iov_len = 0; 2326 bdev_io->u.bdev.iovcnt = 1; 2327 2328 
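/* Unmap transfers no data; the single zero-length iovec set up above simply keeps
 * u.bdev.iovs pointing at a valid entry for code that walks the iovec list. */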
bdev_io->u.bdev.offset_blocks = offset_blocks; 2329 bdev_io->u.bdev.num_blocks = num_blocks; 2330 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2331 2332 spdk_bdev_io_submit(bdev_io); 2333 return 0; 2334 } 2335 2336 int 2337 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2338 uint64_t offset, uint64_t length, 2339 spdk_bdev_io_completion_cb cb, void *cb_arg) 2340 { 2341 uint64_t offset_blocks, num_blocks; 2342 2343 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2344 return -EINVAL; 2345 } 2346 2347 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2348 } 2349 2350 int 2351 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2352 uint64_t offset_blocks, uint64_t num_blocks, 2353 spdk_bdev_io_completion_cb cb, void *cb_arg) 2354 { 2355 struct spdk_bdev *bdev = desc->bdev; 2356 struct spdk_bdev_io *bdev_io; 2357 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2358 2359 if (!desc->write) { 2360 return -EBADF; 2361 } 2362 2363 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2364 return -EINVAL; 2365 } 2366 2367 bdev_io = spdk_bdev_get_io(channel); 2368 if (!bdev_io) { 2369 return -ENOMEM; 2370 } 2371 2372 bdev_io->internal.ch = channel; 2373 bdev_io->internal.desc = desc; 2374 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2375 bdev_io->u.bdev.iovs = NULL; 2376 bdev_io->u.bdev.iovcnt = 0; 2377 bdev_io->u.bdev.offset_blocks = offset_blocks; 2378 bdev_io->u.bdev.num_blocks = num_blocks; 2379 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2380 2381 spdk_bdev_io_submit(bdev_io); 2382 return 0; 2383 } 2384 2385 static void 2386 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2387 { 2388 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2389 struct spdk_bdev_io *bdev_io; 2390 2391 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2392 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2393 spdk_bdev_io_submit_reset(bdev_io); 2394 } 2395 2396 static void 2397 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2398 { 2399 struct spdk_io_channel *ch; 2400 struct spdk_bdev_channel *channel; 2401 struct spdk_bdev_mgmt_channel *mgmt_channel; 2402 struct spdk_bdev_shared_resource *shared_resource; 2403 bdev_io_tailq_t tmp_queued; 2404 2405 TAILQ_INIT(&tmp_queued); 2406 2407 ch = spdk_io_channel_iter_get_channel(i); 2408 channel = spdk_io_channel_get_ctx(ch); 2409 shared_resource = channel->shared_resource; 2410 mgmt_channel = shared_resource->mgmt_ch; 2411 2412 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2413 2414 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2415 /* The QoS object is always valid and readable while 2416 * the channel flag is set, so the lock here should not 2417 * be necessary. We're not in the fast path though, so 2418 * just take it anyway. 
*/ 2419 pthread_mutex_lock(&channel->bdev->internal.mutex); 2420 if (channel->bdev->internal.qos->ch == channel) { 2421 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2422 } 2423 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2424 } 2425 2426 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2427 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2428 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2429 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2430 2431 spdk_for_each_channel_continue(i, 0); 2432 } 2433 2434 static void 2435 _spdk_bdev_start_reset(void *ctx) 2436 { 2437 struct spdk_bdev_channel *ch = ctx; 2438 2439 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2440 ch, _spdk_bdev_reset_dev); 2441 } 2442 2443 static void 2444 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2445 { 2446 struct spdk_bdev *bdev = ch->bdev; 2447 2448 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2449 2450 pthread_mutex_lock(&bdev->internal.mutex); 2451 if (bdev->internal.reset_in_progress == NULL) { 2452 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2453 /* 2454 * Take a channel reference for the target bdev for the life of this 2455 * reset. This guards against the channel getting destroyed while 2456 * spdk_for_each_channel() calls related to this reset IO are in 2457 * progress. We will release the reference when this reset is 2458 * completed. 2459 */ 2460 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2461 _spdk_bdev_start_reset(ch); 2462 } 2463 pthread_mutex_unlock(&bdev->internal.mutex); 2464 } 2465 2466 int 2467 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2468 spdk_bdev_io_completion_cb cb, void *cb_arg) 2469 { 2470 struct spdk_bdev *bdev = desc->bdev; 2471 struct spdk_bdev_io *bdev_io; 2472 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2473 2474 bdev_io = spdk_bdev_get_io(channel); 2475 if (!bdev_io) { 2476 return -ENOMEM; 2477 } 2478 2479 bdev_io->internal.ch = channel; 2480 bdev_io->internal.desc = desc; 2481 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2482 bdev_io->u.reset.ch_ref = NULL; 2483 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2484 2485 pthread_mutex_lock(&bdev->internal.mutex); 2486 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2487 pthread_mutex_unlock(&bdev->internal.mutex); 2488 2489 _spdk_bdev_channel_start_reset(channel); 2490 2491 return 0; 2492 } 2493 2494 void 2495 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2496 struct spdk_bdev_io_stat *stat) 2497 { 2498 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2499 2500 *stat = channel->stat; 2501 } 2502 2503 static void 2504 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2505 { 2506 void *io_device = spdk_io_channel_iter_get_io_device(i); 2507 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2508 2509 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2510 bdev_iostat_ctx->cb_arg, 0); 2511 free(bdev_iostat_ctx); 2512 } 2513 2514 static void 2515 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2516 { 2517 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2518 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2519 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2520 2521 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2522 spdk_for_each_channel_continue(i, 0); 2523 } 2524 2525 void 2526 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2527 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2528 { 2529 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2530 2531 assert(bdev != NULL); 2532 assert(stat != NULL); 2533 assert(cb != NULL); 2534 2535 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2536 if (bdev_iostat_ctx == NULL) { 2537 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2538 cb(bdev, stat, cb_arg, -ENOMEM); 2539 return; 2540 } 2541 2542 bdev_iostat_ctx->stat = stat; 2543 bdev_iostat_ctx->cb = cb; 2544 bdev_iostat_ctx->cb_arg = cb_arg; 2545 2546 /* Start with the statistics from previously deleted channels. */ 2547 pthread_mutex_lock(&bdev->internal.mutex); 2548 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2549 pthread_mutex_unlock(&bdev->internal.mutex); 2550 2551 /* Then iterate and add the statistics from each existing channel. */ 2552 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2553 _spdk_bdev_get_each_channel_stat, 2554 bdev_iostat_ctx, 2555 _spdk_bdev_get_device_stat_done); 2556 } 2557 2558 int 2559 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2560 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2561 spdk_bdev_io_completion_cb cb, void *cb_arg) 2562 { 2563 struct spdk_bdev *bdev = desc->bdev; 2564 struct spdk_bdev_io *bdev_io; 2565 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2566 2567 if (!desc->write) { 2568 return -EBADF; 2569 } 2570 2571 bdev_io = spdk_bdev_get_io(channel); 2572 if (!bdev_io) { 2573 return -ENOMEM; 2574 } 2575 2576 bdev_io->internal.ch = channel; 2577 bdev_io->internal.desc = desc; 2578 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2579 bdev_io->u.nvme_passthru.cmd = *cmd; 2580 bdev_io->u.nvme_passthru.buf = buf; 2581 bdev_io->u.nvme_passthru.nbytes = nbytes; 2582 bdev_io->u.nvme_passthru.md_buf = NULL; 2583 bdev_io->u.nvme_passthru.md_len = 0; 2584 2585 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2586 2587 spdk_bdev_io_submit(bdev_io); 2588 return 0; 2589 } 2590 2591 int 2592 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2593 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2594 spdk_bdev_io_completion_cb cb, void *cb_arg) 2595 { 2596 struct spdk_bdev *bdev = desc->bdev; 2597 struct spdk_bdev_io *bdev_io; 2598 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2599 2600 if (!desc->write) { 2601 /* 2602 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2603 * to easily determine if the command is a read or write, but for now just 2604 * do not allow io_passthru with a read-only descriptor. 
2605 */ 2606 return -EBADF; 2607 } 2608 2609 bdev_io = spdk_bdev_get_io(channel); 2610 if (!bdev_io) { 2611 return -ENOMEM; 2612 } 2613 2614 bdev_io->internal.ch = channel; 2615 bdev_io->internal.desc = desc; 2616 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2617 bdev_io->u.nvme_passthru.cmd = *cmd; 2618 bdev_io->u.nvme_passthru.buf = buf; 2619 bdev_io->u.nvme_passthru.nbytes = nbytes; 2620 bdev_io->u.nvme_passthru.md_buf = NULL; 2621 bdev_io->u.nvme_passthru.md_len = 0; 2622 2623 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2624 2625 spdk_bdev_io_submit(bdev_io); 2626 return 0; 2627 } 2628 2629 int 2630 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2631 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2632 spdk_bdev_io_completion_cb cb, void *cb_arg) 2633 { 2634 struct spdk_bdev *bdev = desc->bdev; 2635 struct spdk_bdev_io *bdev_io; 2636 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2637 2638 if (!desc->write) { 2639 /* 2640 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2641 * to easily determine if the command is a read or write, but for now just 2642 * do not allow io_passthru with a read-only descriptor. 2643 */ 2644 return -EBADF; 2645 } 2646 2647 bdev_io = spdk_bdev_get_io(channel); 2648 if (!bdev_io) { 2649 return -ENOMEM; 2650 } 2651 2652 bdev_io->internal.ch = channel; 2653 bdev_io->internal.desc = desc; 2654 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2655 bdev_io->u.nvme_passthru.cmd = *cmd; 2656 bdev_io->u.nvme_passthru.buf = buf; 2657 bdev_io->u.nvme_passthru.nbytes = nbytes; 2658 bdev_io->u.nvme_passthru.md_buf = md_buf; 2659 bdev_io->u.nvme_passthru.md_len = md_len; 2660 2661 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2662 2663 spdk_bdev_io_submit(bdev_io); 2664 return 0; 2665 } 2666 2667 int 2668 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2669 struct spdk_bdev_io_wait_entry *entry) 2670 { 2671 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2672 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2673 2674 if (bdev != entry->bdev) { 2675 SPDK_ERRLOG("bdevs do not match\n"); 2676 return -EINVAL; 2677 } 2678 2679 if (mgmt_ch->per_thread_cache_count > 0) { 2680 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2681 return -EINVAL; 2682 } 2683 2684 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2685 return 0; 2686 } 2687 2688 static void 2689 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2690 { 2691 struct spdk_bdev *bdev = bdev_ch->bdev; 2692 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2693 struct spdk_bdev_io *bdev_io; 2694 2695 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2696 /* 2697 * Allow some more I/O to complete before retrying the nomem_io queue. 2698 * Some drivers (such as nvme) cannot immediately take a new I/O in 2699 * the context of a completion, because the resources for the I/O are 2700 * not released until control returns to the bdev poller. Also, we 2701 * may require several small I/O to complete before a larger I/O 2702 * (that requires splitting) can be submitted. 
2703 */ 2704 return; 2705 } 2706 2707 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2708 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2709 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2710 bdev_io->internal.ch->io_outstanding++; 2711 shared_resource->io_outstanding++; 2712 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2713 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2714 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2715 break; 2716 } 2717 } 2718 } 2719 2720 static inline void 2721 _spdk_bdev_io_complete(void *ctx) 2722 { 2723 struct spdk_bdev_io *bdev_io = ctx; 2724 2725 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2726 /* 2727 * Send the completion to the thread that originally submitted the I/O, 2728 * which may not be the current thread in the case of QoS. 2729 */ 2730 if (bdev_io->internal.io_submit_ch) { 2731 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2732 bdev_io->internal.io_submit_ch = NULL; 2733 } 2734 2735 /* 2736 * Defer completion to avoid potential infinite recursion if the 2737 * user's completion callback issues a new I/O. 2738 */ 2739 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2740 _spdk_bdev_io_complete, bdev_io); 2741 return; 2742 } 2743 2744 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2745 switch (bdev_io->type) { 2746 case SPDK_BDEV_IO_TYPE_READ: 2747 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2748 bdev_io->internal.ch->stat.num_read_ops++; 2749 bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2750 break; 2751 case SPDK_BDEV_IO_TYPE_WRITE: 2752 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2753 bdev_io->internal.ch->stat.num_write_ops++; 2754 bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2755 break; 2756 default: 2757 break; 2758 } 2759 } 2760 2761 #ifdef SPDK_CONFIG_VTUNE 2762 uint64_t now_tsc = spdk_get_ticks(); 2763 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2764 uint64_t data[5]; 2765 2766 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2767 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2768 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2769 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2770 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
2771 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2772 2773 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2774 __itt_metadata_u64, 5, data); 2775 2776 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2777 bdev_io->internal.ch->start_tsc = now_tsc; 2778 } 2779 #endif 2780 2781 assert(bdev_io->internal.cb != NULL); 2782 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2783 2784 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2785 bdev_io->internal.caller_ctx); 2786 } 2787 2788 static void 2789 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2790 { 2791 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2792 2793 if (bdev_io->u.reset.ch_ref != NULL) { 2794 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2795 bdev_io->u.reset.ch_ref = NULL; 2796 } 2797 2798 _spdk_bdev_io_complete(bdev_io); 2799 } 2800 2801 static void 2802 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2803 { 2804 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2805 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2806 2807 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2808 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2809 _spdk_bdev_channel_start_reset(ch); 2810 } 2811 2812 spdk_for_each_channel_continue(i, 0); 2813 } 2814 2815 void 2816 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2817 { 2818 struct spdk_bdev *bdev = bdev_io->bdev; 2819 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2820 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2821 2822 bdev_io->internal.status = status; 2823 2824 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2825 bool unlock_channels = false; 2826 2827 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2828 SPDK_ERRLOG("NOMEM returned for reset\n"); 2829 } 2830 pthread_mutex_lock(&bdev->internal.mutex); 2831 if (bdev_io == bdev->internal.reset_in_progress) { 2832 bdev->internal.reset_in_progress = NULL; 2833 unlock_channels = true; 2834 } 2835 pthread_mutex_unlock(&bdev->internal.mutex); 2836 2837 if (unlock_channels) { 2838 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 2839 bdev_io, _spdk_bdev_reset_complete); 2840 return; 2841 } 2842 } else { 2843 assert(bdev_ch->io_outstanding > 0); 2844 assert(shared_resource->io_outstanding > 0); 2845 bdev_ch->io_outstanding--; 2846 shared_resource->io_outstanding--; 2847 2848 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 2849 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 2850 /* 2851 * Wait for some of the outstanding I/O to complete before we 2852 * retry any of the nomem_io. Normally we will wait for 2853 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 2854 * depth channels we will instead wait for half to complete. 
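 * For example, with 4 I/O outstanding the threshold becomes 2 (half), while with
 * 100 outstanding it becomes 92 (100 - NOMEM_THRESHOLD_COUNT).
 */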
2855 */ 2856 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 2857 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 2858 return; 2859 } 2860 2861 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 2862 _spdk_bdev_ch_retry_io(bdev_ch); 2863 } 2864 } 2865 2866 _spdk_bdev_io_complete(bdev_io); 2867 } 2868 2869 void 2870 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 2871 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 2872 { 2873 if (sc == SPDK_SCSI_STATUS_GOOD) { 2874 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2875 } else { 2876 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 2877 bdev_io->internal.error.scsi.sc = sc; 2878 bdev_io->internal.error.scsi.sk = sk; 2879 bdev_io->internal.error.scsi.asc = asc; 2880 bdev_io->internal.error.scsi.ascq = ascq; 2881 } 2882 2883 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2884 } 2885 2886 void 2887 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 2888 int *sc, int *sk, int *asc, int *ascq) 2889 { 2890 assert(sc != NULL); 2891 assert(sk != NULL); 2892 assert(asc != NULL); 2893 assert(ascq != NULL); 2894 2895 switch (bdev_io->internal.status) { 2896 case SPDK_BDEV_IO_STATUS_SUCCESS: 2897 *sc = SPDK_SCSI_STATUS_GOOD; 2898 *sk = SPDK_SCSI_SENSE_NO_SENSE; 2899 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2900 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2901 break; 2902 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 2903 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 2904 break; 2905 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 2906 *sc = bdev_io->internal.error.scsi.sc; 2907 *sk = bdev_io->internal.error.scsi.sk; 2908 *asc = bdev_io->internal.error.scsi.asc; 2909 *ascq = bdev_io->internal.error.scsi.ascq; 2910 break; 2911 default: 2912 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 2913 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 2914 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2915 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2916 break; 2917 } 2918 } 2919 2920 void 2921 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 2922 { 2923 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 2924 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2925 } else { 2926 bdev_io->internal.error.nvme.sct = sct; 2927 bdev_io->internal.error.nvme.sc = sc; 2928 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 2929 } 2930 2931 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2932 } 2933 2934 void 2935 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 2936 { 2937 assert(sct != NULL); 2938 assert(sc != NULL); 2939 2940 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 2941 *sct = bdev_io->internal.error.nvme.sct; 2942 *sc = bdev_io->internal.error.nvme.sc; 2943 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2944 *sct = SPDK_NVME_SCT_GENERIC; 2945 *sc = SPDK_NVME_SC_SUCCESS; 2946 } else { 2947 *sct = SPDK_NVME_SCT_GENERIC; 2948 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 2949 } 2950 } 2951 2952 struct spdk_thread * 2953 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 2954 { 2955 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 2956 } 2957 2958 static void 2959 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set, 2960 enum spdk_bdev_qos_type qos_type) 2961 { 2962 uint64_t min_qos_set = 0; 2963 2964 switch (qos_type) { 2965 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 
2966 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 2967 break; 2968 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2969 min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC; 2970 break; 2971 default: 2972 SPDK_ERRLOG("Unsupported QoS type.\n"); 2973 return; 2974 } 2975 2976 if (qos_set % min_qos_set) { 2977 SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not multiple of %lu\n", 2978 qos_set, bdev->name, min_qos_set); 2979 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 2980 return; 2981 } 2982 2983 if (!bdev->internal.qos) { 2984 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 2985 if (!bdev->internal.qos) { 2986 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 2987 return; 2988 } 2989 } 2990 2991 switch (qos_type) { 2992 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 2993 bdev->internal.qos->iops_rate_limit = qos_set; 2994 break; 2995 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2996 bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024; 2997 break; 2998 default: 2999 break; 3000 } 3001 3002 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3003 bdev->name, qos_type, qos_set); 3004 3005 return; 3006 } 3007 3008 static void 3009 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3010 { 3011 struct spdk_conf_section *sp = NULL; 3012 const char *val = NULL; 3013 uint64_t qos_set = 0; 3014 int i = 0, j = 0; 3015 3016 sp = spdk_conf_find_section(NULL, "QoS"); 3017 if (!sp) { 3018 return; 3019 } 3020 3021 while (j < SPDK_BDEV_QOS_NUM_TYPES) { 3022 i = 0; 3023 while (true) { 3024 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0); 3025 if (!val) { 3026 break; 3027 } 3028 3029 if (strcmp(bdev->name, val) != 0) { 3030 i++; 3031 continue; 3032 } 3033 3034 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1); 3035 if (val) { 3036 qos_set = strtoull(val, NULL, 10); 3037 _spdk_bdev_qos_config_type(bdev, qos_set, j); 3038 } 3039 3040 break; 3041 } 3042 3043 j++; 3044 } 3045 3046 return; 3047 } 3048 3049 static int 3050 spdk_bdev_init(struct spdk_bdev *bdev) 3051 { 3052 assert(bdev->module != NULL); 3053 3054 if (!bdev->name) { 3055 SPDK_ERRLOG("Bdev name is NULL\n"); 3056 return -EINVAL; 3057 } 3058 3059 if (spdk_bdev_get_by_name(bdev->name)) { 3060 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3061 return -EEXIST; 3062 } 3063 3064 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3065 bdev->internal.measured_queue_depth = UINT64_MAX; 3066 3067 TAILQ_INIT(&bdev->internal.open_descs); 3068 3069 TAILQ_INIT(&bdev->aliases); 3070 3071 bdev->internal.reset_in_progress = NULL; 3072 3073 _spdk_bdev_qos_config(bdev); 3074 3075 spdk_io_device_register(__bdev_to_io_dev(bdev), 3076 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3077 sizeof(struct spdk_bdev_channel)); 3078 3079 pthread_mutex_init(&bdev->internal.mutex, NULL); 3080 return 0; 3081 } 3082 3083 static void 3084 spdk_bdev_destroy_cb(void *io_device) 3085 { 3086 int rc; 3087 struct spdk_bdev *bdev; 3088 spdk_bdev_unregister_cb cb_fn; 3089 void *cb_arg; 3090 3091 bdev = __bdev_from_io_dev(io_device); 3092 cb_fn = bdev->internal.unregister_cb; 3093 cb_arg = bdev->internal.unregister_ctx; 3094 3095 rc = bdev->fn_table->destruct(bdev->ctxt); 3096 if (rc < 0) { 3097 SPDK_ERRLOG("destruct failed\n"); 3098 } 3099 if (rc <= 0 && cb_fn != NULL) { 3100 cb_fn(cb_arg, rc); 3101 } 3102 } 3103 3104 3105 static void 3106 spdk_bdev_fini(struct spdk_bdev *bdev) 3107 { 3108 pthread_mutex_destroy(&bdev->internal.mutex); 3109 3110 free(bdev->internal.qos); 3111 3112 
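/* The io_device unregister below is asynchronous: spdk_bdev_destroy_cb() runs once
 * every channel for this bdev has been released, and that callback in turn invokes the
 * module's destruct() and any user unregister callback. */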
spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3113 } 3114 3115 static void 3116 spdk_bdev_start(struct spdk_bdev *bdev) 3117 { 3118 struct spdk_bdev_module *module; 3119 uint32_t action; 3120 3121 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3122 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3123 3124 /* Examine configuration before initializing I/O */ 3125 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3126 if (module->examine_config) { 3127 action = module->internal.action_in_progress; 3128 module->internal.action_in_progress++; 3129 module->examine_config(bdev); 3130 if (action != module->internal.action_in_progress) { 3131 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3132 module->name); 3133 } 3134 } 3135 } 3136 3137 if (bdev->internal.claim_module) { 3138 return; 3139 } 3140 3141 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3142 if (module->examine_disk) { 3143 module->internal.action_in_progress++; 3144 module->examine_disk(bdev); 3145 } 3146 } 3147 } 3148 3149 int 3150 spdk_bdev_register(struct spdk_bdev *bdev) 3151 { 3152 int rc = spdk_bdev_init(bdev); 3153 3154 if (rc == 0) { 3155 spdk_bdev_start(bdev); 3156 } 3157 3158 return rc; 3159 } 3160 3161 int 3162 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3163 { 3164 int rc; 3165 3166 rc = spdk_bdev_init(vbdev); 3167 if (rc) { 3168 return rc; 3169 } 3170 3171 spdk_bdev_start(vbdev); 3172 return 0; 3173 } 3174 3175 void 3176 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3177 { 3178 if (bdev->internal.unregister_cb != NULL) { 3179 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3180 } 3181 } 3182 3183 static void 3184 _remove_notify(void *arg) 3185 { 3186 struct spdk_bdev_desc *desc = arg; 3187 3188 desc->remove_cb(desc->remove_ctx); 3189 } 3190 3191 void 3192 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3193 { 3194 struct spdk_bdev_desc *desc, *tmp; 3195 bool do_destruct = true; 3196 struct spdk_thread *thread; 3197 3198 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3199 3200 thread = spdk_get_thread(); 3201 if (!thread) { 3202 /* The user called this from a non-SPDK thread. */ 3203 if (cb_fn != NULL) { 3204 cb_fn(cb_arg, -ENOTSUP); 3205 } 3206 return; 3207 } 3208 3209 pthread_mutex_lock(&bdev->internal.mutex); 3210 3211 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3212 bdev->internal.unregister_cb = cb_fn; 3213 bdev->internal.unregister_ctx = cb_arg; 3214 3215 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3216 if (desc->remove_cb) { 3217 do_destruct = false; 3218 /* 3219 * Defer invocation of the remove_cb to a separate message that will 3220 * run later on this thread. This ensures this context unwinds and 3221 * we don't recursively unregister this bdev again if the remove_cb 3222 * immediately closes its descriptor. 3223 */ 3224 if (!desc->remove_scheduled) { 3225 /* Avoid scheduling removal of the same descriptor multiple times. 
*/ 3226 desc->remove_scheduled = true; 3227 spdk_thread_send_msg(thread, _remove_notify, desc); 3228 } 3229 } 3230 } 3231 3232 if (!do_destruct) { 3233 pthread_mutex_unlock(&bdev->internal.mutex); 3234 return; 3235 } 3236 3237 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3238 pthread_mutex_unlock(&bdev->internal.mutex); 3239 3240 spdk_bdev_fini(bdev); 3241 } 3242 3243 int 3244 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3245 void *remove_ctx, struct spdk_bdev_desc **_desc) 3246 { 3247 struct spdk_bdev_desc *desc; 3248 3249 desc = calloc(1, sizeof(*desc)); 3250 if (desc == NULL) { 3251 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3252 return -ENOMEM; 3253 } 3254 3255 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3256 spdk_get_thread()); 3257 3258 pthread_mutex_lock(&bdev->internal.mutex); 3259 3260 if (write && bdev->internal.claim_module) { 3261 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3262 bdev->name, bdev->internal.claim_module->name); 3263 free(desc); 3264 pthread_mutex_unlock(&bdev->internal.mutex); 3265 return -EPERM; 3266 } 3267 3268 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3269 3270 desc->bdev = bdev; 3271 desc->remove_cb = remove_cb; 3272 desc->remove_ctx = remove_ctx; 3273 desc->write = write; 3274 *_desc = desc; 3275 3276 pthread_mutex_unlock(&bdev->internal.mutex); 3277 3278 return 0; 3279 } 3280 3281 void 3282 spdk_bdev_close(struct spdk_bdev_desc *desc) 3283 { 3284 struct spdk_bdev *bdev = desc->bdev; 3285 bool do_unregister = false; 3286 3287 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3288 spdk_get_thread()); 3289 3290 pthread_mutex_lock(&bdev->internal.mutex); 3291 3292 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3293 free(desc); 3294 3295 /* If no more descriptors, kill QoS channel */ 3296 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3297 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3298 bdev->name, spdk_get_thread()); 3299 3300 if (spdk_bdev_qos_destroy(bdev)) { 3301 /* There isn't anything we can do to recover here. Just let the 3302 * old QoS poller keep running. The QoS handling won't change 3303 * cores when the user allocates a new channel, but it won't break. */ 3304 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 3305 } 3306 } 3307 3308 spdk_bdev_set_qd_sampling_period(bdev, 0); 3309 3310 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3311 do_unregister = true; 3312 } 3313 pthread_mutex_unlock(&bdev->internal.mutex); 3314 3315 if (do_unregister == true) { 3316 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3317 } 3318 } 3319 3320 int 3321 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3322 struct spdk_bdev_module *module) 3323 { 3324 if (bdev->internal.claim_module != NULL) { 3325 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3326 bdev->internal.claim_module->name); 3327 return -EPERM; 3328 } 3329 3330 if (desc && !desc->write) { 3331 desc->write = true; 3332 } 3333 3334 bdev->internal.claim_module = module; 3335 return 0; 3336 } 3337 3338 void 3339 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3340 { 3341 assert(bdev->internal.claim_module != NULL); 3342 bdev->internal.claim_module = NULL; 3343 } 3344 3345 struct spdk_bdev * 3346 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3347 { 3348 return desc->bdev; 3349 } 3350 3351 void 3352 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3353 { 3354 struct iovec *iovs; 3355 int iovcnt; 3356 3357 if (bdev_io == NULL) { 3358 return; 3359 } 3360 3361 switch (bdev_io->type) { 3362 case SPDK_BDEV_IO_TYPE_READ: 3363 iovs = bdev_io->u.bdev.iovs; 3364 iovcnt = bdev_io->u.bdev.iovcnt; 3365 break; 3366 case SPDK_BDEV_IO_TYPE_WRITE: 3367 iovs = bdev_io->u.bdev.iovs; 3368 iovcnt = bdev_io->u.bdev.iovcnt; 3369 break; 3370 default: 3371 iovs = NULL; 3372 iovcnt = 0; 3373 break; 3374 } 3375 3376 if (iovp) { 3377 *iovp = iovs; 3378 } 3379 if (iovcntp) { 3380 *iovcntp = iovcnt; 3381 } 3382 } 3383 3384 void 3385 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3386 { 3387 3388 if (spdk_bdev_module_list_find(bdev_module->name)) { 3389 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3390 assert(false); 3391 } 3392 3393 if (bdev_module->async_init) { 3394 bdev_module->internal.action_in_progress = 1; 3395 } 3396 3397 /* 3398 * Modules with examine callbacks must be initialized first, so they are 3399 * ready to handle examine callbacks from later modules that will 3400 * register physical bdevs. 
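 * For example, a virtual bdev module (such as a partition or logical volume module) is
 * put at the head of the list so it is already initialized when a later-registered module
 * creates the physical bdevs it will examine.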
3401 */ 3402 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3403 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3404 } else { 3405 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3406 } 3407 } 3408 3409 struct spdk_bdev_module * 3410 spdk_bdev_module_list_find(const char *name) 3411 { 3412 struct spdk_bdev_module *bdev_module; 3413 3414 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3415 if (strcmp(name, bdev_module->name) == 0) { 3416 break; 3417 } 3418 } 3419 3420 return bdev_module; 3421 } 3422 3423 static void 3424 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3425 { 3426 struct spdk_bdev_io *bdev_io = _bdev_io; 3427 uint64_t num_bytes, num_blocks; 3428 int rc; 3429 3430 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3431 bdev_io->u.bdev.split_remaining_num_blocks, 3432 ZERO_BUFFER_SIZE); 3433 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3434 3435 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3436 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3437 g_bdev_mgr.zero_buffer, 3438 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3439 _spdk_bdev_write_zero_buffer_done, bdev_io); 3440 if (rc == 0) { 3441 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3442 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3443 } else if (rc == -ENOMEM) { 3444 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 3445 bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_write_zero_buffer_next; 3446 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 3447 spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3448 &bdev_io->internal.waitq_entry); 3449 } else { 3450 /* This should never happen. 
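 * spdk_bdev_write_blocks() could only fail here with -EBADF or -EINVAL, both of which
 * were already ruled out when the original write_zeroes request was validated.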
*/ 3451 assert(false); 3452 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3453 bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED, bdev_io->internal.caller_ctx); 3454 } 3455 } 3456 3457 static void 3458 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3459 { 3460 struct spdk_bdev_io *parent_io = cb_arg; 3461 3462 if (!success) { 3463 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3464 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx); 3465 return; 3466 } 3467 3468 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3469 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3470 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx); 3471 return; 3472 } 3473 3474 _spdk_bdev_write_zero_buffer_next(parent_io); 3475 } 3476 3477 struct set_qos_limit_ctx { 3478 void (*cb_fn)(void *cb_arg, int status); 3479 void *cb_arg; 3480 struct spdk_bdev *bdev; 3481 }; 3482 3483 static void 3484 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3485 { 3486 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3487 ctx->bdev->internal.qos_mod_in_progress = false; 3488 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3489 3490 ctx->cb_fn(ctx->cb_arg, status); 3491 free(ctx); 3492 } 3493 3494 static void 3495 _spdk_bdev_disable_qos_done(void *cb_arg) 3496 { 3497 struct set_qos_limit_ctx *ctx = cb_arg; 3498 struct spdk_bdev *bdev = ctx->bdev; 3499 struct spdk_bdev_io *bdev_io; 3500 struct spdk_bdev_qos *qos; 3501 3502 pthread_mutex_lock(&bdev->internal.mutex); 3503 qos = bdev->internal.qos; 3504 bdev->internal.qos = NULL; 3505 pthread_mutex_unlock(&bdev->internal.mutex); 3506 3507 while (!TAILQ_EMPTY(&qos->queued)) { 3508 /* Send queued I/O back to their original thread for resubmission. */ 3509 bdev_io = TAILQ_FIRST(&qos->queued); 3510 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3511 3512 if (bdev_io->internal.io_submit_ch) { 3513 /* 3514 * Channel was changed when sending it to the QoS thread - change it back 3515 * before sending it back to the original thread. 
3516 */ 3517 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3518 bdev_io->internal.io_submit_ch = NULL; 3519 } 3520 3521 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3522 _spdk_bdev_io_submit, bdev_io); 3523 } 3524 3525 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3526 spdk_poller_unregister(&qos->poller); 3527 3528 free(qos); 3529 3530 _spdk_bdev_set_qos_limit_done(ctx, 0); 3531 } 3532 3533 static void 3534 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3535 { 3536 void *io_device = spdk_io_channel_iter_get_io_device(i); 3537 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3538 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3539 struct spdk_thread *thread; 3540 3541 pthread_mutex_lock(&bdev->internal.mutex); 3542 thread = bdev->internal.qos->thread; 3543 pthread_mutex_unlock(&bdev->internal.mutex); 3544 3545 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3546 } 3547 3548 static void 3549 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3550 { 3551 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3552 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3553 3554 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3555 3556 spdk_for_each_channel_continue(i, 0); 3557 } 3558 3559 static void 3560 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg) 3561 { 3562 struct set_qos_limit_ctx *ctx = cb_arg; 3563 struct spdk_bdev *bdev = ctx->bdev; 3564 3565 pthread_mutex_lock(&bdev->internal.mutex); 3566 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3567 pthread_mutex_unlock(&bdev->internal.mutex); 3568 3569 _spdk_bdev_set_qos_limit_done(ctx, 0); 3570 } 3571 3572 static void 3573 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3574 { 3575 void *io_device = spdk_io_channel_iter_get_io_device(i); 3576 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3577 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3578 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3579 3580 pthread_mutex_lock(&bdev->internal.mutex); 3581 _spdk_bdev_enable_qos(bdev, bdev_ch); 3582 pthread_mutex_unlock(&bdev->internal.mutex); 3583 spdk_for_each_channel_continue(i, 0); 3584 } 3585 3586 static void 3587 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3588 { 3589 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3590 3591 _spdk_bdev_set_qos_limit_done(ctx, status); 3592 } 3593 3594 void 3595 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec, 3596 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3597 { 3598 struct set_qos_limit_ctx *ctx; 3599 3600 if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) { 3601 SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n", 3602 ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC); 3603 cb_fn(cb_arg, -EINVAL); 3604 return; 3605 } 3606 3607 ctx = calloc(1, sizeof(*ctx)); 3608 if (ctx == NULL) { 3609 cb_fn(cb_arg, -ENOMEM); 3610 return; 3611 } 3612 3613 ctx->cb_fn = cb_fn; 3614 ctx->cb_arg = cb_arg; 3615 ctx->bdev = bdev; 3616 3617 pthread_mutex_lock(&bdev->internal.mutex); 3618 if (bdev->internal.qos_mod_in_progress) { 3619 pthread_mutex_unlock(&bdev->internal.mutex); 3620 free(ctx); 3621 cb_fn(cb_arg, -EAGAIN); 3622 return; 3623 } 3624 bdev->internal.qos_mod_in_progress = true; 3625 3626 if (ios_per_sec > 0) { 3627 if (bdev->internal.qos == NULL) { 3628 /* Enabling */ 3629 
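/* First enable: allocate and populate the QoS object, then visit every existing
 * channel via _spdk_bdev_enable_qos_msg() so their I/O starts being funneled through
 * the QoS thread. */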
bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3630 if (!bdev->internal.qos) { 3631 pthread_mutex_unlock(&bdev->internal.mutex); 3632 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3633 free(ctx); 3634 cb_fn(cb_arg, -ENOMEM); 3635 return; 3636 } 3637 3638 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3639 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3640 _spdk_bdev_enable_qos_msg, ctx, 3641 _spdk_bdev_enable_qos_done); 3642 } else { 3643 /* Updating */ 3644 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3645 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx); 3646 } 3647 } else { 3648 if (bdev->internal.qos != NULL) { 3649 /* Disabling */ 3650 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3651 _spdk_bdev_disable_qos_msg, ctx, 3652 _spdk_bdev_disable_qos_msg_done); 3653 } else { 3654 pthread_mutex_unlock(&bdev->internal.mutex); 3655 _spdk_bdev_set_qos_limit_done(ctx, 0); 3656 return; 3657 } 3658 } 3659 3660 pthread_mutex_unlock(&bdev->internal.mutex); 3661 } 3662 3663 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3664
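/*
 * Illustrative usage sketch, not part of the library: a minimal reader built only on the
 * public calls defined above (spdk_bdev_open(), spdk_bdev_get_io_channel(), spdk_bdev_read(),
 * spdk_bdev_queue_io_wait(), spdk_bdev_close()). The example_* names, the 4096-byte read and
 * the assumption that ctx->buf is already allocated with alignment satisfying
 * spdk_bdev_get_buf_align() are all hypothetical.
 *
 *	struct example_ctx {
 *		struct spdk_bdev_desc		*desc;
 *		struct spdk_io_channel		*ch;
 *		void				*buf;
 *		struct spdk_bdev_io_wait_entry	wait;
 *	};
 *
 *	static void
 *	example_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		struct example_ctx *ctx = cb_arg;
 *
 *		if (!success) {
 *			SPDK_ERRLOG("example read failed\n");
 *		}
 *		spdk_bdev_free_io(bdev_io);
 *		spdk_put_io_channel(ctx->ch);
 *		spdk_bdev_close(ctx->desc);
 *	}
 *
 *	static void
 *	example_submit_read(void *arg)
 *	{
 *		struct example_ctx *ctx = arg;
 *		int rc;
 *
 *		rc = spdk_bdev_read(ctx->desc, ctx->ch, ctx->buf, 0, 4096,
 *				    example_read_done, ctx);
 *		if (rc == -ENOMEM) {
 *			// No spdk_bdev_io available right now - ask to be called back
 *			// once one is freed and then resubmit.
 *			ctx->wait.bdev = spdk_bdev_desc_get_bdev(ctx->desc);
 *			ctx->wait.cb_fn = example_submit_read;
 *			ctx->wait.cb_arg = ctx;
 *			spdk_bdev_queue_io_wait(ctx->wait.bdev, ctx->ch, &ctx->wait);
 *		} else if (rc != 0) {
 *			SPDK_ERRLOG("example read submit failed: %d\n", rc);
 *		}
 *	}
 *
 *	static int
 *	example_start(const char *bdev_name, struct example_ctx *ctx)
 *	{
 *		struct spdk_bdev *bdev = spdk_bdev_get_by_name(bdev_name);
 *		int rc;
 *
 *		if (bdev == NULL) {
 *			return -ENODEV;
 *		}
 *		rc = spdk_bdev_open(bdev, false, NULL, NULL, &ctx->desc);
 *		if (rc != 0) {
 *			return rc;
 *		}
 *		ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
 *		if (ctx->ch == NULL) {
 *			spdk_bdev_close(ctx->desc);
 *			return -ENOMEM;
 *		}
 *		example_submit_read(ctx);
 *		return 0;
 *	}
 */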