1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/env.h" 40 #include "spdk/event.h" 41 #include "spdk/thread.h" 42 #include "spdk/likely.h" 43 #include "spdk/queue.h" 44 #include "spdk/nvme_spec.h" 45 #include "spdk/scsi_spec.h" 46 #include "spdk/util.h" 47 #include "spdk/trace.h" 48 49 #include "spdk/bdev_module.h" 50 #include "spdk_internal/log.h" 51 #include "spdk/string.h" 52 53 #ifdef SPDK_CONFIG_VTUNE 54 #include "ittnotify.h" 55 #include "ittnotify_types.h" 56 int __itt_init_ittlib(const char *, __itt_group_id); 57 #endif 58 59 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 60 #define SPDK_BDEV_IO_CACHE_SIZE 256 61 #define BUF_SMALL_POOL_SIZE 8192 62 #define BUF_LARGE_POOL_SIZE 1024 63 #define NOMEM_THRESHOLD_COUNT 8 64 #define ZERO_BUFFER_SIZE 0x100000 65 66 #define OWNER_BDEV 0x2 67 68 #define OBJECT_BDEV_IO 0x2 69 70 #define TRACE_GROUP_BDEV 0x3 71 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 72 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 73 74 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 75 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 76 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 77 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 78 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC 10 79 80 enum spdk_bdev_qos_type { 81 SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0, 82 SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT, 83 SPDK_BDEV_QOS_NUM_TYPES /* Keep last */ 84 }; 85 86 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"}; 87 88 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 89 90 struct spdk_bdev_mgr { 91 struct spdk_mempool *bdev_io_pool; 92 93 struct spdk_mempool *buf_small_pool; 94 struct spdk_mempool *buf_large_pool; 95 96 void *zero_buffer; 97 98 TAILQ_HEAD(, spdk_bdev_module) bdev_modules; 99 100 struct spdk_bdev_list bdevs; 101 102 bool init_complete; 103 bool 
module_init_complete; 104 105 #ifdef SPDK_CONFIG_VTUNE 106 __itt_domain *domain; 107 #endif 108 }; 109 110 static struct spdk_bdev_mgr g_bdev_mgr = { 111 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 112 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 113 .init_complete = false, 114 .module_init_complete = false, 115 }; 116 117 static struct spdk_bdev_opts g_bdev_opts = { 118 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 119 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 120 }; 121 122 static spdk_bdev_init_cb g_init_cb_fn = NULL; 123 static void *g_init_cb_arg = NULL; 124 125 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 126 static void *g_fini_cb_arg = NULL; 127 static struct spdk_thread *g_fini_thread = NULL; 128 129 struct spdk_bdev_qos { 130 /** Rate limit, in I/O per second */ 131 uint64_t iops_rate_limit; 132 133 /** Rate limit, in byte per second */ 134 uint64_t byte_rate_limit; 135 136 /** The channel that all I/O are funneled through */ 137 struct spdk_bdev_channel *ch; 138 139 /** The thread on which the poller is running. */ 140 struct spdk_thread *thread; 141 142 /** Queue of I/O waiting to be issued. */ 143 bdev_io_tailq_t queued; 144 145 /** Size of a timeslice in tsc ticks. */ 146 uint64_t timeslice_size; 147 148 /** Timestamp of start of last timeslice. */ 149 uint64_t last_timeslice; 150 151 /** Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and 152 * only valid for the master channel which manages the outstanding IOs. */ 153 uint64_t max_ios_per_timeslice; 154 155 /** Maximum allowed bytes to be issued in one timeslice (e.g., 1ms) and 156 * only valid for the master channel which manages the outstanding IOs. */ 157 uint64_t max_byte_per_timeslice; 158 159 /** Remaining IO allowed in current timeslice (e.g., 1ms) */ 160 uint64_t io_remaining_this_timeslice; 161 162 /** Remaining bytes allowed in current timeslice (e.g., 1ms). 163 * Allowed to run negative if an I/O is submitted when some bytes are remaining, 164 * but the I/O is bigger than that amount. The excess will be deducted from the 165 * next timeslice. 166 */ 167 int64_t byte_remaining_this_timeslice; 168 169 /** Poller that processes queued I/O commands each time slice. */ 170 struct spdk_poller *poller; 171 }; 172 173 struct spdk_bdev_mgmt_channel { 174 bdev_io_stailq_t need_buf_small; 175 bdev_io_stailq_t need_buf_large; 176 177 /* 178 * Each thread keeps a cache of bdev_io - this allows 179 * bdev threads which are *not* DPDK threads to still 180 * benefit from a per-thread bdev_io cache. Without 181 * this, non-DPDK threads fetching from the mempool 182 * incur a cmpxchg on get and put. 183 */ 184 bdev_io_stailq_t per_thread_cache; 185 uint32_t per_thread_cache_count; 186 uint32_t bdev_io_cache_size; 187 188 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 189 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 190 }; 191 192 /* 193 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 194 * will queue here their IO that awaits retry. It makes it possible to retry sending 195 * IO to one bdev after IO from other bdev completes. 196 */ 197 struct spdk_bdev_shared_resource { 198 /* The bdev management channel */ 199 struct spdk_bdev_mgmt_channel *mgmt_ch; 200 201 /* 202 * Count of I/O submitted to bdev module and waiting for completion. 203 * Incremented before submit_request() is called on an spdk_bdev_io. 
204 */ 205 uint64_t io_outstanding; 206 207 /* 208 * Queue of IO awaiting retry because of a previous NOMEM status returned 209 * on this channel. 210 */ 211 bdev_io_tailq_t nomem_io; 212 213 /* 214 * Threshold which io_outstanding must drop to before retrying nomem_io. 215 */ 216 uint64_t nomem_threshold; 217 218 /* I/O channel allocated by a bdev module */ 219 struct spdk_io_channel *shared_ch; 220 221 /* Refcount of bdev channels using this resource */ 222 uint32_t ref; 223 224 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 225 }; 226 227 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 228 #define BDEV_CH_QOS_ENABLED (1 << 1) 229 230 struct spdk_bdev_channel { 231 struct spdk_bdev *bdev; 232 233 /* The channel for the underlying device */ 234 struct spdk_io_channel *channel; 235 236 /* Per io_device per thread data */ 237 struct spdk_bdev_shared_resource *shared_resource; 238 239 struct spdk_bdev_io_stat stat; 240 241 /* 242 * Count of I/O submitted through this channel and waiting for completion. 243 * Incremented before submit_request() is called on an spdk_bdev_io. 244 */ 245 uint64_t io_outstanding; 246 247 bdev_io_tailq_t queued_resets; 248 249 uint32_t flags; 250 251 #ifdef SPDK_CONFIG_VTUNE 252 uint64_t start_tsc; 253 uint64_t interval_tsc; 254 __itt_string_handle *handle; 255 struct spdk_bdev_io_stat prev_stat; 256 #endif 257 258 }; 259 260 struct spdk_bdev_desc { 261 struct spdk_bdev *bdev; 262 struct spdk_thread *thread; 263 spdk_bdev_remove_cb_t remove_cb; 264 void *remove_ctx; 265 bool remove_scheduled; 266 bool closed; 267 bool write; 268 TAILQ_ENTRY(spdk_bdev_desc) link; 269 }; 270 271 struct spdk_bdev_iostat_ctx { 272 struct spdk_bdev_io_stat *stat; 273 spdk_bdev_get_device_stat_cb cb; 274 void *cb_arg; 275 }; 276 277 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 278 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 279 280 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 281 void *cb_arg); 282 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 283 284 void 285 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 286 { 287 *opts = g_bdev_opts; 288 } 289 290 int 291 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 292 { 293 uint32_t min_pool_size; 294 295 /* 296 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 297 * initialization. A second mgmt_ch will be created on the same thread when the application starts 298 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
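	 *
	 * For example (illustrative numbers, not taken from any particular
	 * configuration): with bdev_io_cache_size = 256 and 4 threads, the
	 * min_pool_size computed below is 256 * (4 + 1) = 1280, so any
	 * bdev_io_pool_size smaller than 1280 is rejected with an error.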
299 */ 300 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 301 if (opts->bdev_io_pool_size < min_pool_size) { 302 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 303 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 304 spdk_thread_get_count()); 305 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 306 return -1; 307 } 308 309 g_bdev_opts = *opts; 310 return 0; 311 } 312 313 struct spdk_bdev * 314 spdk_bdev_first(void) 315 { 316 struct spdk_bdev *bdev; 317 318 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 319 if (bdev) { 320 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 321 } 322 323 return bdev; 324 } 325 326 struct spdk_bdev * 327 spdk_bdev_next(struct spdk_bdev *prev) 328 { 329 struct spdk_bdev *bdev; 330 331 bdev = TAILQ_NEXT(prev, internal.link); 332 if (bdev) { 333 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 334 } 335 336 return bdev; 337 } 338 339 static struct spdk_bdev * 340 _bdev_next_leaf(struct spdk_bdev *bdev) 341 { 342 while (bdev != NULL) { 343 if (bdev->internal.claim_module == NULL) { 344 return bdev; 345 } else { 346 bdev = TAILQ_NEXT(bdev, internal.link); 347 } 348 } 349 350 return bdev; 351 } 352 353 struct spdk_bdev * 354 spdk_bdev_first_leaf(void) 355 { 356 struct spdk_bdev *bdev; 357 358 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 359 360 if (bdev) { 361 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 362 } 363 364 return bdev; 365 } 366 367 struct spdk_bdev * 368 spdk_bdev_next_leaf(struct spdk_bdev *prev) 369 { 370 struct spdk_bdev *bdev; 371 372 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 373 374 if (bdev) { 375 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 376 } 377 378 return bdev; 379 } 380 381 struct spdk_bdev * 382 spdk_bdev_get_by_name(const char *bdev_name) 383 { 384 struct spdk_bdev_alias *tmp; 385 struct spdk_bdev *bdev = spdk_bdev_first(); 386 387 while (bdev != NULL) { 388 if (strcmp(bdev_name, bdev->name) == 0) { 389 return bdev; 390 } 391 392 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 393 if (strcmp(bdev_name, tmp->alias) == 0) { 394 return bdev; 395 } 396 } 397 398 bdev = spdk_bdev_next(bdev); 399 } 400 401 return NULL; 402 } 403 404 void 405 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 406 { 407 struct iovec *iovs; 408 409 iovs = bdev_io->u.bdev.iovs; 410 411 assert(iovs != NULL); 412 assert(bdev_io->u.bdev.iovcnt >= 1); 413 414 iovs[0].iov_base = buf; 415 iovs[0].iov_len = len; 416 } 417 418 static void 419 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 420 { 421 struct spdk_mempool *pool; 422 struct spdk_bdev_io *tmp; 423 void *buf, *aligned_buf; 424 bdev_io_stailq_t *stailq; 425 struct spdk_bdev_mgmt_channel *ch; 426 427 assert(bdev_io->u.bdev.iovcnt == 1); 428 429 buf = bdev_io->internal.buf; 430 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 431 432 bdev_io->internal.buf = NULL; 433 434 if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 435 pool = g_bdev_mgr.buf_small_pool; 436 stailq = &ch->need_buf_small; 437 } else { 438 pool = g_bdev_mgr.buf_large_pool; 439 stailq = &ch->need_buf_large; 440 } 441 442 if (STAILQ_EMPTY(stailq)) { 443 spdk_mempool_put(pool, buf); 444 } else { 445 tmp = STAILQ_FIRST(stailq); 446 447 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 448 spdk_bdev_io_set_buf(tmp, aligned_buf, 
tmp->internal.buf_len); 449 450 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 451 tmp->internal.buf = buf; 452 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 453 } 454 } 455 456 void 457 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 458 { 459 struct spdk_mempool *pool; 460 bdev_io_stailq_t *stailq; 461 void *buf, *aligned_buf; 462 struct spdk_bdev_mgmt_channel *mgmt_ch; 463 464 assert(cb != NULL); 465 assert(bdev_io->u.bdev.iovs != NULL); 466 467 if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) { 468 /* Buffer already present */ 469 cb(bdev_io->internal.ch->channel, bdev_io); 470 return; 471 } 472 473 assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE); 474 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 475 476 bdev_io->internal.buf_len = len; 477 bdev_io->internal.get_buf_cb = cb; 478 if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 479 pool = g_bdev_mgr.buf_small_pool; 480 stailq = &mgmt_ch->need_buf_small; 481 } else { 482 pool = g_bdev_mgr.buf_large_pool; 483 stailq = &mgmt_ch->need_buf_large; 484 } 485 486 buf = spdk_mempool_get(pool); 487 488 if (!buf) { 489 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 490 } else { 491 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 492 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 493 494 bdev_io->internal.buf = buf; 495 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 496 } 497 } 498 499 static int 500 spdk_bdev_module_get_max_ctx_size(void) 501 { 502 struct spdk_bdev_module *bdev_module; 503 int max_bdev_module_size = 0; 504 505 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 506 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 507 max_bdev_module_size = bdev_module->get_ctx_size(); 508 } 509 } 510 511 return max_bdev_module_size; 512 } 513 514 void 515 spdk_bdev_config_text(FILE *fp) 516 { 517 struct spdk_bdev_module *bdev_module; 518 519 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 520 if (bdev_module->config_text) { 521 bdev_module->config_text(fp); 522 } 523 } 524 } 525 526 void 527 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 528 { 529 struct spdk_bdev_module *bdev_module; 530 struct spdk_bdev *bdev; 531 532 assert(w != NULL); 533 534 spdk_json_write_array_begin(w); 535 536 spdk_json_write_object_begin(w); 537 spdk_json_write_named_string(w, "method", "set_bdev_options"); 538 spdk_json_write_name(w, "params"); 539 spdk_json_write_object_begin(w); 540 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 541 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 542 spdk_json_write_object_end(w); 543 spdk_json_write_object_end(w); 544 545 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 546 if (bdev_module->config_json) { 547 bdev_module->config_json(w); 548 } 549 } 550 551 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 552 if (bdev->fn_table->write_config_json) { 553 bdev->fn_table->write_config_json(bdev, w); 554 } 555 } 556 557 spdk_json_write_array_end(w); 558 } 559 560 static int 561 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 562 { 563 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 564 struct spdk_bdev_io *bdev_io; 565 uint32_t i; 566 567 STAILQ_INIT(&ch->need_buf_small); 568 STAILQ_INIT(&ch->need_buf_large); 569 570 STAILQ_INIT(&ch->per_thread_cache); 571 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 572 
	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * if the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		rc = module->module_init();
		if (rc != 0) {
			break;
		}
	}

	g_bdev_mgr.module_init_complete = true;
	return rc;
}

static void
spdk_bdev_init_failed_complete(void *cb_arg)
{
	spdk_bdev_init_complete(-1);
}

static void
spdk_bdev_init_failed(void *cb_arg)
{
	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
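	 *
	 * For example (illustrative numbers): with BUF_SMALL_POOL_SIZE = 8192 and
	 * 4 threads, the cache_size computed below is 8192 / (2 * 4) = 1024, so
	 * each thread caches at most 1024 small buffers and at least half of the
	 * pool always remains in the shared pool.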
777 */ 778 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 779 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 780 781 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 782 BUF_SMALL_POOL_SIZE, 783 SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512, 784 cache_size, 785 SPDK_ENV_SOCKET_ID_ANY); 786 if (!g_bdev_mgr.buf_small_pool) { 787 SPDK_ERRLOG("create rbuf small pool failed\n"); 788 spdk_bdev_init_complete(-1); 789 return; 790 } 791 792 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 793 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 794 795 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 796 BUF_LARGE_POOL_SIZE, 797 SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512, 798 cache_size, 799 SPDK_ENV_SOCKET_ID_ANY); 800 if (!g_bdev_mgr.buf_large_pool) { 801 SPDK_ERRLOG("create rbuf large pool failed\n"); 802 spdk_bdev_init_complete(-1); 803 return; 804 } 805 806 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 807 NULL); 808 if (!g_bdev_mgr.zero_buffer) { 809 SPDK_ERRLOG("create bdev zero buffer failed\n"); 810 spdk_bdev_init_complete(-1); 811 return; 812 } 813 814 #ifdef SPDK_CONFIG_VTUNE 815 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 816 #endif 817 818 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 819 spdk_bdev_mgmt_channel_destroy, 820 sizeof(struct spdk_bdev_mgmt_channel), 821 "bdev_mgr"); 822 823 rc = spdk_bdev_modules_init(); 824 if (rc != 0) { 825 SPDK_ERRLOG("bdev modules init failed\n"); 826 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 827 return; 828 } 829 830 spdk_bdev_module_action_complete(); 831 } 832 833 static void 834 spdk_bdev_mgr_unregister_cb(void *io_device) 835 { 836 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 837 838 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 839 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 840 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 841 g_bdev_opts.bdev_io_pool_size); 842 } 843 844 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 845 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 846 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 847 BUF_SMALL_POOL_SIZE); 848 assert(false); 849 } 850 851 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 852 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 853 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 854 BUF_LARGE_POOL_SIZE); 855 assert(false); 856 } 857 858 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 859 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 860 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 861 spdk_dma_free(g_bdev_mgr.zero_buffer); 862 863 cb_fn(g_fini_cb_arg); 864 g_fini_cb_fn = NULL; 865 g_fini_cb_arg = NULL; 866 } 867 868 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 869 870 static void 871 spdk_bdev_module_finish_iter(void *arg) 872 { 873 struct spdk_bdev_module *bdev_module; 874 875 /* Start iterating from the last touched module */ 876 if (!g_resume_bdev_module) { 877 bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules); 878 } else { 879 bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq); 880 } 881 882 while (bdev_module) { 883 if (bdev_module->async_fini) { 884 /* Save our place so we can resume later. 
We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last bdev in the list. The last bdev in the list should be a bdev
	 * that has no bdevs that depend on it.
	 */
	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
989 */ 990 bdev_io = NULL; 991 } else { 992 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 993 } 994 995 return bdev_io; 996 } 997 998 void 999 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1000 { 1001 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1002 1003 assert(bdev_io != NULL); 1004 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1005 1006 if (bdev_io->internal.buf != NULL) { 1007 spdk_bdev_io_put_buf(bdev_io); 1008 } 1009 1010 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1011 ch->per_thread_cache_count++; 1012 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1013 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1014 struct spdk_bdev_io_wait_entry *entry; 1015 1016 entry = TAILQ_FIRST(&ch->io_wait_queue); 1017 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1018 entry->cb_fn(entry->cb_arg); 1019 } 1020 } else { 1021 /* We should never have a full cache with entries on the io wait queue. */ 1022 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1023 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1024 } 1025 } 1026 1027 static uint64_t 1028 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1029 { 1030 struct spdk_bdev *bdev = bdev_io->bdev; 1031 1032 switch (bdev_io->type) { 1033 case SPDK_BDEV_IO_TYPE_NVME_ADMIN: 1034 case SPDK_BDEV_IO_TYPE_NVME_IO: 1035 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1036 return bdev_io->u.nvme_passthru.nbytes; 1037 case SPDK_BDEV_IO_TYPE_READ: 1038 case SPDK_BDEV_IO_TYPE_WRITE: 1039 case SPDK_BDEV_IO_TYPE_UNMAP: 1040 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1041 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1042 default: 1043 return 0; 1044 } 1045 } 1046 1047 static void 1048 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch) 1049 { 1050 struct spdk_bdev_io *bdev_io = NULL; 1051 struct spdk_bdev *bdev = ch->bdev; 1052 struct spdk_bdev_qos *qos = bdev->internal.qos; 1053 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1054 1055 while (!TAILQ_EMPTY(&qos->queued)) { 1056 if (qos->max_ios_per_timeslice > 0 && qos->io_remaining_this_timeslice == 0) { 1057 break; 1058 } 1059 1060 if (qos->max_byte_per_timeslice > 0 && qos->byte_remaining_this_timeslice <= 0) { 1061 break; 1062 } 1063 1064 bdev_io = TAILQ_FIRST(&qos->queued); 1065 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1066 qos->io_remaining_this_timeslice--; 1067 qos->byte_remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(bdev_io); 1068 ch->io_outstanding++; 1069 shared_resource->io_outstanding++; 1070 bdev->fn_table->submit_request(ch->channel, bdev_io); 1071 } 1072 } 1073 1074 static bool 1075 _spdk_bdev_io_type_can_split(uint8_t type) 1076 { 1077 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1078 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1079 1080 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1081 * UNMAP could be split, but these types of I/O are typically much larger 1082 * in size (sometimes the size of the entire block device), and the bdev 1083 * module can more efficiently split these types of I/O. Plus those types 1084 * of I/O do not have a payload, which makes the splitting process simpler. 
1085 */ 1086 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1087 return true; 1088 } else { 1089 return false; 1090 } 1091 } 1092 1093 static bool 1094 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1095 { 1096 uint64_t start_stripe, end_stripe; 1097 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1098 1099 if (io_boundary == 0) { 1100 return false; 1101 } 1102 1103 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1104 return false; 1105 } 1106 1107 start_stripe = bdev_io->u.bdev.offset_blocks; 1108 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1109 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */ 1110 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1111 start_stripe >>= spdk_u32log2(io_boundary); 1112 end_stripe >>= spdk_u32log2(io_boundary); 1113 } else { 1114 start_stripe /= io_boundary; 1115 end_stripe /= io_boundary; 1116 } 1117 return (start_stripe != end_stripe); 1118 } 1119 1120 static uint32_t 1121 _to_next_boundary(uint64_t offset, uint32_t boundary) 1122 { 1123 return (boundary - (offset % boundary)); 1124 } 1125 1126 static void 1127 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1128 1129 static void 1130 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1131 { 1132 struct spdk_bdev_io *bdev_io = _bdev_io; 1133 uint64_t current_offset, remaining, bytes_handled; 1134 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1135 struct iovec *parent_iov; 1136 uint64_t parent_iov_offset, child_iov_len; 1137 uint32_t child_iovcnt; 1138 int rc; 1139 1140 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1141 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1142 blocklen = bdev_io->bdev->blocklen; 1143 bytes_handled = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1144 parent_iov = &bdev_io->u.bdev.iovs[0]; 1145 parent_iov_offset = 0; 1146 1147 while (bytes_handled > 0) { 1148 if (bytes_handled >= parent_iov->iov_len) { 1149 bytes_handled -= parent_iov->iov_len; 1150 parent_iov++; 1151 continue; 1152 } 1153 parent_iov_offset += bytes_handled; 1154 break; 1155 } 1156 1157 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1158 to_next_boundary = spdk_min(remaining, to_next_boundary); 1159 to_next_boundary_bytes = to_next_boundary * blocklen; 1160 child_iovcnt = 0; 1161 while (to_next_boundary_bytes > 0) { 1162 child_iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1163 to_next_boundary_bytes -= child_iov_len; 1164 1165 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1166 bdev_io->child_iov[child_iovcnt].iov_len = child_iov_len; 1167 1168 parent_iov++; 1169 parent_iov_offset = 0; 1170 child_iovcnt++; 1171 if (child_iovcnt == BDEV_IO_NUM_CHILD_IOV && to_next_boundary_bytes > 0) { 1172 /* We've run out of child iovs - we need to fail this I/O. 
*/ 1173 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1174 bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED, 1175 bdev_io->internal.caller_ctx); 1176 return; 1177 } 1178 } 1179 1180 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1181 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1182 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1183 bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary, 1184 _spdk_bdev_io_split_done, bdev_io); 1185 } else { 1186 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1187 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1188 bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary, 1189 _spdk_bdev_io_split_done, bdev_io); 1190 } 1191 1192 if (rc == 0) { 1193 bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary; 1194 bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary; 1195 } else { 1196 assert(rc == -ENOMEM); 1197 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1198 bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_io_split_with_payload; 1199 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1200 spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1201 &bdev_io->internal.waitq_entry); 1202 } 1203 } 1204 1205 static void 1206 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1207 { 1208 struct spdk_bdev_io *parent_io = cb_arg; 1209 1210 spdk_bdev_free_io(bdev_io); 1211 1212 if (!success) { 1213 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1214 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx); 1215 return; 1216 } 1217 1218 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 1219 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1220 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx); 1221 return; 1222 } 1223 1224 /* 1225 * Continue with the splitting process. This function will complete the parent I/O if the 1226 * splitting is done. 
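	 *
	 * Illustrative example: with optimal_io_boundary = 128 blocks, a 192-block
	 * read starting at offset_blocks 64 crosses one boundary and is issued as
	 * two child I/O, [64, 128) and [128, 256); the second child is submitted
	 * from here after the first one completes.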
1227 */ 1228 _spdk_bdev_io_split_with_payload(parent_io); 1229 } 1230 1231 static void 1232 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1233 { 1234 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1235 1236 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1237 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1238 1239 _spdk_bdev_io_split_with_payload(bdev_io); 1240 } 1241 1242 static void 1243 _spdk_bdev_io_submit(void *ctx) 1244 { 1245 struct spdk_bdev_io *bdev_io = ctx; 1246 struct spdk_bdev *bdev = bdev_io->bdev; 1247 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1248 struct spdk_io_channel *ch = bdev_ch->channel; 1249 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1250 uint64_t tsc; 1251 1252 tsc = spdk_get_ticks(); 1253 bdev_io->internal.submit_tsc = tsc; 1254 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1255 bdev_ch->io_outstanding++; 1256 shared_resource->io_outstanding++; 1257 bdev_io->internal.in_submit_request = true; 1258 if (spdk_likely(bdev_ch->flags == 0)) { 1259 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1260 bdev->fn_table->submit_request(ch, bdev_io); 1261 } else { 1262 bdev_ch->io_outstanding--; 1263 shared_resource->io_outstanding--; 1264 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1265 } 1266 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1267 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1268 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1269 bdev_ch->io_outstanding--; 1270 shared_resource->io_outstanding--; 1271 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1272 _spdk_bdev_qos_io_submit(bdev_ch); 1273 } else { 1274 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1275 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1276 } 1277 bdev_io->internal.in_submit_request = false; 1278 } 1279 1280 static void 1281 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1282 { 1283 struct spdk_bdev *bdev = bdev_io->bdev; 1284 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1285 1286 assert(thread != NULL); 1287 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1288 1289 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1290 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1291 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split, 1292 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1293 } else { 1294 _spdk_bdev_io_split(NULL, bdev_io); 1295 } 1296 return; 1297 } 1298 1299 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1300 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1301 _spdk_bdev_io_submit(bdev_io); 1302 } else { 1303 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1304 bdev_io->internal.ch = bdev->internal.qos->ch; 1305 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1306 } 1307 } else { 1308 _spdk_bdev_io_submit(bdev_io); 1309 } 1310 } 1311 1312 static void 1313 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1314 { 1315 struct spdk_bdev *bdev = bdev_io->bdev; 1316 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1317 struct spdk_io_channel *ch = bdev_ch->channel; 1318 1319 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1320 1321 bdev_io->internal.in_submit_request = true; 1322 
bdev->fn_table->submit_request(ch, bdev_io); 1323 bdev_io->internal.in_submit_request = false; 1324 } 1325 1326 static void 1327 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1328 struct spdk_bdev *bdev, void *cb_arg, 1329 spdk_bdev_io_completion_cb cb) 1330 { 1331 bdev_io->bdev = bdev; 1332 bdev_io->internal.caller_ctx = cb_arg; 1333 bdev_io->internal.cb = cb; 1334 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1335 bdev_io->internal.in_submit_request = false; 1336 bdev_io->internal.buf = NULL; 1337 bdev_io->internal.io_submit_ch = NULL; 1338 } 1339 1340 static bool 1341 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1342 { 1343 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1344 } 1345 1346 bool 1347 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1348 { 1349 bool supported; 1350 1351 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1352 1353 if (!supported) { 1354 switch (io_type) { 1355 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1356 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1357 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1358 break; 1359 default: 1360 break; 1361 } 1362 } 1363 1364 return supported; 1365 } 1366 1367 int 1368 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1369 { 1370 if (bdev->fn_table->dump_info_json) { 1371 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1372 } 1373 1374 return 0; 1375 } 1376 1377 static void 1378 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1379 { 1380 uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0; 1381 1382 if (qos->iops_rate_limit > 0) { 1383 max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / 1384 SPDK_SEC_TO_USEC; 1385 qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice, 1386 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE); 1387 } 1388 1389 if (qos->byte_rate_limit > 0) { 1390 max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / 1391 SPDK_SEC_TO_USEC; 1392 qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice, 1393 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE); 1394 } 1395 } 1396 1397 static int 1398 spdk_bdev_channel_poll_qos(void *arg) 1399 { 1400 struct spdk_bdev_qos *qos = arg; 1401 uint64_t now = spdk_get_ticks(); 1402 1403 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1404 /* We received our callback earlier than expected - return 1405 * immediately and wait to do accounting until at least one 1406 * timeslice has actually expired. This should never happen 1407 * with a well-behaved timer implementation. 1408 */ 1409 return 0; 1410 } 1411 1412 /* Reset for next round of rate limiting */ 1413 qos->io_remaining_this_timeslice = 0; 1414 /* We may have allowed the bytes to slightly overrun in the last timeslice. 1415 * byte_remaining_this_timeslice is signed, so if it's negative here, we'll 1416 * account for the overrun so that the next timeslice will be appropriately 1417 * reduced. 
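	 *
	 * For example (illustrative numbers): if 4096 bytes were remaining and a
	 * 65536-byte I/O was allowed through anyway, the counter now holds -61440,
	 * so the loop below effectively starts the new timeslice with
	 * max_byte_per_timeslice - 61440 bytes of budget.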
1418 */ 1419 if (qos->byte_remaining_this_timeslice > 0) { 1420 qos->byte_remaining_this_timeslice = 0; 1421 } 1422 1423 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1424 qos->last_timeslice += qos->timeslice_size; 1425 qos->io_remaining_this_timeslice += qos->max_ios_per_timeslice; 1426 qos->byte_remaining_this_timeslice += qos->max_byte_per_timeslice; 1427 } 1428 1429 _spdk_bdev_qos_io_submit(qos->ch); 1430 1431 return -1; 1432 } 1433 1434 static void 1435 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1436 { 1437 struct spdk_bdev_shared_resource *shared_resource; 1438 1439 if (!ch) { 1440 return; 1441 } 1442 1443 if (ch->channel) { 1444 spdk_put_io_channel(ch->channel); 1445 } 1446 1447 assert(ch->io_outstanding == 0); 1448 1449 shared_resource = ch->shared_resource; 1450 if (shared_resource) { 1451 assert(ch->io_outstanding == 0); 1452 assert(shared_resource->ref > 0); 1453 shared_resource->ref--; 1454 if (shared_resource->ref == 0) { 1455 assert(shared_resource->io_outstanding == 0); 1456 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1457 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1458 free(shared_resource); 1459 } 1460 } 1461 } 1462 1463 /* Caller must hold bdev->internal.mutex. */ 1464 static void 1465 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1466 { 1467 struct spdk_bdev_qos *qos = bdev->internal.qos; 1468 1469 /* Rate limiting on this bdev enabled */ 1470 if (qos) { 1471 if (qos->ch == NULL) { 1472 struct spdk_io_channel *io_ch; 1473 1474 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1475 bdev->name, spdk_get_thread()); 1476 1477 /* No qos channel has been selected, so set one up */ 1478 1479 /* Take another reference to ch */ 1480 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1481 qos->ch = ch; 1482 1483 qos->thread = spdk_io_channel_get_thread(io_ch); 1484 1485 TAILQ_INIT(&qos->queued); 1486 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1487 qos->io_remaining_this_timeslice = qos->max_ios_per_timeslice; 1488 qos->byte_remaining_this_timeslice = qos->max_byte_per_timeslice; 1489 qos->timeslice_size = 1490 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1491 qos->last_timeslice = spdk_get_ticks(); 1492 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1493 qos, 1494 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1495 } 1496 1497 ch->flags |= BDEV_CH_QOS_ENABLED; 1498 } 1499 } 1500 1501 static int 1502 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1503 { 1504 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1505 struct spdk_bdev_channel *ch = ctx_buf; 1506 struct spdk_io_channel *mgmt_io_ch; 1507 struct spdk_bdev_mgmt_channel *mgmt_ch; 1508 struct spdk_bdev_shared_resource *shared_resource; 1509 1510 ch->bdev = bdev; 1511 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1512 if (!ch->channel) { 1513 return -1; 1514 } 1515 1516 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1517 if (!mgmt_io_ch) { 1518 return -1; 1519 } 1520 1521 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1522 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1523 if (shared_resource->shared_ch == ch->channel) { 1524 spdk_put_io_channel(mgmt_io_ch); 1525 shared_resource->ref++; 1526 break; 1527 } 1528 } 1529 1530 if (shared_resource == NULL) { 1531 shared_resource = calloc(1, sizeof(*shared_resource)); 1532 if (shared_resource == NULL) { 1533 
spdk_put_io_channel(mgmt_io_ch); 1534 return -1; 1535 } 1536 1537 shared_resource->mgmt_ch = mgmt_ch; 1538 shared_resource->io_outstanding = 0; 1539 TAILQ_INIT(&shared_resource->nomem_io); 1540 shared_resource->nomem_threshold = 0; 1541 shared_resource->shared_ch = ch->channel; 1542 shared_resource->ref = 1; 1543 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1544 } 1545 1546 memset(&ch->stat, 0, sizeof(ch->stat)); 1547 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1548 ch->io_outstanding = 0; 1549 TAILQ_INIT(&ch->queued_resets); 1550 ch->flags = 0; 1551 ch->shared_resource = shared_resource; 1552 1553 #ifdef SPDK_CONFIG_VTUNE 1554 { 1555 char *name; 1556 __itt_init_ittlib(NULL, 0); 1557 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1558 if (!name) { 1559 _spdk_bdev_channel_destroy_resource(ch); 1560 return -1; 1561 } 1562 ch->handle = __itt_string_handle_create(name); 1563 free(name); 1564 ch->start_tsc = spdk_get_ticks(); 1565 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1566 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1567 } 1568 #endif 1569 1570 pthread_mutex_lock(&bdev->internal.mutex); 1571 _spdk_bdev_enable_qos(bdev, ch); 1572 pthread_mutex_unlock(&bdev->internal.mutex); 1573 1574 return 0; 1575 } 1576 1577 /* 1578 * Abort I/O that are waiting on a data buffer. These types of I/O are 1579 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1580 */ 1581 static void 1582 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1583 { 1584 bdev_io_stailq_t tmp; 1585 struct spdk_bdev_io *bdev_io; 1586 1587 STAILQ_INIT(&tmp); 1588 1589 while (!STAILQ_EMPTY(queue)) { 1590 bdev_io = STAILQ_FIRST(queue); 1591 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1592 if (bdev_io->internal.ch == ch) { 1593 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1594 } else { 1595 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1596 } 1597 } 1598 1599 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1600 } 1601 1602 /* 1603 * Abort I/O that are queued waiting for submission. These types of I/O are 1604 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1605 */ 1606 static void 1607 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1608 { 1609 struct spdk_bdev_io *bdev_io, *tmp; 1610 1611 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1612 if (bdev_io->internal.ch == ch) { 1613 TAILQ_REMOVE(queue, bdev_io, internal.link); 1614 /* 1615 * spdk_bdev_io_complete() assumes that the completed I/O had 1616 * been submitted to the bdev module. Since in this case it 1617 * hadn't, bump io_outstanding to account for the decrement 1618 * that spdk_bdev_io_complete() will do. 
1619 */ 1620 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1621 ch->io_outstanding++; 1622 ch->shared_resource->io_outstanding++; 1623 } 1624 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1625 } 1626 } 1627 } 1628 1629 static void 1630 spdk_bdev_qos_channel_destroy(void *cb_arg) 1631 { 1632 struct spdk_bdev_qos *qos = cb_arg; 1633 1634 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1635 spdk_poller_unregister(&qos->poller); 1636 1637 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1638 1639 free(qos); 1640 } 1641 1642 static int 1643 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1644 { 1645 /* 1646 * Cleanly shutting down the QoS poller is tricky, because 1647 * during the asynchronous operation the user could open 1648 * a new descriptor and create a new channel, spawning 1649 * a new QoS poller. 1650 * 1651 * The strategy is to create a new QoS structure here and swap it 1652 * in. The shutdown path then continues to refer to the old one 1653 * until it completes and then releases it. 1654 */ 1655 struct spdk_bdev_qos *new_qos, *old_qos; 1656 1657 old_qos = bdev->internal.qos; 1658 1659 new_qos = calloc(1, sizeof(*new_qos)); 1660 if (!new_qos) { 1661 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1662 return -ENOMEM; 1663 } 1664 1665 /* Copy the old QoS data into the newly allocated structure */ 1666 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1667 1668 /* Zero out the key parts of the QoS structure */ 1669 new_qos->ch = NULL; 1670 new_qos->thread = NULL; 1671 new_qos->max_ios_per_timeslice = 0; 1672 new_qos->max_byte_per_timeslice = 0; 1673 new_qos->io_remaining_this_timeslice = 0; 1674 new_qos->byte_remaining_this_timeslice = 0; 1675 new_qos->poller = NULL; 1676 TAILQ_INIT(&new_qos->queued); 1677 1678 bdev->internal.qos = new_qos; 1679 1680 if (old_qos->thread == NULL) { 1681 free(old_qos); 1682 } else { 1683 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1684 old_qos); 1685 } 1686 1687 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 1688 * been destroyed yet. The destruction path will end up waiting for the final 1689 * channel to be put before it releases resources. */ 1690 1691 return 0; 1692 } 1693 1694 static void 1695 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 1696 { 1697 total->bytes_read += add->bytes_read; 1698 total->num_read_ops += add->num_read_ops; 1699 total->bytes_written += add->bytes_written; 1700 total->num_write_ops += add->num_write_ops; 1701 total->read_latency_ticks += add->read_latency_ticks; 1702 total->write_latency_ticks += add->write_latency_ticks; 1703 } 1704 1705 static void 1706 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 1707 { 1708 struct spdk_bdev_channel *ch = ctx_buf; 1709 struct spdk_bdev_mgmt_channel *mgmt_ch; 1710 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1711 1712 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 1713 spdk_get_thread()); 1714 1715 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
	 */
	pthread_mutex_lock(&ch->bdev->internal.mutex);
	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
	pthread_mutex_unlock(&ch->bdev->internal.mutex);

	mgmt_ch = shared_resource->mgmt_ch;

	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);

	_spdk_bdev_channel_destroy_resource(ch);
}

int
spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	if (alias == NULL) {
		SPDK_ERRLOG("Empty alias passed\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(alias)) {
		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
		return -EEXIST;
	}

	tmp = calloc(1, sizeof(*tmp));
	if (tmp == NULL) {
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	tmp->alias = strdup(alias);
	if (tmp->alias == NULL) {
		free(tmp);
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);

	return 0;
}

int
spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (strcmp(alias, tmp->alias) == 0) {
			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
			free(tmp->alias);
			free(tmp);
			return 0;
		}
	}

	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);

	return -ENOENT;
}

void
spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *p, *tmp;

	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
		TAILQ_REMOVE(&bdev->aliases, p, tailq);
		free(p->alias);
		free(p);
	}
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
{
	return bdev->product_name;
}

const struct spdk_bdev_aliases_list *
spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
{
	return &bdev->aliases;
}

uint32_t
spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
{
	return bdev->blocklen;
}

uint64_t
spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
{
	return bdev->blockcnt;
}

uint64_t
spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
{
	uint64_t iops_rate_limit = 0;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos) {
		iops_rate_limit = bdev->internal.qos->iops_rate_limit;
	}
	pthread_mutex_unlock(&bdev->internal.mutex);

	return iops_rate_limit;
}

size_t
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
{
	/* TODO: push this logic down to the bdev modules */
	if (bdev->need_aligned_buffer) {
		return bdev->blocklen;
	}

	return 1;
}

uint32_t
spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
{
	return bdev->optimal_io_boundary;
}

bool
1862 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 1863 { 1864 return bdev->write_cache; 1865 } 1866 1867 const struct spdk_uuid * 1868 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 1869 { 1870 return &bdev->uuid; 1871 } 1872 1873 uint64_t 1874 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 1875 { 1876 return bdev->internal.measured_queue_depth; 1877 } 1878 1879 uint64_t 1880 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 1881 { 1882 return bdev->internal.period; 1883 } 1884 1885 uint64_t 1886 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 1887 { 1888 return bdev->internal.weighted_io_time; 1889 } 1890 1891 uint64_t 1892 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 1893 { 1894 return bdev->internal.io_time; 1895 } 1896 1897 static void 1898 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 1899 { 1900 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 1901 1902 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 1903 1904 if (bdev->internal.measured_queue_depth) { 1905 bdev->internal.io_time += bdev->internal.period; 1906 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 1907 } 1908 } 1909 1910 static void 1911 _calculate_measured_qd(struct spdk_io_channel_iter *i) 1912 { 1913 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 1914 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 1915 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 1916 1917 bdev->internal.temporary_queue_depth += ch->io_outstanding; 1918 spdk_for_each_channel_continue(i, 0); 1919 } 1920 1921 static int 1922 spdk_bdev_calculate_measured_queue_depth(void *ctx) 1923 { 1924 struct spdk_bdev *bdev = ctx; 1925 bdev->internal.temporary_queue_depth = 0; 1926 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 1927 _calculate_measured_qd_cpl); 1928 return 0; 1929 } 1930 1931 void 1932 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 1933 { 1934 bdev->internal.period = period; 1935 1936 if (bdev->internal.qd_poller != NULL) { 1937 spdk_poller_unregister(&bdev->internal.qd_poller); 1938 bdev->internal.measured_queue_depth = UINT64_MAX; 1939 } 1940 1941 if (period != 0) { 1942 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 1943 period); 1944 } 1945 } 1946 1947 int 1948 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 1949 { 1950 int ret; 1951 1952 pthread_mutex_lock(&bdev->internal.mutex); 1953 1954 /* bdev has open descriptors */ 1955 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 1956 bdev->blockcnt > size) { 1957 ret = -EBUSY; 1958 } else { 1959 bdev->blockcnt = size; 1960 ret = 0; 1961 } 1962 1963 pthread_mutex_unlock(&bdev->internal.mutex); 1964 1965 return ret; 1966 } 1967 1968 /* 1969 * Convert I/O offset and length from bytes to blocks. 1970 * 1971 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
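 *
 * For example (illustrative numbers): with a 512-byte block size, offset 4096
 * and length 8192 map to offset_blocks 8 and num_blocks 16 and the function
 * returns 0; an offset of 4097 would make the return value non-zero, causing
 * the caller to fail with -EINVAL.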
1972 */ 1973 static uint64_t 1974 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 1975 uint64_t num_bytes, uint64_t *num_blocks) 1976 { 1977 uint32_t block_size = bdev->blocklen; 1978 1979 *offset_blocks = offset_bytes / block_size; 1980 *num_blocks = num_bytes / block_size; 1981 1982 return (offset_bytes % block_size) | (num_bytes % block_size); 1983 } 1984 1985 static bool 1986 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 1987 { 1988 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 1989 * has been an overflow and hence the offset has been wrapped around */ 1990 if (offset_blocks + num_blocks < offset_blocks) { 1991 return false; 1992 } 1993 1994 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 1995 if (offset_blocks + num_blocks > bdev->blockcnt) { 1996 return false; 1997 } 1998 1999 return true; 2000 } 2001 2002 int 2003 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2004 void *buf, uint64_t offset, uint64_t nbytes, 2005 spdk_bdev_io_completion_cb cb, void *cb_arg) 2006 { 2007 uint64_t offset_blocks, num_blocks; 2008 2009 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2010 return -EINVAL; 2011 } 2012 2013 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2014 } 2015 2016 int 2017 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2018 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2019 spdk_bdev_io_completion_cb cb, void *cb_arg) 2020 { 2021 struct spdk_bdev *bdev = desc->bdev; 2022 struct spdk_bdev_io *bdev_io; 2023 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2024 2025 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2026 return -EINVAL; 2027 } 2028 2029 bdev_io = spdk_bdev_get_io(channel); 2030 if (!bdev_io) { 2031 return -ENOMEM; 2032 } 2033 2034 bdev_io->internal.ch = channel; 2035 bdev_io->internal.desc = desc; 2036 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2037 bdev_io->u.bdev.iovs = &bdev_io->iov; 2038 bdev_io->u.bdev.iovs[0].iov_base = buf; 2039 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2040 bdev_io->u.bdev.iovcnt = 1; 2041 bdev_io->u.bdev.num_blocks = num_blocks; 2042 bdev_io->u.bdev.offset_blocks = offset_blocks; 2043 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2044 2045 spdk_bdev_io_submit(bdev_io); 2046 return 0; 2047 } 2048 2049 int 2050 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2051 struct iovec *iov, int iovcnt, 2052 uint64_t offset, uint64_t nbytes, 2053 spdk_bdev_io_completion_cb cb, void *cb_arg) 2054 { 2055 uint64_t offset_blocks, num_blocks; 2056 2057 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2058 return -EINVAL; 2059 } 2060 2061 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2062 } 2063 2064 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2065 struct iovec *iov, int iovcnt, 2066 uint64_t offset_blocks, uint64_t num_blocks, 2067 spdk_bdev_io_completion_cb cb, void *cb_arg) 2068 { 2069 struct spdk_bdev *bdev = desc->bdev; 2070 struct spdk_bdev_io *bdev_io; 2071 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2072 2073 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2074 return -EINVAL; 2075 } 2076 
2077 bdev_io = spdk_bdev_get_io(channel); 2078 if (!bdev_io) { 2079 return -ENOMEM; 2080 } 2081 2082 bdev_io->internal.ch = channel; 2083 bdev_io->internal.desc = desc; 2084 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2085 bdev_io->u.bdev.iovs = iov; 2086 bdev_io->u.bdev.iovcnt = iovcnt; 2087 bdev_io->u.bdev.num_blocks = num_blocks; 2088 bdev_io->u.bdev.offset_blocks = offset_blocks; 2089 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2090 2091 spdk_bdev_io_submit(bdev_io); 2092 return 0; 2093 } 2094 2095 int 2096 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2097 void *buf, uint64_t offset, uint64_t nbytes, 2098 spdk_bdev_io_completion_cb cb, void *cb_arg) 2099 { 2100 uint64_t offset_blocks, num_blocks; 2101 2102 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2103 return -EINVAL; 2104 } 2105 2106 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2107 } 2108 2109 int 2110 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2111 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2112 spdk_bdev_io_completion_cb cb, void *cb_arg) 2113 { 2114 struct spdk_bdev *bdev = desc->bdev; 2115 struct spdk_bdev_io *bdev_io; 2116 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2117 2118 if (!desc->write) { 2119 return -EBADF; 2120 } 2121 2122 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2123 return -EINVAL; 2124 } 2125 2126 bdev_io = spdk_bdev_get_io(channel); 2127 if (!bdev_io) { 2128 return -ENOMEM; 2129 } 2130 2131 bdev_io->internal.ch = channel; 2132 bdev_io->internal.desc = desc; 2133 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2134 bdev_io->u.bdev.iovs = &bdev_io->iov; 2135 bdev_io->u.bdev.iovs[0].iov_base = buf; 2136 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2137 bdev_io->u.bdev.iovcnt = 1; 2138 bdev_io->u.bdev.num_blocks = num_blocks; 2139 bdev_io->u.bdev.offset_blocks = offset_blocks; 2140 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2141 2142 spdk_bdev_io_submit(bdev_io); 2143 return 0; 2144 } 2145 2146 int 2147 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2148 struct iovec *iov, int iovcnt, 2149 uint64_t offset, uint64_t len, 2150 spdk_bdev_io_completion_cb cb, void *cb_arg) 2151 { 2152 uint64_t offset_blocks, num_blocks; 2153 2154 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2155 return -EINVAL; 2156 } 2157 2158 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2159 } 2160 2161 int 2162 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2163 struct iovec *iov, int iovcnt, 2164 uint64_t offset_blocks, uint64_t num_blocks, 2165 spdk_bdev_io_completion_cb cb, void *cb_arg) 2166 { 2167 struct spdk_bdev *bdev = desc->bdev; 2168 struct spdk_bdev_io *bdev_io; 2169 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2170 2171 if (!desc->write) { 2172 return -EBADF; 2173 } 2174 2175 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2176 return -EINVAL; 2177 } 2178 2179 bdev_io = spdk_bdev_get_io(channel); 2180 if (!bdev_io) { 2181 return -ENOMEM; 2182 } 2183 2184 bdev_io->internal.ch = channel; 2185 bdev_io->internal.desc = desc; 2186 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2187 bdev_io->u.bdev.iovs = iov; 2188 bdev_io->u.bdev.iovcnt = iovcnt; 2189 bdev_io->u.bdev.num_blocks = num_blocks; 2190 bdev_io->u.bdev.offset_blocks = 
offset_blocks; 2191 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2192 2193 spdk_bdev_io_submit(bdev_io); 2194 return 0; 2195 } 2196 2197 int 2198 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2199 uint64_t offset, uint64_t len, 2200 spdk_bdev_io_completion_cb cb, void *cb_arg) 2201 { 2202 uint64_t offset_blocks, num_blocks; 2203 2204 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2205 return -EINVAL; 2206 } 2207 2208 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2209 } 2210 2211 int 2212 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2213 uint64_t offset_blocks, uint64_t num_blocks, 2214 spdk_bdev_io_completion_cb cb, void *cb_arg) 2215 { 2216 struct spdk_bdev *bdev = desc->bdev; 2217 struct spdk_bdev_io *bdev_io; 2218 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2219 2220 if (!desc->write) { 2221 return -EBADF; 2222 } 2223 2224 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2225 return -EINVAL; 2226 } 2227 2228 bdev_io = spdk_bdev_get_io(channel); 2229 2230 if (!bdev_io) { 2231 return -ENOMEM; 2232 } 2233 2234 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2235 bdev_io->internal.ch = channel; 2236 bdev_io->internal.desc = desc; 2237 bdev_io->u.bdev.offset_blocks = offset_blocks; 2238 bdev_io->u.bdev.num_blocks = num_blocks; 2239 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2240 2241 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2242 spdk_bdev_io_submit(bdev_io); 2243 return 0; 2244 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2245 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2246 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2247 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2248 _spdk_bdev_write_zero_buffer_next(bdev_io); 2249 return 0; 2250 } else { 2251 spdk_bdev_free_io(bdev_io); 2252 return -ENOTSUP; 2253 } 2254 } 2255 2256 int 2257 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2258 uint64_t offset, uint64_t nbytes, 2259 spdk_bdev_io_completion_cb cb, void *cb_arg) 2260 { 2261 uint64_t offset_blocks, num_blocks; 2262 2263 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2264 return -EINVAL; 2265 } 2266 2267 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2268 } 2269 2270 int 2271 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2272 uint64_t offset_blocks, uint64_t num_blocks, 2273 spdk_bdev_io_completion_cb cb, void *cb_arg) 2274 { 2275 struct spdk_bdev *bdev = desc->bdev; 2276 struct spdk_bdev_io *bdev_io; 2277 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2278 2279 if (!desc->write) { 2280 return -EBADF; 2281 } 2282 2283 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2284 return -EINVAL; 2285 } 2286 2287 if (num_blocks == 0) { 2288 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2289 return -EINVAL; 2290 } 2291 2292 bdev_io = spdk_bdev_get_io(channel); 2293 if (!bdev_io) { 2294 return -ENOMEM; 2295 } 2296 2297 bdev_io->internal.ch = channel; 2298 bdev_io->internal.desc = desc; 2299 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2300 2301 bdev_io->u.bdev.iovs = &bdev_io->iov; 2302 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2303 bdev_io->u.bdev.iovs[0].iov_len = 0; 2304 bdev_io->u.bdev.iovcnt = 1; 2305 2306 
bdev_io->u.bdev.offset_blocks = offset_blocks; 2307 bdev_io->u.bdev.num_blocks = num_blocks; 2308 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2309 2310 spdk_bdev_io_submit(bdev_io); 2311 return 0; 2312 } 2313 2314 int 2315 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2316 uint64_t offset, uint64_t length, 2317 spdk_bdev_io_completion_cb cb, void *cb_arg) 2318 { 2319 uint64_t offset_blocks, num_blocks; 2320 2321 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2322 return -EINVAL; 2323 } 2324 2325 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2326 } 2327 2328 int 2329 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2330 uint64_t offset_blocks, uint64_t num_blocks, 2331 spdk_bdev_io_completion_cb cb, void *cb_arg) 2332 { 2333 struct spdk_bdev *bdev = desc->bdev; 2334 struct spdk_bdev_io *bdev_io; 2335 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2336 2337 if (!desc->write) { 2338 return -EBADF; 2339 } 2340 2341 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2342 return -EINVAL; 2343 } 2344 2345 bdev_io = spdk_bdev_get_io(channel); 2346 if (!bdev_io) { 2347 return -ENOMEM; 2348 } 2349 2350 bdev_io->internal.ch = channel; 2351 bdev_io->internal.desc = desc; 2352 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2353 bdev_io->u.bdev.iovs = NULL; 2354 bdev_io->u.bdev.iovcnt = 0; 2355 bdev_io->u.bdev.offset_blocks = offset_blocks; 2356 bdev_io->u.bdev.num_blocks = num_blocks; 2357 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2358 2359 spdk_bdev_io_submit(bdev_io); 2360 return 0; 2361 } 2362 2363 static void 2364 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2365 { 2366 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2367 struct spdk_bdev_io *bdev_io; 2368 2369 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2370 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2371 spdk_bdev_io_submit_reset(bdev_io); 2372 } 2373 2374 static void 2375 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2376 { 2377 struct spdk_io_channel *ch; 2378 struct spdk_bdev_channel *channel; 2379 struct spdk_bdev_mgmt_channel *mgmt_channel; 2380 struct spdk_bdev_shared_resource *shared_resource; 2381 bdev_io_tailq_t tmp_queued; 2382 2383 TAILQ_INIT(&tmp_queued); 2384 2385 ch = spdk_io_channel_iter_get_channel(i); 2386 channel = spdk_io_channel_get_ctx(ch); 2387 shared_resource = channel->shared_resource; 2388 mgmt_channel = shared_resource->mgmt_ch; 2389 2390 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2391 2392 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2393 /* The QoS object is always valid and readable while 2394 * the channel flag is set, so the lock here should not 2395 * be necessary. We're not in the fast path though, so 2396 * just take it anyway. 
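 *
 * Note that swapping the QoS queue onto the local tmp_queued list here lets us
 * abort those queued I/O below, after the mutex has been released, together with
 * the nomem_io and buffer wait queues.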
*/ 2397 pthread_mutex_lock(&channel->bdev->internal.mutex); 2398 if (channel->bdev->internal.qos->ch == channel) { 2399 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2400 } 2401 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2402 } 2403 2404 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2405 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2406 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2407 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2408 2409 spdk_for_each_channel_continue(i, 0); 2410 } 2411 2412 static void 2413 _spdk_bdev_start_reset(void *ctx) 2414 { 2415 struct spdk_bdev_channel *ch = ctx; 2416 2417 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2418 ch, _spdk_bdev_reset_dev); 2419 } 2420 2421 static void 2422 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2423 { 2424 struct spdk_bdev *bdev = ch->bdev; 2425 2426 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2427 2428 pthread_mutex_lock(&bdev->internal.mutex); 2429 if (bdev->internal.reset_in_progress == NULL) { 2430 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2431 /* 2432 * Take a channel reference for the target bdev for the life of this 2433 * reset. This guards against the channel getting destroyed while 2434 * spdk_for_each_channel() calls related to this reset IO are in 2435 * progress. We will release the reference when this reset is 2436 * completed. 2437 */ 2438 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2439 _spdk_bdev_start_reset(ch); 2440 } 2441 pthread_mutex_unlock(&bdev->internal.mutex); 2442 } 2443 2444 int 2445 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2446 spdk_bdev_io_completion_cb cb, void *cb_arg) 2447 { 2448 struct spdk_bdev *bdev = desc->bdev; 2449 struct spdk_bdev_io *bdev_io; 2450 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2451 2452 bdev_io = spdk_bdev_get_io(channel); 2453 if (!bdev_io) { 2454 return -ENOMEM; 2455 } 2456 2457 bdev_io->internal.ch = channel; 2458 bdev_io->internal.desc = desc; 2459 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2460 bdev_io->u.reset.ch_ref = NULL; 2461 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2462 2463 pthread_mutex_lock(&bdev->internal.mutex); 2464 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2465 pthread_mutex_unlock(&bdev->internal.mutex); 2466 2467 _spdk_bdev_channel_start_reset(channel); 2468 2469 return 0; 2470 } 2471 2472 void 2473 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2474 struct spdk_bdev_io_stat *stat) 2475 { 2476 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2477 2478 *stat = channel->stat; 2479 } 2480 2481 static void 2482 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2483 { 2484 void *io_device = spdk_io_channel_iter_get_io_device(i); 2485 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2486 2487 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2488 bdev_iostat_ctx->cb_arg, 0); 2489 free(bdev_iostat_ctx); 2490 } 2491 2492 static void 2493 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2494 { 2495 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2496 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2497 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2498 2499 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2500 spdk_for_each_channel_continue(i, 0); 2501 } 2502 2503 void 2504 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2505 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2506 { 2507 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2508 2509 assert(bdev != NULL); 2510 assert(stat != NULL); 2511 assert(cb != NULL); 2512 2513 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2514 if (bdev_iostat_ctx == NULL) { 2515 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2516 cb(bdev, stat, cb_arg, -ENOMEM); 2517 return; 2518 } 2519 2520 bdev_iostat_ctx->stat = stat; 2521 bdev_iostat_ctx->cb = cb; 2522 bdev_iostat_ctx->cb_arg = cb_arg; 2523 2524 /* Start with the statistics from previously deleted channels. */ 2525 pthread_mutex_lock(&bdev->internal.mutex); 2526 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2527 pthread_mutex_unlock(&bdev->internal.mutex); 2528 2529 /* Then iterate and add the statistics from each existing channel. */ 2530 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2531 _spdk_bdev_get_each_channel_stat, 2532 bdev_iostat_ctx, 2533 _spdk_bdev_get_device_stat_done); 2534 } 2535 2536 int 2537 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2538 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2539 spdk_bdev_io_completion_cb cb, void *cb_arg) 2540 { 2541 struct spdk_bdev *bdev = desc->bdev; 2542 struct spdk_bdev_io *bdev_io; 2543 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2544 2545 if (!desc->write) { 2546 return -EBADF; 2547 } 2548 2549 bdev_io = spdk_bdev_get_io(channel); 2550 if (!bdev_io) { 2551 return -ENOMEM; 2552 } 2553 2554 bdev_io->internal.ch = channel; 2555 bdev_io->internal.desc = desc; 2556 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2557 bdev_io->u.nvme_passthru.cmd = *cmd; 2558 bdev_io->u.nvme_passthru.buf = buf; 2559 bdev_io->u.nvme_passthru.nbytes = nbytes; 2560 bdev_io->u.nvme_passthru.md_buf = NULL; 2561 bdev_io->u.nvme_passthru.md_len = 0; 2562 2563 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2564 2565 spdk_bdev_io_submit(bdev_io); 2566 return 0; 2567 } 2568 2569 int 2570 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2571 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2572 spdk_bdev_io_completion_cb cb, void *cb_arg) 2573 { 2574 struct spdk_bdev *bdev = desc->bdev; 2575 struct spdk_bdev_io *bdev_io; 2576 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2577 2578 if (!desc->write) { 2579 /* 2580 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2581 * to easily determine if the command is a read or write, but for now just 2582 * do not allow io_passthru with a read-only descriptor. 
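 *
 * Hypothetical caller-side sketch (names are illustrative; the descriptor must
 * have been opened with write == true, even for a passthru read):
 *
 *	struct spdk_nvme_cmd cmd = {0};
 *
 *	cmd.opc = SPDK_NVME_OPC_READ;	(opcode and cdw10-15 are chosen by the caller)
 *	cmd.nsid = 1;
 *	rc = spdk_bdev_nvme_io_passthru(desc, io_ch, &cmd, buf, nbytes, io_done, NULL);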
2583 */ 2584 return -EBADF; 2585 } 2586 2587 bdev_io = spdk_bdev_get_io(channel); 2588 if (!bdev_io) { 2589 return -ENOMEM; 2590 } 2591 2592 bdev_io->internal.ch = channel; 2593 bdev_io->internal.desc = desc; 2594 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2595 bdev_io->u.nvme_passthru.cmd = *cmd; 2596 bdev_io->u.nvme_passthru.buf = buf; 2597 bdev_io->u.nvme_passthru.nbytes = nbytes; 2598 bdev_io->u.nvme_passthru.md_buf = NULL; 2599 bdev_io->u.nvme_passthru.md_len = 0; 2600 2601 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2602 2603 spdk_bdev_io_submit(bdev_io); 2604 return 0; 2605 } 2606 2607 int 2608 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2609 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2610 spdk_bdev_io_completion_cb cb, void *cb_arg) 2611 { 2612 struct spdk_bdev *bdev = desc->bdev; 2613 struct spdk_bdev_io *bdev_io; 2614 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2615 2616 if (!desc->write) { 2617 /* 2618 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2619 * to easily determine if the command is a read or write, but for now just 2620 * do not allow io_passthru with a read-only descriptor. 2621 */ 2622 return -EBADF; 2623 } 2624 2625 bdev_io = spdk_bdev_get_io(channel); 2626 if (!bdev_io) { 2627 return -ENOMEM; 2628 } 2629 2630 bdev_io->internal.ch = channel; 2631 bdev_io->internal.desc = desc; 2632 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2633 bdev_io->u.nvme_passthru.cmd = *cmd; 2634 bdev_io->u.nvme_passthru.buf = buf; 2635 bdev_io->u.nvme_passthru.nbytes = nbytes; 2636 bdev_io->u.nvme_passthru.md_buf = md_buf; 2637 bdev_io->u.nvme_passthru.md_len = md_len; 2638 2639 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2640 2641 spdk_bdev_io_submit(bdev_io); 2642 return 0; 2643 } 2644 2645 int 2646 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2647 struct spdk_bdev_io_wait_entry *entry) 2648 { 2649 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2650 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2651 2652 if (bdev != entry->bdev) { 2653 SPDK_ERRLOG("bdevs do not match\n"); 2654 return -EINVAL; 2655 } 2656 2657 if (mgmt_ch->per_thread_cache_count > 0) { 2658 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2659 return -EINVAL; 2660 } 2661 2662 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2663 return 0; 2664 } 2665 2666 static void 2667 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2668 { 2669 struct spdk_bdev *bdev = bdev_ch->bdev; 2670 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2671 struct spdk_bdev_io *bdev_io; 2672 2673 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2674 /* 2675 * Allow some more I/O to complete before retrying the nomem_io queue. 2676 * Some drivers (such as nvme) cannot immediately take a new I/O in 2677 * the context of a completion, because the resources for the I/O are 2678 * not released until control returns to the bdev poller. Also, we 2679 * may require several small I/O to complete before a larger I/O 2680 * (that requires splitting) can be submitted. 
2681 */ 2682 return; 2683 } 2684 2685 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2686 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2687 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2688 bdev_io->internal.ch->io_outstanding++; 2689 shared_resource->io_outstanding++; 2690 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2691 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2692 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2693 break; 2694 } 2695 } 2696 } 2697 2698 static inline void 2699 _spdk_bdev_io_complete(void *ctx) 2700 { 2701 struct spdk_bdev_io *bdev_io = ctx; 2702 uint64_t tsc; 2703 2704 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2705 /* 2706 * Send the completion to the thread that originally submitted the I/O, 2707 * which may not be the current thread in the case of QoS. 2708 */ 2709 if (bdev_io->internal.io_submit_ch) { 2710 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2711 bdev_io->internal.io_submit_ch = NULL; 2712 } 2713 2714 /* 2715 * Defer completion to avoid potential infinite recursion if the 2716 * user's completion callback issues a new I/O. 2717 */ 2718 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2719 _spdk_bdev_io_complete, bdev_io); 2720 return; 2721 } 2722 2723 tsc = spdk_get_ticks(); 2724 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 2725 2726 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2727 switch (bdev_io->type) { 2728 case SPDK_BDEV_IO_TYPE_READ: 2729 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2730 bdev_io->internal.ch->stat.num_read_ops++; 2731 bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2732 break; 2733 case SPDK_BDEV_IO_TYPE_WRITE: 2734 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2735 bdev_io->internal.ch->stat.num_write_ops++; 2736 bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2737 break; 2738 default: 2739 break; 2740 } 2741 } 2742 2743 #ifdef SPDK_CONFIG_VTUNE 2744 uint64_t now_tsc = spdk_get_ticks(); 2745 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2746 uint64_t data[5]; 2747 2748 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2749 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2750 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2751 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2752 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
2753 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2754 2755 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2756 __itt_metadata_u64, 5, data); 2757 2758 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2759 bdev_io->internal.ch->start_tsc = now_tsc; 2760 } 2761 #endif 2762 2763 assert(bdev_io->internal.cb != NULL); 2764 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2765 2766 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2767 bdev_io->internal.caller_ctx); 2768 } 2769 2770 static void 2771 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2772 { 2773 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2774 2775 if (bdev_io->u.reset.ch_ref != NULL) { 2776 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2777 bdev_io->u.reset.ch_ref = NULL; 2778 } 2779 2780 _spdk_bdev_io_complete(bdev_io); 2781 } 2782 2783 static void 2784 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2785 { 2786 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2787 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2788 2789 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2790 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2791 _spdk_bdev_channel_start_reset(ch); 2792 } 2793 2794 spdk_for_each_channel_continue(i, 0); 2795 } 2796 2797 void 2798 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2799 { 2800 struct spdk_bdev *bdev = bdev_io->bdev; 2801 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2802 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2803 2804 bdev_io->internal.status = status; 2805 2806 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2807 bool unlock_channels = false; 2808 2809 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2810 SPDK_ERRLOG("NOMEM returned for reset\n"); 2811 } 2812 pthread_mutex_lock(&bdev->internal.mutex); 2813 if (bdev_io == bdev->internal.reset_in_progress) { 2814 bdev->internal.reset_in_progress = NULL; 2815 unlock_channels = true; 2816 } 2817 pthread_mutex_unlock(&bdev->internal.mutex); 2818 2819 if (unlock_channels) { 2820 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 2821 bdev_io, _spdk_bdev_reset_complete); 2822 return; 2823 } 2824 } else { 2825 assert(bdev_ch->io_outstanding > 0); 2826 assert(shared_resource->io_outstanding > 0); 2827 bdev_ch->io_outstanding--; 2828 shared_resource->io_outstanding--; 2829 2830 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 2831 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 2832 /* 2833 * Wait for some of the outstanding I/O to complete before we 2834 * retry any of the nomem_io. Normally we will wait for 2835 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 2836 * depth channels we will instead wait for half to complete. 
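 *
 * For example, with 32 I/O still outstanding and NOMEM_THRESHOLD_COUNT == 8 the
 * threshold becomes max(32 / 2, 32 - 8) = 24, so the retry in
 * _spdk_bdev_ch_retry_io() starts once 8 of those I/O have completed; with only
 * 6 outstanding the second term goes negative and we wait for half (3) instead.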
2837 */ 2838 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 2839 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 2840 return; 2841 } 2842 2843 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 2844 _spdk_bdev_ch_retry_io(bdev_ch); 2845 } 2846 } 2847 2848 _spdk_bdev_io_complete(bdev_io); 2849 } 2850 2851 void 2852 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 2853 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 2854 { 2855 if (sc == SPDK_SCSI_STATUS_GOOD) { 2856 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2857 } else { 2858 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 2859 bdev_io->internal.error.scsi.sc = sc; 2860 bdev_io->internal.error.scsi.sk = sk; 2861 bdev_io->internal.error.scsi.asc = asc; 2862 bdev_io->internal.error.scsi.ascq = ascq; 2863 } 2864 2865 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2866 } 2867 2868 void 2869 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 2870 int *sc, int *sk, int *asc, int *ascq) 2871 { 2872 assert(sc != NULL); 2873 assert(sk != NULL); 2874 assert(asc != NULL); 2875 assert(ascq != NULL); 2876 2877 switch (bdev_io->internal.status) { 2878 case SPDK_BDEV_IO_STATUS_SUCCESS: 2879 *sc = SPDK_SCSI_STATUS_GOOD; 2880 *sk = SPDK_SCSI_SENSE_NO_SENSE; 2881 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2882 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2883 break; 2884 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 2885 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 2886 break; 2887 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 2888 *sc = bdev_io->internal.error.scsi.sc; 2889 *sk = bdev_io->internal.error.scsi.sk; 2890 *asc = bdev_io->internal.error.scsi.asc; 2891 *ascq = bdev_io->internal.error.scsi.ascq; 2892 break; 2893 default: 2894 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 2895 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 2896 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2897 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2898 break; 2899 } 2900 } 2901 2902 void 2903 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 2904 { 2905 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 2906 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2907 } else { 2908 bdev_io->internal.error.nvme.sct = sct; 2909 bdev_io->internal.error.nvme.sc = sc; 2910 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 2911 } 2912 2913 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2914 } 2915 2916 void 2917 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 2918 { 2919 assert(sct != NULL); 2920 assert(sc != NULL); 2921 2922 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 2923 *sct = bdev_io->internal.error.nvme.sct; 2924 *sc = bdev_io->internal.error.nvme.sc; 2925 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2926 *sct = SPDK_NVME_SCT_GENERIC; 2927 *sc = SPDK_NVME_SC_SUCCESS; 2928 } else { 2929 *sct = SPDK_NVME_SCT_GENERIC; 2930 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 2931 } 2932 } 2933 2934 struct spdk_thread * 2935 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 2936 { 2937 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 2938 } 2939 2940 static void 2941 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set, 2942 enum spdk_bdev_qos_type qos_type) 2943 { 2944 uint64_t min_qos_set = 0; 2945 2946 switch (qos_type) { 2947 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 
2948 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 2949 break; 2950 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2951 min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC; 2952 break; 2953 default: 2954 SPDK_ERRLOG("Unsupported QoS type.\n"); 2955 return; 2956 } 2957 2958 if (qos_set % min_qos_set) { 2959 SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not multiple of %lu\n", 2960 qos_set, bdev->name, min_qos_set); 2961 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 2962 return; 2963 } 2964 2965 if (!bdev->internal.qos) { 2966 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 2967 if (!bdev->internal.qos) { 2968 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 2969 return; 2970 } 2971 } 2972 2973 switch (qos_type) { 2974 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 2975 bdev->internal.qos->iops_rate_limit = qos_set; 2976 break; 2977 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2978 bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024; 2979 break; 2980 default: 2981 break; 2982 } 2983 2984 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 2985 bdev->name, qos_type, qos_set); 2986 2987 return; 2988 } 2989 2990 static void 2991 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 2992 { 2993 struct spdk_conf_section *sp = NULL; 2994 const char *val = NULL; 2995 uint64_t qos_set = 0; 2996 int i = 0, j = 0; 2997 2998 sp = spdk_conf_find_section(NULL, "QoS"); 2999 if (!sp) { 3000 return; 3001 } 3002 3003 while (j < SPDK_BDEV_QOS_NUM_TYPES) { 3004 i = 0; 3005 while (true) { 3006 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0); 3007 if (!val) { 3008 break; 3009 } 3010 3011 if (strcmp(bdev->name, val) != 0) { 3012 i++; 3013 continue; 3014 } 3015 3016 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1); 3017 if (val) { 3018 qos_set = strtoull(val, NULL, 10); 3019 _spdk_bdev_qos_config_type(bdev, qos_set, j); 3020 } 3021 3022 break; 3023 } 3024 3025 j++; 3026 } 3027 3028 return; 3029 } 3030 3031 static int 3032 spdk_bdev_init(struct spdk_bdev *bdev) 3033 { 3034 char *bdev_name; 3035 3036 assert(bdev->module != NULL); 3037 3038 if (!bdev->name) { 3039 SPDK_ERRLOG("Bdev name is NULL\n"); 3040 return -EINVAL; 3041 } 3042 3043 if (spdk_bdev_get_by_name(bdev->name)) { 3044 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3045 return -EEXIST; 3046 } 3047 3048 /* Users often register their own I/O devices using the bdev name. In 3049 * order to avoid conflicts, prepend bdev_. 
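 * For example, a bdev named "Malloc0" registers its io_device as "bdev_Malloc0".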
*/ 3050 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3051 if (!bdev_name) { 3052 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3053 return -ENOMEM; 3054 } 3055 3056 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3057 bdev->internal.measured_queue_depth = UINT64_MAX; 3058 3059 TAILQ_INIT(&bdev->internal.open_descs); 3060 3061 TAILQ_INIT(&bdev->aliases); 3062 3063 bdev->internal.reset_in_progress = NULL; 3064 3065 _spdk_bdev_qos_config(bdev); 3066 3067 spdk_io_device_register(__bdev_to_io_dev(bdev), 3068 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3069 sizeof(struct spdk_bdev_channel), 3070 bdev_name); 3071 3072 free(bdev_name); 3073 3074 pthread_mutex_init(&bdev->internal.mutex, NULL); 3075 return 0; 3076 } 3077 3078 static void 3079 spdk_bdev_destroy_cb(void *io_device) 3080 { 3081 int rc; 3082 struct spdk_bdev *bdev; 3083 spdk_bdev_unregister_cb cb_fn; 3084 void *cb_arg; 3085 3086 bdev = __bdev_from_io_dev(io_device); 3087 cb_fn = bdev->internal.unregister_cb; 3088 cb_arg = bdev->internal.unregister_ctx; 3089 3090 rc = bdev->fn_table->destruct(bdev->ctxt); 3091 if (rc < 0) { 3092 SPDK_ERRLOG("destruct failed\n"); 3093 } 3094 if (rc <= 0 && cb_fn != NULL) { 3095 cb_fn(cb_arg, rc); 3096 } 3097 } 3098 3099 3100 static void 3101 spdk_bdev_fini(struct spdk_bdev *bdev) 3102 { 3103 pthread_mutex_destroy(&bdev->internal.mutex); 3104 3105 free(bdev->internal.qos); 3106 3107 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3108 } 3109 3110 static void 3111 spdk_bdev_start(struct spdk_bdev *bdev) 3112 { 3113 struct spdk_bdev_module *module; 3114 uint32_t action; 3115 3116 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3117 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3118 3119 /* Examine configuration before initializing I/O */ 3120 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3121 if (module->examine_config) { 3122 action = module->internal.action_in_progress; 3123 module->internal.action_in_progress++; 3124 module->examine_config(bdev); 3125 if (action != module->internal.action_in_progress) { 3126 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3127 module->name); 3128 } 3129 } 3130 } 3131 3132 if (bdev->internal.claim_module) { 3133 return; 3134 } 3135 3136 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3137 if (module->examine_disk) { 3138 module->internal.action_in_progress++; 3139 module->examine_disk(bdev); 3140 } 3141 } 3142 } 3143 3144 int 3145 spdk_bdev_register(struct spdk_bdev *bdev) 3146 { 3147 int rc = spdk_bdev_init(bdev); 3148 3149 if (rc == 0) { 3150 spdk_bdev_start(bdev); 3151 } 3152 3153 return rc; 3154 } 3155 3156 int 3157 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3158 { 3159 int rc; 3160 3161 rc = spdk_bdev_init(vbdev); 3162 if (rc) { 3163 return rc; 3164 } 3165 3166 spdk_bdev_start(vbdev); 3167 return 0; 3168 } 3169 3170 void 3171 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3172 { 3173 if (bdev->internal.unregister_cb != NULL) { 3174 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3175 } 3176 } 3177 3178 static void 3179 _remove_notify(void *arg) 3180 { 3181 struct spdk_bdev_desc *desc = arg; 3182 3183 desc->remove_scheduled = false; 3184 3185 if (desc->closed) { 3186 free(desc); 3187 } else { 3188 desc->remove_cb(desc->remove_ctx); 3189 } 3190 } 3191 3192 void 3193 spdk_bdev_unregister(struct 
spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3194 { 3195 struct spdk_bdev_desc *desc, *tmp; 3196 bool do_destruct = true; 3197 struct spdk_thread *thread; 3198 3199 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3200 3201 thread = spdk_get_thread(); 3202 if (!thread) { 3203 /* The user called this from a non-SPDK thread. */ 3204 if (cb_fn != NULL) { 3205 cb_fn(cb_arg, -ENOTSUP); 3206 } 3207 return; 3208 } 3209 3210 pthread_mutex_lock(&bdev->internal.mutex); 3211 3212 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3213 bdev->internal.unregister_cb = cb_fn; 3214 bdev->internal.unregister_ctx = cb_arg; 3215 3216 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3217 if (desc->remove_cb) { 3218 do_destruct = false; 3219 /* 3220 * Defer invocation of the remove_cb to a separate message that will 3221 * run later on its thread. This ensures this context unwinds and 3222 * we don't recursively unregister this bdev again if the remove_cb 3223 * immediately closes its descriptor. 3224 */ 3225 if (!desc->remove_scheduled) { 3226 /* Avoid scheduling removal of the same descriptor multiple times. */ 3227 desc->remove_scheduled = true; 3228 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3229 } 3230 } 3231 } 3232 3233 if (!do_destruct) { 3234 pthread_mutex_unlock(&bdev->internal.mutex); 3235 return; 3236 } 3237 3238 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3239 pthread_mutex_unlock(&bdev->internal.mutex); 3240 3241 spdk_bdev_fini(bdev); 3242 } 3243 3244 int 3245 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3246 void *remove_ctx, struct spdk_bdev_desc **_desc) 3247 { 3248 struct spdk_bdev_desc *desc; 3249 struct spdk_thread *thread; 3250 3251 thread = spdk_get_thread(); 3252 if (!thread) { 3253 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3254 return -ENOTSUP; 3255 } 3256 3257 desc = calloc(1, sizeof(*desc)); 3258 if (desc == NULL) { 3259 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3260 return -ENOMEM; 3261 } 3262 3263 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3264 spdk_get_thread()); 3265 3266 pthread_mutex_lock(&bdev->internal.mutex); 3267 3268 if (write && bdev->internal.claim_module) { 3269 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3270 bdev->name, bdev->internal.claim_module->name); 3271 free(desc); 3272 pthread_mutex_unlock(&bdev->internal.mutex); 3273 return -EPERM; 3274 } 3275 3276 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3277 3278 desc->bdev = bdev; 3279 desc->thread = thread; 3280 desc->remove_cb = remove_cb; 3281 desc->remove_ctx = remove_ctx; 3282 desc->write = write; 3283 *_desc = desc; 3284 3285 pthread_mutex_unlock(&bdev->internal.mutex); 3286 3287 return 0; 3288 } 3289 3290 void 3291 spdk_bdev_close(struct spdk_bdev_desc *desc) 3292 { 3293 struct spdk_bdev *bdev = desc->bdev; 3294 bool do_unregister = false; 3295 3296 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3297 spdk_get_thread()); 3298 3299 assert(desc->thread == spdk_get_thread()); 3300 3301 pthread_mutex_lock(&bdev->internal.mutex); 3302 3303 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3304 3305 desc->closed = true; 3306 3307 if (!desc->remove_scheduled) { 3308 free(desc); 3309 } 3310 3311 /* If no more descriptors, kill QoS channel */ 3312 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3313 
SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3314 bdev->name, spdk_get_thread()); 3315 3316 if (spdk_bdev_qos_destroy(bdev)) { 3317 /* There isn't anything we can do to recover here. Just let the 3318 * old QoS poller keep running. The QoS handling won't change 3319 * cores when the user allocates a new channel, but it won't break. */ 3320 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3321 } 3322 } 3323 3324 spdk_bdev_set_qd_sampling_period(bdev, 0); 3325 3326 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3327 do_unregister = true; 3328 } 3329 pthread_mutex_unlock(&bdev->internal.mutex); 3330 3331 if (do_unregister == true) { 3332 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3333 } 3334 } 3335 3336 int 3337 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3338 struct spdk_bdev_module *module) 3339 { 3340 if (bdev->internal.claim_module != NULL) { 3341 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3342 bdev->internal.claim_module->name); 3343 return -EPERM; 3344 } 3345 3346 if (desc && !desc->write) { 3347 desc->write = true; 3348 } 3349 3350 bdev->internal.claim_module = module; 3351 return 0; 3352 } 3353 3354 void 3355 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3356 { 3357 assert(bdev->internal.claim_module != NULL); 3358 bdev->internal.claim_module = NULL; 3359 } 3360 3361 struct spdk_bdev * 3362 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3363 { 3364 return desc->bdev; 3365 } 3366 3367 void 3368 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3369 { 3370 struct iovec *iovs; 3371 int iovcnt; 3372 3373 if (bdev_io == NULL) { 3374 return; 3375 } 3376 3377 switch (bdev_io->type) { 3378 case SPDK_BDEV_IO_TYPE_READ: 3379 iovs = bdev_io->u.bdev.iovs; 3380 iovcnt = bdev_io->u.bdev.iovcnt; 3381 break; 3382 case SPDK_BDEV_IO_TYPE_WRITE: 3383 iovs = bdev_io->u.bdev.iovs; 3384 iovcnt = bdev_io->u.bdev.iovcnt; 3385 break; 3386 default: 3387 iovs = NULL; 3388 iovcnt = 0; 3389 break; 3390 } 3391 3392 if (iovp) { 3393 *iovp = iovs; 3394 } 3395 if (iovcntp) { 3396 *iovcntp = iovcnt; 3397 } 3398 } 3399 3400 void 3401 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3402 { 3403 3404 if (spdk_bdev_module_list_find(bdev_module->name)) { 3405 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3406 assert(false); 3407 } 3408 3409 if (bdev_module->async_init) { 3410 bdev_module->internal.action_in_progress = 1; 3411 } 3412 3413 /* 3414 * Modules with examine callbacks must be initialized first, so they are 3415 * ready to handle examine callbacks from later modules that will 3416 * register physical bdevs. 
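 * Inserting them at the head of the module list below is what produces that
 * initialization order.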
3417 */ 3418 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3419 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3420 } else { 3421 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3422 } 3423 } 3424 3425 struct spdk_bdev_module * 3426 spdk_bdev_module_list_find(const char *name) 3427 { 3428 struct spdk_bdev_module *bdev_module; 3429 3430 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3431 if (strcmp(name, bdev_module->name) == 0) { 3432 break; 3433 } 3434 } 3435 3436 return bdev_module; 3437 } 3438 3439 static void 3440 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3441 { 3442 struct spdk_bdev_io *bdev_io = _bdev_io; 3443 uint64_t num_bytes, num_blocks; 3444 int rc; 3445 3446 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3447 bdev_io->u.bdev.split_remaining_num_blocks, 3448 ZERO_BUFFER_SIZE); 3449 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3450 3451 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3452 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3453 g_bdev_mgr.zero_buffer, 3454 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3455 _spdk_bdev_write_zero_buffer_done, bdev_io); 3456 if (rc == 0) { 3457 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3458 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3459 } else if (rc == -ENOMEM) { 3460 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 3461 bdev_io->internal.waitq_entry.cb_fn = _spdk_bdev_write_zero_buffer_next; 3462 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 3463 spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 3464 &bdev_io->internal.waitq_entry); 3465 } else { 3466 /* This should never happen. 
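 * spdk_bdev_write_blocks() could otherwise only fail with -EBADF or -EINVAL
 * here, and the original write_zeroes request already validated both the
 * descriptor's write permission and the full block range.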
*/ 3467 assert(false); 3468 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3469 bdev_io->internal.cb(bdev_io, SPDK_BDEV_IO_STATUS_FAILED, bdev_io->internal.caller_ctx); 3470 } 3471 } 3472 3473 static void 3474 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3475 { 3476 struct spdk_bdev_io *parent_io = cb_arg; 3477 3478 spdk_bdev_free_io(bdev_io); 3479 3480 if (!success) { 3481 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3482 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_FAILED, parent_io->internal.caller_ctx); 3483 return; 3484 } 3485 3486 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3487 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3488 parent_io->internal.cb(parent_io, SPDK_BDEV_IO_STATUS_SUCCESS, parent_io->internal.caller_ctx); 3489 return; 3490 } 3491 3492 _spdk_bdev_write_zero_buffer_next(parent_io); 3493 } 3494 3495 struct set_qos_limit_ctx { 3496 void (*cb_fn)(void *cb_arg, int status); 3497 void *cb_arg; 3498 struct spdk_bdev *bdev; 3499 }; 3500 3501 static void 3502 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3503 { 3504 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3505 ctx->bdev->internal.qos_mod_in_progress = false; 3506 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3507 3508 ctx->cb_fn(ctx->cb_arg, status); 3509 free(ctx); 3510 } 3511 3512 static void 3513 _spdk_bdev_disable_qos_done(void *cb_arg) 3514 { 3515 struct set_qos_limit_ctx *ctx = cb_arg; 3516 struct spdk_bdev *bdev = ctx->bdev; 3517 struct spdk_bdev_io *bdev_io; 3518 struct spdk_bdev_qos *qos; 3519 3520 pthread_mutex_lock(&bdev->internal.mutex); 3521 qos = bdev->internal.qos; 3522 bdev->internal.qos = NULL; 3523 pthread_mutex_unlock(&bdev->internal.mutex); 3524 3525 while (!TAILQ_EMPTY(&qos->queued)) { 3526 /* Send queued I/O back to their original thread for resubmission. */ 3527 bdev_io = TAILQ_FIRST(&qos->queued); 3528 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3529 3530 if (bdev_io->internal.io_submit_ch) { 3531 /* 3532 * Channel was changed when sending it to the QoS thread - change it back 3533 * before sending it back to the original thread. 
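 * (_spdk_bdev_io_complete() performs the same restoration on the completion path.)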
3534 */ 3535 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3536 bdev_io->internal.io_submit_ch = NULL; 3537 } 3538 3539 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3540 _spdk_bdev_io_submit, bdev_io); 3541 } 3542 3543 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3544 spdk_poller_unregister(&qos->poller); 3545 3546 free(qos); 3547 3548 _spdk_bdev_set_qos_limit_done(ctx, 0); 3549 } 3550 3551 static void 3552 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3553 { 3554 void *io_device = spdk_io_channel_iter_get_io_device(i); 3555 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3556 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3557 struct spdk_thread *thread; 3558 3559 pthread_mutex_lock(&bdev->internal.mutex); 3560 thread = bdev->internal.qos->thread; 3561 pthread_mutex_unlock(&bdev->internal.mutex); 3562 3563 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3564 } 3565 3566 static void 3567 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3568 { 3569 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3570 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3571 3572 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3573 3574 spdk_for_each_channel_continue(i, 0); 3575 } 3576 3577 static void 3578 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg) 3579 { 3580 struct set_qos_limit_ctx *ctx = cb_arg; 3581 struct spdk_bdev *bdev = ctx->bdev; 3582 3583 pthread_mutex_lock(&bdev->internal.mutex); 3584 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3585 pthread_mutex_unlock(&bdev->internal.mutex); 3586 3587 _spdk_bdev_set_qos_limit_done(ctx, 0); 3588 } 3589 3590 static void 3591 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3592 { 3593 void *io_device = spdk_io_channel_iter_get_io_device(i); 3594 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3595 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3596 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3597 3598 pthread_mutex_lock(&bdev->internal.mutex); 3599 _spdk_bdev_enable_qos(bdev, bdev_ch); 3600 pthread_mutex_unlock(&bdev->internal.mutex); 3601 spdk_for_each_channel_continue(i, 0); 3602 } 3603 3604 static void 3605 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3606 { 3607 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3608 3609 _spdk_bdev_set_qos_limit_done(ctx, status); 3610 } 3611 3612 void 3613 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec, 3614 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3615 { 3616 struct set_qos_limit_ctx *ctx; 3617 3618 if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) { 3619 SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n", 3620 ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC); 3621 cb_fn(cb_arg, -EINVAL); 3622 return; 3623 } 3624 3625 ctx = calloc(1, sizeof(*ctx)); 3626 if (ctx == NULL) { 3627 cb_fn(cb_arg, -ENOMEM); 3628 return; 3629 } 3630 3631 ctx->cb_fn = cb_fn; 3632 ctx->cb_arg = cb_arg; 3633 ctx->bdev = bdev; 3634 3635 pthread_mutex_lock(&bdev->internal.mutex); 3636 if (bdev->internal.qos_mod_in_progress) { 3637 pthread_mutex_unlock(&bdev->internal.mutex); 3638 free(ctx); 3639 cb_fn(cb_arg, -EAGAIN); 3640 return; 3641 } 3642 bdev->internal.qos_mod_in_progress = true; 3643 3644 if (ios_per_sec > 0) { 3645 if (bdev->internal.qos == NULL) { 3646 /* Enabling */ 3647 
bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3648 if (!bdev->internal.qos) { 3649 pthread_mutex_unlock(&bdev->internal.mutex); 3650 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3651 free(ctx); 3652 cb_fn(cb_arg, -ENOMEM); 3653 return; 3654 } 3655 3656 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3657 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3658 _spdk_bdev_enable_qos_msg, ctx, 3659 _spdk_bdev_enable_qos_done); 3660 } else { 3661 /* Updating */ 3662 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3663 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx); 3664 } 3665 } else { 3666 if (bdev->internal.qos != NULL) { 3667 /* Disabling */ 3668 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3669 _spdk_bdev_disable_qos_msg, ctx, 3670 _spdk_bdev_disable_qos_msg_done); 3671 } else { 3672 pthread_mutex_unlock(&bdev->internal.mutex); 3673 _spdk_bdev_set_qos_limit_done(ctx, 0); 3674 return; 3675 } 3676 } 3677 3678 pthread_mutex_unlock(&bdev->internal.mutex); 3679 } 3680 3681 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3682 3683 SPDK_TRACE_REGISTER_FN(bdev_trace) 3684 { 3685 spdk_trace_register_owner(OWNER_BDEV, 'b'); 3686 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 3687 spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV, 3688 OBJECT_BDEV_IO, 1, 0, "type: "); 3689 spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV, 3690 OBJECT_BDEV_IO, 0, 0, ""); 3691 } 3692
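
/*
 * Illustrative usage sketch (kept under "#if 0" so it is not compiled): how a
 * caller is expected to drive the descriptor/channel API implemented above,
 * including the -ENOMEM handling that spdk_bdev_queue_io_wait() exists for.
 * The names read_ctx, read_done, submit_read and open_and_read are hypothetical,
 * and ctx->buf is assumed to be a DMA-able buffer of at least one block,
 * aligned per spdk_bdev_get_buf_align().
 */
#if 0
struct read_ctx {
	struct spdk_bdev_desc		*desc;
	struct spdk_io_channel		*ch;
	void				*buf;
	struct spdk_bdev_io_wait_entry	wait;
};

static void
read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	/* Always return the spdk_bdev_io to the pool, whether or not the I/O succeeded. */
	spdk_bdev_free_io(bdev_io);
}

static void
submit_read(void *arg)
{
	struct read_ctx *ctx = arg;
	int rc;

	rc = spdk_bdev_read_blocks(ctx->desc, ctx->ch, ctx->buf, 0, 1, read_done, ctx);
	if (rc == -ENOMEM) {
		/* No spdk_bdev_io was available; ask to be called back when one is freed. */
		ctx->wait.bdev = spdk_bdev_desc_get_bdev(ctx->desc);
		ctx->wait.cb_fn = submit_read;
		ctx->wait.cb_arg = ctx;
		spdk_bdev_queue_io_wait(ctx->wait.bdev, ctx->ch, &ctx->wait);
	}
}

static int
open_and_read(const char *name, struct read_ctx *ctx)
{
	struct spdk_bdev *bdev = spdk_bdev_get_by_name(name);
	int rc;

	if (bdev == NULL) {
		return -ENODEV;
	}

	/* Read-only access; no hot-remove callback for brevity. */
	rc = spdk_bdev_open(bdev, false, NULL, NULL, &ctx->desc);
	if (rc != 0) {
		return rc;
	}

	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
	if (ctx->ch == NULL) {
		spdk_bdev_close(ctx->desc);
		return -ENOMEM;
	}

	submit_read(ctx);
	return 0;
}
#endif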