/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define BUF_SMALL_POOL_SIZE			8192
#define BUF_LARGE_POOL_SIZE			1024
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV				0x2

#define OBJECT_BDEV_IO				0x2

#define TRACE_GROUP_BDEV			0x3
#define TRACE_BDEV_IO_START			SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE			SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(10 * 1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX

static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"};
static const char *qos_rpc_type[] = {"qos_ios_per_sec"};

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

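	/*
	 * Note: init_complete is not set until module_init_complete is set and
	 * all outstanding module init/examine actions have finished; see
	 * spdk_bdev_module_action_complete() below.
	 */
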
#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second. */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in the current timeslice (e.g., 1ms).
	 *  For remaining bytes, this is allowed to run negative if an I/O is
	 *  submitted when some bytes are remaining but the I/O is bigger than
	 *  that amount.  The excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;
};

struct spdk_bdev_qos {
	/** Rate limits for this bdev, one entry per limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data.  Multiple bdevs built on the same io_device
 * queue their I/O that awaits retry here.  This makes it possible to retry sending
 * I/O to one bdev after I/O from another bdev on the same io_device completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
209 */ 210 uint64_t nomem_threshold; 211 212 /* I/O channel allocated by a bdev module */ 213 struct spdk_io_channel *shared_ch; 214 215 /* Refcount of bdev channels using this resource */ 216 uint32_t ref; 217 218 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 219 }; 220 221 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 222 #define BDEV_CH_QOS_ENABLED (1 << 1) 223 224 struct spdk_bdev_channel { 225 struct spdk_bdev *bdev; 226 227 /* The channel for the underlying device */ 228 struct spdk_io_channel *channel; 229 230 /* Per io_device per thread data */ 231 struct spdk_bdev_shared_resource *shared_resource; 232 233 struct spdk_bdev_io_stat stat; 234 235 /* 236 * Count of I/O submitted through this channel and waiting for completion. 237 * Incremented before submit_request() is called on an spdk_bdev_io. 238 */ 239 uint64_t io_outstanding; 240 241 bdev_io_tailq_t queued_resets; 242 243 uint32_t flags; 244 245 #ifdef SPDK_CONFIG_VTUNE 246 uint64_t start_tsc; 247 uint64_t interval_tsc; 248 __itt_string_handle *handle; 249 struct spdk_bdev_io_stat prev_stat; 250 #endif 251 252 }; 253 254 struct spdk_bdev_desc { 255 struct spdk_bdev *bdev; 256 struct spdk_thread *thread; 257 spdk_bdev_remove_cb_t remove_cb; 258 void *remove_ctx; 259 bool remove_scheduled; 260 bool closed; 261 bool write; 262 TAILQ_ENTRY(spdk_bdev_desc) link; 263 }; 264 265 struct spdk_bdev_iostat_ctx { 266 struct spdk_bdev_io_stat *stat; 267 spdk_bdev_get_device_stat_cb cb; 268 void *cb_arg; 269 }; 270 271 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 272 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 273 274 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 275 void *cb_arg); 276 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 277 278 void 279 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 280 { 281 *opts = g_bdev_opts; 282 } 283 284 int 285 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 286 { 287 uint32_t min_pool_size; 288 289 /* 290 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 291 * initialization. A second mgmt_ch will be created on the same thread when the application starts 292 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
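	 * For example (illustrative numbers), with 4 threads and the default
	 * bdev_io_cache_size of 256, min_pool_size works out to
	 * 256 * (4 + 1) = 1280, so a bdev_io_pool_size below 1280 is rejected here.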
293 */ 294 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 295 if (opts->bdev_io_pool_size < min_pool_size) { 296 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 297 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 298 spdk_thread_get_count()); 299 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 300 return -1; 301 } 302 303 g_bdev_opts = *opts; 304 return 0; 305 } 306 307 struct spdk_bdev * 308 spdk_bdev_first(void) 309 { 310 struct spdk_bdev *bdev; 311 312 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 313 if (bdev) { 314 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 315 } 316 317 return bdev; 318 } 319 320 struct spdk_bdev * 321 spdk_bdev_next(struct spdk_bdev *prev) 322 { 323 struct spdk_bdev *bdev; 324 325 bdev = TAILQ_NEXT(prev, internal.link); 326 if (bdev) { 327 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 328 } 329 330 return bdev; 331 } 332 333 static struct spdk_bdev * 334 _bdev_next_leaf(struct spdk_bdev *bdev) 335 { 336 while (bdev != NULL) { 337 if (bdev->internal.claim_module == NULL) { 338 return bdev; 339 } else { 340 bdev = TAILQ_NEXT(bdev, internal.link); 341 } 342 } 343 344 return bdev; 345 } 346 347 struct spdk_bdev * 348 spdk_bdev_first_leaf(void) 349 { 350 struct spdk_bdev *bdev; 351 352 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 353 354 if (bdev) { 355 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 356 } 357 358 return bdev; 359 } 360 361 struct spdk_bdev * 362 spdk_bdev_next_leaf(struct spdk_bdev *prev) 363 { 364 struct spdk_bdev *bdev; 365 366 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 367 368 if (bdev) { 369 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 370 } 371 372 return bdev; 373 } 374 375 struct spdk_bdev * 376 spdk_bdev_get_by_name(const char *bdev_name) 377 { 378 struct spdk_bdev_alias *tmp; 379 struct spdk_bdev *bdev = spdk_bdev_first(); 380 381 while (bdev != NULL) { 382 if (strcmp(bdev_name, bdev->name) == 0) { 383 return bdev; 384 } 385 386 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 387 if (strcmp(bdev_name, tmp->alias) == 0) { 388 return bdev; 389 } 390 } 391 392 bdev = spdk_bdev_next(bdev); 393 } 394 395 return NULL; 396 } 397 398 void 399 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 400 { 401 struct iovec *iovs; 402 403 iovs = bdev_io->u.bdev.iovs; 404 405 assert(iovs != NULL); 406 assert(bdev_io->u.bdev.iovcnt >= 1); 407 408 iovs[0].iov_base = buf; 409 iovs[0].iov_len = len; 410 } 411 412 static void 413 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 414 { 415 struct spdk_mempool *pool; 416 struct spdk_bdev_io *tmp; 417 void *buf, *aligned_buf; 418 bdev_io_stailq_t *stailq; 419 struct spdk_bdev_mgmt_channel *ch; 420 421 assert(bdev_io->u.bdev.iovcnt == 1); 422 423 buf = bdev_io->internal.buf; 424 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 425 426 bdev_io->internal.buf = NULL; 427 428 if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 429 pool = g_bdev_mgr.buf_small_pool; 430 stailq = &ch->need_buf_small; 431 } else { 432 pool = g_bdev_mgr.buf_large_pool; 433 stailq = &ch->need_buf_large; 434 } 435 436 if (STAILQ_EMPTY(stailq)) { 437 spdk_mempool_put(pool, buf); 438 } else { 439 tmp = STAILQ_FIRST(stailq); 440 441 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 442 spdk_bdev_io_set_buf(tmp, aligned_buf, 
tmp->internal.buf_len); 443 444 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 445 tmp->internal.buf = buf; 446 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 447 } 448 } 449 450 void 451 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 452 { 453 struct spdk_mempool *pool; 454 bdev_io_stailq_t *stailq; 455 void *buf, *aligned_buf; 456 struct spdk_bdev_mgmt_channel *mgmt_ch; 457 458 assert(cb != NULL); 459 assert(bdev_io->u.bdev.iovs != NULL); 460 461 if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) { 462 /* Buffer already present */ 463 cb(bdev_io->internal.ch->channel, bdev_io); 464 return; 465 } 466 467 assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE); 468 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 469 470 bdev_io->internal.buf_len = len; 471 bdev_io->internal.get_buf_cb = cb; 472 if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 473 pool = g_bdev_mgr.buf_small_pool; 474 stailq = &mgmt_ch->need_buf_small; 475 } else { 476 pool = g_bdev_mgr.buf_large_pool; 477 stailq = &mgmt_ch->need_buf_large; 478 } 479 480 buf = spdk_mempool_get(pool); 481 482 if (!buf) { 483 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 484 } else { 485 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 486 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 487 488 bdev_io->internal.buf = buf; 489 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 490 } 491 } 492 493 static int 494 spdk_bdev_module_get_max_ctx_size(void) 495 { 496 struct spdk_bdev_module *bdev_module; 497 int max_bdev_module_size = 0; 498 499 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 500 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 501 max_bdev_module_size = bdev_module->get_ctx_size(); 502 } 503 } 504 505 return max_bdev_module_size; 506 } 507 508 void 509 spdk_bdev_config_text(FILE *fp) 510 { 511 struct spdk_bdev_module *bdev_module; 512 513 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 514 if (bdev_module->config_text) { 515 bdev_module->config_text(fp); 516 } 517 } 518 } 519 520 void 521 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 522 { 523 struct spdk_bdev_module *bdev_module; 524 struct spdk_bdev *bdev; 525 526 assert(w != NULL); 527 528 spdk_json_write_array_begin(w); 529 530 spdk_json_write_object_begin(w); 531 spdk_json_write_named_string(w, "method", "set_bdev_options"); 532 spdk_json_write_name(w, "params"); 533 spdk_json_write_object_begin(w); 534 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 535 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 536 spdk_json_write_object_end(w); 537 spdk_json_write_object_end(w); 538 539 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 540 if (bdev_module->config_json) { 541 bdev_module->config_json(w); 542 } 543 } 544 545 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 546 if (bdev->fn_table->write_config_json) { 547 bdev->fn_table->write_config_json(bdev, w); 548 } 549 } 550 551 spdk_json_write_array_end(w); 552 } 553 554 static int 555 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 556 { 557 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 558 struct spdk_bdev_io *bdev_io; 559 uint32_t i; 560 561 STAILQ_INIT(&ch->need_buf_small); 562 STAILQ_INIT(&ch->need_buf_large); 563 564 STAILQ_INIT(&ch->per_thread_cache); 565 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 566 
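	/*
	 * The reservation loop below moves bdev_io_cache_size entries from the
	 * global bdev_io_pool into this channel's cache for the lifetime of the
	 * channel.  This is why spdk_bdev_set_opts() requires the pool to hold at
	 * least bdev_io_cache_size * (thread count + 1) entries.
	 */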
	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * if the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress.  If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
661 */ 662 spdk_bdev_init_complete(0); 663 } 664 665 static void 666 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 667 { 668 assert(module->internal.action_in_progress > 0); 669 module->internal.action_in_progress--; 670 spdk_bdev_module_action_complete(); 671 } 672 673 void 674 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 675 { 676 spdk_bdev_module_action_done(module); 677 } 678 679 void 680 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 681 { 682 spdk_bdev_module_action_done(module); 683 } 684 685 /** The last initialized bdev module */ 686 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 687 688 static int 689 spdk_bdev_modules_init(void) 690 { 691 struct spdk_bdev_module *module; 692 int rc = 0; 693 694 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 695 g_resume_bdev_module = module; 696 rc = module->module_init(); 697 if (rc != 0) { 698 return rc; 699 } 700 } 701 702 g_resume_bdev_module = NULL; 703 return 0; 704 } 705 706 707 static void 708 spdk_bdev_init_failed_complete(void *cb_arg) 709 { 710 spdk_bdev_init_complete(-1); 711 } 712 713 static void 714 spdk_bdev_init_failed(void *cb_arg) 715 { 716 spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL); 717 } 718 719 void 720 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 721 { 722 struct spdk_conf_section *sp; 723 struct spdk_bdev_opts bdev_opts; 724 int32_t bdev_io_pool_size, bdev_io_cache_size; 725 int cache_size; 726 int rc = 0; 727 char mempool_name[32]; 728 729 assert(cb_fn != NULL); 730 731 sp = spdk_conf_find_section(NULL, "Bdev"); 732 if (sp != NULL) { 733 spdk_bdev_get_opts(&bdev_opts); 734 735 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 736 if (bdev_io_pool_size >= 0) { 737 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 738 } 739 740 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 741 if (bdev_io_cache_size >= 0) { 742 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 743 } 744 745 if (spdk_bdev_set_opts(&bdev_opts)) { 746 spdk_bdev_init_complete(-1); 747 return; 748 } 749 750 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 751 } 752 753 g_init_cb_fn = cb_fn; 754 g_init_cb_arg = cb_arg; 755 756 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 757 758 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 759 g_bdev_opts.bdev_io_pool_size, 760 sizeof(struct spdk_bdev_io) + 761 spdk_bdev_module_get_max_ctx_size(), 762 0, 763 SPDK_ENV_SOCKET_ID_ANY); 764 765 if (g_bdev_mgr.bdev_io_pool == NULL) { 766 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 767 spdk_bdev_init_complete(-1); 768 return; 769 } 770 771 /** 772 * Ensure no more than half of the total buffers end up local caches, by 773 * using spdk_thread_get_count() to determine how many local caches we need 774 * to account for. 
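	 * For example (illustrative numbers), with BUF_SMALL_POOL_SIZE of 8192 and
	 * 4 threads, each per-thread cache gets 8192 / (2 * 4) = 1024 buffers, so
	 * the caches together can hold at most half of the pool.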
775 */ 776 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 777 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 778 779 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 780 BUF_SMALL_POOL_SIZE, 781 SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512, 782 cache_size, 783 SPDK_ENV_SOCKET_ID_ANY); 784 if (!g_bdev_mgr.buf_small_pool) { 785 SPDK_ERRLOG("create rbuf small pool failed\n"); 786 spdk_bdev_init_complete(-1); 787 return; 788 } 789 790 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 791 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 792 793 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 794 BUF_LARGE_POOL_SIZE, 795 SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512, 796 cache_size, 797 SPDK_ENV_SOCKET_ID_ANY); 798 if (!g_bdev_mgr.buf_large_pool) { 799 SPDK_ERRLOG("create rbuf large pool failed\n"); 800 spdk_bdev_init_complete(-1); 801 return; 802 } 803 804 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 805 NULL); 806 if (!g_bdev_mgr.zero_buffer) { 807 SPDK_ERRLOG("create bdev zero buffer failed\n"); 808 spdk_bdev_init_complete(-1); 809 return; 810 } 811 812 #ifdef SPDK_CONFIG_VTUNE 813 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 814 #endif 815 816 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 817 spdk_bdev_mgmt_channel_destroy, 818 sizeof(struct spdk_bdev_mgmt_channel), 819 "bdev_mgr"); 820 821 rc = spdk_bdev_modules_init(); 822 g_bdev_mgr.module_init_complete = true; 823 if (rc != 0) { 824 SPDK_ERRLOG("bdev modules init failed\n"); 825 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 826 return; 827 } 828 829 spdk_bdev_module_action_complete(); 830 } 831 832 static void 833 spdk_bdev_mgr_unregister_cb(void *io_device) 834 { 835 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 836 837 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 838 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 839 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 840 g_bdev_opts.bdev_io_pool_size); 841 } 842 843 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 844 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 845 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 846 BUF_SMALL_POOL_SIZE); 847 assert(false); 848 } 849 850 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 851 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 852 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 853 BUF_LARGE_POOL_SIZE); 854 assert(false); 855 } 856 857 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 858 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 859 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 860 spdk_dma_free(g_bdev_mgr.zero_buffer); 861 862 cb_fn(g_fini_cb_arg); 863 g_fini_cb_fn = NULL; 864 g_fini_cb_arg = NULL; 865 } 866 867 static void 868 spdk_bdev_module_finish_iter(void *arg) 869 { 870 struct spdk_bdev_module *bdev_module; 871 872 /* Start iterating from the last touched module */ 873 if (!g_resume_bdev_module) { 874 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 875 } else { 876 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 877 internal.tailq); 878 } 879 880 while (bdev_module) { 881 if (bdev_module->async_fini) { 882 /* Save our place so we can resume later. 
			 * We must save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some
		 * context (like bdev part free) that will use this bdev (or private bdev driver
		 * ctx data) after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last bdev in the list.  The last bdev in the list should be a bdev
	 * that has no bdevs that depend on it.
	 */
	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
988 */ 989 bdev_io = NULL; 990 } else { 991 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 992 } 993 994 return bdev_io; 995 } 996 997 void 998 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 999 { 1000 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1001 1002 assert(bdev_io != NULL); 1003 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1004 1005 if (bdev_io->internal.buf != NULL) { 1006 spdk_bdev_io_put_buf(bdev_io); 1007 } 1008 1009 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1010 ch->per_thread_cache_count++; 1011 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1012 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1013 struct spdk_bdev_io_wait_entry *entry; 1014 1015 entry = TAILQ_FIRST(&ch->io_wait_queue); 1016 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1017 entry->cb_fn(entry->cb_arg); 1018 } 1019 } else { 1020 /* We should never have a full cache with entries on the io wait queue. */ 1021 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1022 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1023 } 1024 } 1025 1026 static bool 1027 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1028 { 1029 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1030 1031 switch (limit) { 1032 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1033 return true; 1034 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1035 return false; 1036 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1037 default: 1038 return false; 1039 } 1040 } 1041 1042 static bool 1043 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1044 { 1045 switch (bdev_io->type) { 1046 case SPDK_BDEV_IO_TYPE_NVME_IO: 1047 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1048 case SPDK_BDEV_IO_TYPE_READ: 1049 case SPDK_BDEV_IO_TYPE_WRITE: 1050 case SPDK_BDEV_IO_TYPE_UNMAP: 1051 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1052 return true; 1053 default: 1054 return false; 1055 } 1056 } 1057 1058 static uint64_t 1059 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1060 { 1061 struct spdk_bdev *bdev = bdev_io->bdev; 1062 1063 switch (bdev_io->type) { 1064 case SPDK_BDEV_IO_TYPE_NVME_IO: 1065 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1066 return bdev_io->u.nvme_passthru.nbytes; 1067 case SPDK_BDEV_IO_TYPE_READ: 1068 case SPDK_BDEV_IO_TYPE_WRITE: 1069 case SPDK_BDEV_IO_TYPE_UNMAP: 1070 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1071 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1072 default: 1073 return 0; 1074 } 1075 } 1076 1077 static void 1078 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte) 1079 { 1080 int i; 1081 1082 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1083 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1084 continue; 1085 } 1086 1087 switch (i) { 1088 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1089 qos->rate_limits[i].remaining_this_timeslice--; 1090 break; 1091 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1092 qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte; 1093 break; 1094 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1095 default: 1096 break; 1097 } 1098 } 1099 } 1100 1101 static void 1102 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch) 1103 { 1104 struct spdk_bdev_io *bdev_io = NULL; 1105 struct spdk_bdev *bdev = ch->bdev; 1106 struct spdk_bdev_qos *qos = bdev->internal.qos; 1107 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1108 int i; 1109 bool to_limit_io; 1110 uint64_t io_size_in_byte; 1111 1112 while 
(!TAILQ_EMPTY(&qos->queued)) { 1113 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1114 if (qos->rate_limits[i].max_per_timeslice > 0 && 1115 (qos->rate_limits[i].remaining_this_timeslice <= 0)) { 1116 return; 1117 } 1118 } 1119 1120 bdev_io = TAILQ_FIRST(&qos->queued); 1121 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1122 ch->io_outstanding++; 1123 shared_resource->io_outstanding++; 1124 to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io); 1125 if (to_limit_io == true) { 1126 io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io); 1127 _spdk_bdev_qos_update_per_io(qos, io_size_in_byte); 1128 } 1129 bdev->fn_table->submit_request(ch->channel, bdev_io); 1130 } 1131 } 1132 1133 static void 1134 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1135 { 1136 int rc; 1137 1138 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1139 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1140 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1141 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1142 &bdev_io->internal.waitq_entry); 1143 if (rc != 0) { 1144 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1145 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1146 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1147 } 1148 } 1149 1150 static bool 1151 _spdk_bdev_io_type_can_split(uint8_t type) 1152 { 1153 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1154 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1155 1156 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1157 * UNMAP could be split, but these types of I/O are typically much larger 1158 * in size (sometimes the size of the entire block device), and the bdev 1159 * module can more efficiently split these types of I/O. Plus those types 1160 * of I/O do not have a payload, which makes the splitting process simpler. 1161 */ 1162 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1163 return true; 1164 } else { 1165 return false; 1166 } 1167 } 1168 1169 static bool 1170 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1171 { 1172 uint64_t start_stripe, end_stripe; 1173 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1174 1175 if (io_boundary == 0) { 1176 return false; 1177 } 1178 1179 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1180 return false; 1181 } 1182 1183 start_stripe = bdev_io->u.bdev.offset_blocks; 1184 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1185 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
*/ 1186 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1187 start_stripe >>= spdk_u32log2(io_boundary); 1188 end_stripe >>= spdk_u32log2(io_boundary); 1189 } else { 1190 start_stripe /= io_boundary; 1191 end_stripe /= io_boundary; 1192 } 1193 return (start_stripe != end_stripe); 1194 } 1195 1196 static uint32_t 1197 _to_next_boundary(uint64_t offset, uint32_t boundary) 1198 { 1199 return (boundary - (offset % boundary)); 1200 } 1201 1202 static void 1203 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1204 1205 static void 1206 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1207 { 1208 struct spdk_bdev_io *bdev_io = _bdev_io; 1209 uint64_t current_offset, remaining, bytes_handled; 1210 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1211 struct iovec *parent_iov; 1212 uint64_t parent_iov_offset, child_iov_len; 1213 uint32_t child_iovcnt; 1214 int rc; 1215 1216 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1217 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1218 blocklen = bdev_io->bdev->blocklen; 1219 bytes_handled = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1220 parent_iov = &bdev_io->u.bdev.iovs[0]; 1221 parent_iov_offset = 0; 1222 1223 while (bytes_handled > 0) { 1224 if (bytes_handled >= parent_iov->iov_len) { 1225 bytes_handled -= parent_iov->iov_len; 1226 parent_iov++; 1227 continue; 1228 } 1229 parent_iov_offset += bytes_handled; 1230 break; 1231 } 1232 1233 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1234 to_next_boundary = spdk_min(remaining, to_next_boundary); 1235 to_next_boundary_bytes = to_next_boundary * blocklen; 1236 child_iovcnt = 0; 1237 while (to_next_boundary_bytes > 0 && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1238 child_iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1239 to_next_boundary_bytes -= child_iov_len; 1240 1241 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1242 bdev_io->child_iov[child_iovcnt].iov_len = child_iov_len; 1243 1244 parent_iov++; 1245 parent_iov_offset = 0; 1246 child_iovcnt++; 1247 } 1248 1249 if (to_next_boundary_bytes > 0) { 1250 /* We had to stop this child I/O early because we ran out of 1251 * child_iov space. Make sure the iovs collected are valid and 1252 * then adjust to_next_boundary before starting the child I/O. 
		 */
		if ((to_next_boundary_bytes % blocklen) != 0) {
			SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n",
				    to_next_boundary_bytes, blocklen);
			bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
			bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
			return;
		}
		to_next_boundary -= to_next_boundary_bytes / blocklen;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
		rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
					    spdk_io_channel_from_ctx(bdev_io->internal.ch),
					    bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
					    _spdk_bdev_io_split_done, bdev_io);
	} else {
		rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
					     spdk_io_channel_from_ctx(bdev_io->internal.ch),
					     bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
					     _spdk_bdev_io_split_done, bdev_io);
	}

	if (rc == 0) {
		bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary;
		bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary;
	} else if (rc == -ENOMEM) {
		_spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_io_split_with_payload);
	} else {
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
		return;
	}

	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
		parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process.  This function will complete the parent I/O if the
	 * splitting is done.
1309 */ 1310 _spdk_bdev_io_split_with_payload(parent_io); 1311 } 1312 1313 static void 1314 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1315 { 1316 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1317 1318 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1319 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1320 1321 _spdk_bdev_io_split_with_payload(bdev_io); 1322 } 1323 1324 static void 1325 _spdk_bdev_io_submit(void *ctx) 1326 { 1327 struct spdk_bdev_io *bdev_io = ctx; 1328 struct spdk_bdev *bdev = bdev_io->bdev; 1329 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1330 struct spdk_io_channel *ch = bdev_ch->channel; 1331 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1332 uint64_t tsc; 1333 1334 tsc = spdk_get_ticks(); 1335 bdev_io->internal.submit_tsc = tsc; 1336 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1337 bdev_ch->io_outstanding++; 1338 shared_resource->io_outstanding++; 1339 bdev_io->internal.in_submit_request = true; 1340 if (spdk_likely(bdev_ch->flags == 0)) { 1341 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1342 bdev->fn_table->submit_request(ch, bdev_io); 1343 } else { 1344 bdev_ch->io_outstanding--; 1345 shared_resource->io_outstanding--; 1346 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1347 } 1348 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1349 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1350 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1351 bdev_ch->io_outstanding--; 1352 shared_resource->io_outstanding--; 1353 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1354 _spdk_bdev_qos_io_submit(bdev_ch); 1355 } else { 1356 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1357 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1358 } 1359 bdev_io->internal.in_submit_request = false; 1360 } 1361 1362 static void 1363 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1364 { 1365 struct spdk_bdev *bdev = bdev_io->bdev; 1366 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1367 1368 assert(thread != NULL); 1369 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1370 1371 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1372 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1373 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split, 1374 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1375 } else { 1376 _spdk_bdev_io_split(NULL, bdev_io); 1377 } 1378 return; 1379 } 1380 1381 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1382 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1383 _spdk_bdev_io_submit(bdev_io); 1384 } else { 1385 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1386 bdev_io->internal.ch = bdev->internal.qos->ch; 1387 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1388 } 1389 } else { 1390 _spdk_bdev_io_submit(bdev_io); 1391 } 1392 } 1393 1394 static void 1395 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1396 { 1397 struct spdk_bdev *bdev = bdev_io->bdev; 1398 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1399 struct spdk_io_channel *ch = bdev_ch->channel; 1400 1401 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1402 1403 bdev_io->internal.in_submit_request = true; 1404 
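	/*
	 * Resets are passed straight to the bdev module here; unlike
	 * _spdk_bdev_io_submit(), they are not counted in io_outstanding and they
	 * bypass both the QoS queue and the nomem_io retry queue.
	 */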
bdev->fn_table->submit_request(ch, bdev_io); 1405 bdev_io->internal.in_submit_request = false; 1406 } 1407 1408 static void 1409 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1410 struct spdk_bdev *bdev, void *cb_arg, 1411 spdk_bdev_io_completion_cb cb) 1412 { 1413 bdev_io->bdev = bdev; 1414 bdev_io->internal.caller_ctx = cb_arg; 1415 bdev_io->internal.cb = cb; 1416 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1417 bdev_io->internal.in_submit_request = false; 1418 bdev_io->internal.buf = NULL; 1419 bdev_io->internal.io_submit_ch = NULL; 1420 } 1421 1422 static bool 1423 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1424 { 1425 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1426 } 1427 1428 bool 1429 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1430 { 1431 bool supported; 1432 1433 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1434 1435 if (!supported) { 1436 switch (io_type) { 1437 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1438 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1439 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1440 break; 1441 default: 1442 break; 1443 } 1444 } 1445 1446 return supported; 1447 } 1448 1449 int 1450 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1451 { 1452 if (bdev->fn_table->dump_info_json) { 1453 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1454 } 1455 1456 return 0; 1457 } 1458 1459 static void 1460 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1461 { 1462 uint32_t max_per_timeslice = 0; 1463 int i; 1464 1465 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1466 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1467 qos->rate_limits[i].max_per_timeslice = 0; 1468 continue; 1469 } 1470 1471 max_per_timeslice = qos->rate_limits[i].limit * 1472 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1473 1474 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1475 qos->rate_limits[i].min_per_timeslice); 1476 1477 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1478 } 1479 } 1480 1481 static int 1482 spdk_bdev_channel_poll_qos(void *arg) 1483 { 1484 struct spdk_bdev_qos *qos = arg; 1485 uint64_t now = spdk_get_ticks(); 1486 int i; 1487 1488 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1489 /* We received our callback earlier than expected - return 1490 * immediately and wait to do accounting until at least one 1491 * timeslice has actually expired. This should never happen 1492 * with a well-behaved timer implementation. 1493 */ 1494 return 0; 1495 } 1496 1497 /* Reset for next round of rate limiting */ 1498 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1499 /* We may have allowed the IOs or bytes to slightly overrun in the last 1500 * timeslice. remaining_this_timeslice is signed, so if it's negative 1501 * here, we'll account for the overrun so that the next timeslice will 1502 * be appropriately reduced. 
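		 * For example, if 100 bytes remained and a 4096-byte I/O was
		 * submitted anyway, remaining_this_timeslice is now -3996, and the
		 * refill loop below starts the next timeslice at
		 * max_per_timeslice - 3996 instead of the full quota.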
1503 */ 1504 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1505 qos->rate_limits[i].remaining_this_timeslice = 0; 1506 } 1507 } 1508 1509 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1510 qos->last_timeslice += qos->timeslice_size; 1511 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1512 qos->rate_limits[i].remaining_this_timeslice += 1513 qos->rate_limits[i].max_per_timeslice; 1514 } 1515 } 1516 1517 _spdk_bdev_qos_io_submit(qos->ch); 1518 1519 return -1; 1520 } 1521 1522 static void 1523 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1524 { 1525 struct spdk_bdev_shared_resource *shared_resource; 1526 1527 if (!ch) { 1528 return; 1529 } 1530 1531 if (ch->channel) { 1532 spdk_put_io_channel(ch->channel); 1533 } 1534 1535 assert(ch->io_outstanding == 0); 1536 1537 shared_resource = ch->shared_resource; 1538 if (shared_resource) { 1539 assert(ch->io_outstanding == 0); 1540 assert(shared_resource->ref > 0); 1541 shared_resource->ref--; 1542 if (shared_resource->ref == 0) { 1543 assert(shared_resource->io_outstanding == 0); 1544 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1545 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1546 free(shared_resource); 1547 } 1548 } 1549 } 1550 1551 /* Caller must hold bdev->internal.mutex. */ 1552 static void 1553 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1554 { 1555 struct spdk_bdev_qos *qos = bdev->internal.qos; 1556 int i; 1557 1558 /* Rate limiting on this bdev enabled */ 1559 if (qos) { 1560 if (qos->ch == NULL) { 1561 struct spdk_io_channel *io_ch; 1562 1563 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1564 bdev->name, spdk_get_thread()); 1565 1566 /* No qos channel has been selected, so set one up */ 1567 1568 /* Take another reference to ch */ 1569 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1570 qos->ch = ch; 1571 1572 qos->thread = spdk_io_channel_get_thread(io_ch); 1573 1574 TAILQ_INIT(&qos->queued); 1575 1576 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1577 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1578 qos->rate_limits[i].min_per_timeslice = 1579 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1580 } else { 1581 qos->rate_limits[i].min_per_timeslice = 1582 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1583 } 1584 1585 if (qos->rate_limits[i].limit == 0) { 1586 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1587 } 1588 } 1589 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1590 qos->timeslice_size = 1591 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1592 qos->last_timeslice = spdk_get_ticks(); 1593 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1594 qos, 1595 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1596 } 1597 1598 ch->flags |= BDEV_CH_QOS_ENABLED; 1599 } 1600 } 1601 1602 static int 1603 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1604 { 1605 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1606 struct spdk_bdev_channel *ch = ctx_buf; 1607 struct spdk_io_channel *mgmt_io_ch; 1608 struct spdk_bdev_mgmt_channel *mgmt_ch; 1609 struct spdk_bdev_shared_resource *shared_resource; 1610 1611 ch->bdev = bdev; 1612 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1613 if (!ch->channel) { 1614 return -1; 1615 } 1616 1617 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1618 if (!mgmt_io_ch) { 1619 return -1; 1620 } 1621 1622 mgmt_ch = 
spdk_io_channel_get_ctx(mgmt_io_ch); 1623 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1624 if (shared_resource->shared_ch == ch->channel) { 1625 spdk_put_io_channel(mgmt_io_ch); 1626 shared_resource->ref++; 1627 break; 1628 } 1629 } 1630 1631 if (shared_resource == NULL) { 1632 shared_resource = calloc(1, sizeof(*shared_resource)); 1633 if (shared_resource == NULL) { 1634 spdk_put_io_channel(mgmt_io_ch); 1635 return -1; 1636 } 1637 1638 shared_resource->mgmt_ch = mgmt_ch; 1639 shared_resource->io_outstanding = 0; 1640 TAILQ_INIT(&shared_resource->nomem_io); 1641 shared_resource->nomem_threshold = 0; 1642 shared_resource->shared_ch = ch->channel; 1643 shared_resource->ref = 1; 1644 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1645 } 1646 1647 memset(&ch->stat, 0, sizeof(ch->stat)); 1648 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1649 ch->io_outstanding = 0; 1650 TAILQ_INIT(&ch->queued_resets); 1651 ch->flags = 0; 1652 ch->shared_resource = shared_resource; 1653 1654 #ifdef SPDK_CONFIG_VTUNE 1655 { 1656 char *name; 1657 __itt_init_ittlib(NULL, 0); 1658 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1659 if (!name) { 1660 _spdk_bdev_channel_destroy_resource(ch); 1661 return -1; 1662 } 1663 ch->handle = __itt_string_handle_create(name); 1664 free(name); 1665 ch->start_tsc = spdk_get_ticks(); 1666 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1667 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1668 } 1669 #endif 1670 1671 pthread_mutex_lock(&bdev->internal.mutex); 1672 _spdk_bdev_enable_qos(bdev, ch); 1673 pthread_mutex_unlock(&bdev->internal.mutex); 1674 1675 return 0; 1676 } 1677 1678 /* 1679 * Abort I/O that are waiting on a data buffer. These types of I/O are 1680 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1681 */ 1682 static void 1683 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1684 { 1685 bdev_io_stailq_t tmp; 1686 struct spdk_bdev_io *bdev_io; 1687 1688 STAILQ_INIT(&tmp); 1689 1690 while (!STAILQ_EMPTY(queue)) { 1691 bdev_io = STAILQ_FIRST(queue); 1692 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1693 if (bdev_io->internal.ch == ch) { 1694 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1695 } else { 1696 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1697 } 1698 } 1699 1700 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1701 } 1702 1703 /* 1704 * Abort I/O that are queued waiting for submission. These types of I/O are 1705 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1706 */ 1707 static void 1708 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1709 { 1710 struct spdk_bdev_io *bdev_io, *tmp; 1711 1712 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1713 if (bdev_io->internal.ch == ch) { 1714 TAILQ_REMOVE(queue, bdev_io, internal.link); 1715 /* 1716 * spdk_bdev_io_complete() assumes that the completed I/O had 1717 * been submitted to the bdev module. Since in this case it 1718 * hadn't, bump io_outstanding to account for the decrement 1719 * that spdk_bdev_io_complete() will do. 
1720 */ 1721 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1722 ch->io_outstanding++; 1723 ch->shared_resource->io_outstanding++; 1724 } 1725 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1726 } 1727 } 1728 } 1729 1730 static void 1731 spdk_bdev_qos_channel_destroy(void *cb_arg) 1732 { 1733 struct spdk_bdev_qos *qos = cb_arg; 1734 1735 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1736 spdk_poller_unregister(&qos->poller); 1737 1738 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1739 1740 free(qos); 1741 } 1742 1743 static int 1744 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1745 { 1746 int i; 1747 1748 /* 1749 * Cleanly shutting down the QoS poller is tricky, because 1750 * during the asynchronous operation the user could open 1751 * a new descriptor and create a new channel, spawning 1752 * a new QoS poller. 1753 * 1754 * The strategy is to create a new QoS structure here and swap it 1755 * in. The shutdown path then continues to refer to the old one 1756 * until it completes and then releases it. 1757 */ 1758 struct spdk_bdev_qos *new_qos, *old_qos; 1759 1760 old_qos = bdev->internal.qos; 1761 1762 new_qos = calloc(1, sizeof(*new_qos)); 1763 if (!new_qos) { 1764 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1765 return -ENOMEM; 1766 } 1767 1768 /* Copy the old QoS data into the newly allocated structure */ 1769 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1770 1771 /* Zero out the key parts of the QoS structure */ 1772 new_qos->ch = NULL; 1773 new_qos->thread = NULL; 1774 new_qos->poller = NULL; 1775 TAILQ_INIT(&new_qos->queued); 1776 /* 1777 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 1778 * It will be used later for the new QoS structure. 1779 */ 1780 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1781 new_qos->rate_limits[i].remaining_this_timeslice = 0; 1782 new_qos->rate_limits[i].min_per_timeslice = 0; 1783 new_qos->rate_limits[i].max_per_timeslice = 0; 1784 } 1785 1786 bdev->internal.qos = new_qos; 1787 1788 if (old_qos->thread == NULL) { 1789 free(old_qos); 1790 } else { 1791 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1792 old_qos); 1793 } 1794 1795 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 1796 * been destroyed yet. The destruction path will end up waiting for the final 1797 * channel to be put before it releases resources. */ 1798 1799 return 0; 1800 } 1801 1802 static void 1803 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 1804 { 1805 total->bytes_read += add->bytes_read; 1806 total->num_read_ops += add->num_read_ops; 1807 total->bytes_written += add->bytes_written; 1808 total->num_write_ops += add->num_write_ops; 1809 total->read_latency_ticks += add->read_latency_ticks; 1810 total->write_latency_ticks += add->write_latency_ticks; 1811 } 1812 1813 static void 1814 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 1815 { 1816 struct spdk_bdev_channel *ch = ctx_buf; 1817 struct spdk_bdev_mgmt_channel *mgmt_ch; 1818 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1819 1820 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 1821 spdk_get_thread()); 1822 1823 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
*/ 1824 pthread_mutex_lock(&ch->bdev->internal.mutex); 1825 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 1826 pthread_mutex_unlock(&ch->bdev->internal.mutex); 1827 1828 mgmt_ch = shared_resource->mgmt_ch; 1829 1830 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 1831 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 1832 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 1833 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 1834 1835 _spdk_bdev_channel_destroy_resource(ch); 1836 } 1837 1838 int 1839 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 1840 { 1841 struct spdk_bdev_alias *tmp; 1842 1843 if (alias == NULL) { 1844 SPDK_ERRLOG("Empty alias passed\n"); 1845 return -EINVAL; 1846 } 1847 1848 if (spdk_bdev_get_by_name(alias)) { 1849 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 1850 return -EEXIST; 1851 } 1852 1853 tmp = calloc(1, sizeof(*tmp)); 1854 if (tmp == NULL) { 1855 SPDK_ERRLOG("Unable to allocate alias\n"); 1856 return -ENOMEM; 1857 } 1858 1859 tmp->alias = strdup(alias); 1860 if (tmp->alias == NULL) { 1861 free(tmp); 1862 SPDK_ERRLOG("Unable to allocate alias\n"); 1863 return -ENOMEM; 1864 } 1865 1866 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 1867 1868 return 0; 1869 } 1870 1871 int 1872 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 1873 { 1874 struct spdk_bdev_alias *tmp; 1875 1876 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 1877 if (strcmp(alias, tmp->alias) == 0) { 1878 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 1879 free(tmp->alias); 1880 free(tmp); 1881 return 0; 1882 } 1883 } 1884 1885 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 1886 1887 return -ENOENT; 1888 } 1889 1890 void 1891 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 1892 { 1893 struct spdk_bdev_alias *p, *tmp; 1894 1895 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 1896 TAILQ_REMOVE(&bdev->aliases, p, tailq); 1897 free(p->alias); 1898 free(p); 1899 } 1900 } 1901 1902 struct spdk_io_channel * 1903 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 1904 { 1905 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 1906 } 1907 1908 const char * 1909 spdk_bdev_get_name(const struct spdk_bdev *bdev) 1910 { 1911 return bdev->name; 1912 } 1913 1914 const char * 1915 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 1916 { 1917 return bdev->product_name; 1918 } 1919 1920 const struct spdk_bdev_aliases_list * 1921 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 1922 { 1923 return &bdev->aliases; 1924 } 1925 1926 uint32_t 1927 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 1928 { 1929 return bdev->blocklen; 1930 } 1931 1932 uint64_t 1933 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 1934 { 1935 return bdev->blockcnt; 1936 } 1937 1938 const char * 1939 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 1940 { 1941 return qos_rpc_type[type]; 1942 } 1943 1944 void 1945 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 1946 { 1947 int i; 1948 1949 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1950 1951 pthread_mutex_lock(&bdev->internal.mutex); 1952 if (bdev->internal.qos) { 1953 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1954 if (bdev->internal.qos->rate_limits[i].limit != 1955 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1956 limits[i] = bdev->internal.qos->rate_limits[i].limit; 1957 } 1958 } 1959 } 1960 pthread_mutex_unlock(&bdev->internal.mutex); 1961 } 1962 1963 size_t 1964 
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 1965 { 1966 /* TODO: push this logic down to the bdev modules */ 1967 if (bdev->need_aligned_buffer) { 1968 return bdev->blocklen; 1969 } 1970 1971 return 1; 1972 } 1973 1974 uint32_t 1975 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 1976 { 1977 return bdev->optimal_io_boundary; 1978 } 1979 1980 bool 1981 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 1982 { 1983 return bdev->write_cache; 1984 } 1985 1986 const struct spdk_uuid * 1987 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 1988 { 1989 return &bdev->uuid; 1990 } 1991 1992 uint64_t 1993 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 1994 { 1995 return bdev->internal.measured_queue_depth; 1996 } 1997 1998 uint64_t 1999 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2000 { 2001 return bdev->internal.period; 2002 } 2003 2004 uint64_t 2005 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2006 { 2007 return bdev->internal.weighted_io_time; 2008 } 2009 2010 uint64_t 2011 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2012 { 2013 return bdev->internal.io_time; 2014 } 2015 2016 static void 2017 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2018 { 2019 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2020 2021 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2022 2023 if (bdev->internal.measured_queue_depth) { 2024 bdev->internal.io_time += bdev->internal.period; 2025 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2026 } 2027 } 2028 2029 static void 2030 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2031 { 2032 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2033 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2034 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2035 2036 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2037 spdk_for_each_channel_continue(i, 0); 2038 } 2039 2040 static int 2041 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2042 { 2043 struct spdk_bdev *bdev = ctx; 2044 bdev->internal.temporary_queue_depth = 0; 2045 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2046 _calculate_measured_qd_cpl); 2047 return 0; 2048 } 2049 2050 void 2051 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2052 { 2053 bdev->internal.period = period; 2054 2055 if (bdev->internal.qd_poller != NULL) { 2056 spdk_poller_unregister(&bdev->internal.qd_poller); 2057 bdev->internal.measured_queue_depth = UINT64_MAX; 2058 } 2059 2060 if (period != 0) { 2061 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2062 period); 2063 } 2064 } 2065 2066 int 2067 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2068 { 2069 int ret; 2070 2071 pthread_mutex_lock(&bdev->internal.mutex); 2072 2073 /* bdev has open descriptors */ 2074 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2075 bdev->blockcnt > size) { 2076 ret = -EBUSY; 2077 } else { 2078 bdev->blockcnt = size; 2079 ret = 0; 2080 } 2081 2082 pthread_mutex_unlock(&bdev->internal.mutex); 2083 2084 return ret; 2085 } 2086 2087 /* 2088 * Convert I/O offset and length from bytes to blocks. 2089 * 2090 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
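 *
 * For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 1024
 * yield offset_blocks = 8, num_blocks = 2 and a return value of 0, while
 * offset_bytes = 100 leaves a non-zero remainder and the byte-based wrappers
 * (e.g. spdk_bdev_read()) return -EINVAL.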
2091 */ 2092 static uint64_t 2093 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2094 uint64_t num_bytes, uint64_t *num_blocks) 2095 { 2096 uint32_t block_size = bdev->blocklen; 2097 2098 *offset_blocks = offset_bytes / block_size; 2099 *num_blocks = num_bytes / block_size; 2100 2101 return (offset_bytes % block_size) | (num_bytes % block_size); 2102 } 2103 2104 static bool 2105 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2106 { 2107 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2108 * has been an overflow and hence the offset has been wrapped around */ 2109 if (offset_blocks + num_blocks < offset_blocks) { 2110 return false; 2111 } 2112 2113 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2114 if (offset_blocks + num_blocks > bdev->blockcnt) { 2115 return false; 2116 } 2117 2118 return true; 2119 } 2120 2121 int 2122 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2123 void *buf, uint64_t offset, uint64_t nbytes, 2124 spdk_bdev_io_completion_cb cb, void *cb_arg) 2125 { 2126 uint64_t offset_blocks, num_blocks; 2127 2128 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2129 return -EINVAL; 2130 } 2131 2132 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2133 } 2134 2135 int 2136 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2137 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2138 spdk_bdev_io_completion_cb cb, void *cb_arg) 2139 { 2140 struct spdk_bdev *bdev = desc->bdev; 2141 struct spdk_bdev_io *bdev_io; 2142 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2143 2144 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2145 return -EINVAL; 2146 } 2147 2148 bdev_io = spdk_bdev_get_io(channel); 2149 if (!bdev_io) { 2150 return -ENOMEM; 2151 } 2152 2153 bdev_io->internal.ch = channel; 2154 bdev_io->internal.desc = desc; 2155 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2156 bdev_io->u.bdev.iovs = &bdev_io->iov; 2157 bdev_io->u.bdev.iovs[0].iov_base = buf; 2158 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2159 bdev_io->u.bdev.iovcnt = 1; 2160 bdev_io->u.bdev.num_blocks = num_blocks; 2161 bdev_io->u.bdev.offset_blocks = offset_blocks; 2162 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2163 2164 spdk_bdev_io_submit(bdev_io); 2165 return 0; 2166 } 2167 2168 int 2169 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2170 struct iovec *iov, int iovcnt, 2171 uint64_t offset, uint64_t nbytes, 2172 spdk_bdev_io_completion_cb cb, void *cb_arg) 2173 { 2174 uint64_t offset_blocks, num_blocks; 2175 2176 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2177 return -EINVAL; 2178 } 2179 2180 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2181 } 2182 2183 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2184 struct iovec *iov, int iovcnt, 2185 uint64_t offset_blocks, uint64_t num_blocks, 2186 spdk_bdev_io_completion_cb cb, void *cb_arg) 2187 { 2188 struct spdk_bdev *bdev = desc->bdev; 2189 struct spdk_bdev_io *bdev_io; 2190 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2191 2192 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2193 return -EINVAL; 2194 } 2195 
2196 bdev_io = spdk_bdev_get_io(channel); 2197 if (!bdev_io) { 2198 return -ENOMEM; 2199 } 2200 2201 bdev_io->internal.ch = channel; 2202 bdev_io->internal.desc = desc; 2203 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2204 bdev_io->u.bdev.iovs = iov; 2205 bdev_io->u.bdev.iovcnt = iovcnt; 2206 bdev_io->u.bdev.num_blocks = num_blocks; 2207 bdev_io->u.bdev.offset_blocks = offset_blocks; 2208 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2209 2210 spdk_bdev_io_submit(bdev_io); 2211 return 0; 2212 } 2213 2214 int 2215 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2216 void *buf, uint64_t offset, uint64_t nbytes, 2217 spdk_bdev_io_completion_cb cb, void *cb_arg) 2218 { 2219 uint64_t offset_blocks, num_blocks; 2220 2221 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2222 return -EINVAL; 2223 } 2224 2225 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2226 } 2227 2228 int 2229 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2230 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2231 spdk_bdev_io_completion_cb cb, void *cb_arg) 2232 { 2233 struct spdk_bdev *bdev = desc->bdev; 2234 struct spdk_bdev_io *bdev_io; 2235 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2236 2237 if (!desc->write) { 2238 return -EBADF; 2239 } 2240 2241 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2242 return -EINVAL; 2243 } 2244 2245 bdev_io = spdk_bdev_get_io(channel); 2246 if (!bdev_io) { 2247 return -ENOMEM; 2248 } 2249 2250 bdev_io->internal.ch = channel; 2251 bdev_io->internal.desc = desc; 2252 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2253 bdev_io->u.bdev.iovs = &bdev_io->iov; 2254 bdev_io->u.bdev.iovs[0].iov_base = buf; 2255 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2256 bdev_io->u.bdev.iovcnt = 1; 2257 bdev_io->u.bdev.num_blocks = num_blocks; 2258 bdev_io->u.bdev.offset_blocks = offset_blocks; 2259 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2260 2261 spdk_bdev_io_submit(bdev_io); 2262 return 0; 2263 } 2264 2265 int 2266 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2267 struct iovec *iov, int iovcnt, 2268 uint64_t offset, uint64_t len, 2269 spdk_bdev_io_completion_cb cb, void *cb_arg) 2270 { 2271 uint64_t offset_blocks, num_blocks; 2272 2273 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2274 return -EINVAL; 2275 } 2276 2277 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2278 } 2279 2280 int 2281 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2282 struct iovec *iov, int iovcnt, 2283 uint64_t offset_blocks, uint64_t num_blocks, 2284 spdk_bdev_io_completion_cb cb, void *cb_arg) 2285 { 2286 struct spdk_bdev *bdev = desc->bdev; 2287 struct spdk_bdev_io *bdev_io; 2288 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2289 2290 if (!desc->write) { 2291 return -EBADF; 2292 } 2293 2294 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2295 return -EINVAL; 2296 } 2297 2298 bdev_io = spdk_bdev_get_io(channel); 2299 if (!bdev_io) { 2300 return -ENOMEM; 2301 } 2302 2303 bdev_io->internal.ch = channel; 2304 bdev_io->internal.desc = desc; 2305 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2306 bdev_io->u.bdev.iovs = iov; 2307 bdev_io->u.bdev.iovcnt = iovcnt; 2308 bdev_io->u.bdev.num_blocks = num_blocks; 2309 bdev_io->u.bdev.offset_blocks = 
offset_blocks; 2310 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2311 2312 spdk_bdev_io_submit(bdev_io); 2313 return 0; 2314 } 2315 2316 int 2317 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2318 uint64_t offset, uint64_t len, 2319 spdk_bdev_io_completion_cb cb, void *cb_arg) 2320 { 2321 uint64_t offset_blocks, num_blocks; 2322 2323 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2324 return -EINVAL; 2325 } 2326 2327 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2328 } 2329 2330 int 2331 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2332 uint64_t offset_blocks, uint64_t num_blocks, 2333 spdk_bdev_io_completion_cb cb, void *cb_arg) 2334 { 2335 struct spdk_bdev *bdev = desc->bdev; 2336 struct spdk_bdev_io *bdev_io; 2337 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2338 2339 if (!desc->write) { 2340 return -EBADF; 2341 } 2342 2343 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2344 return -EINVAL; 2345 } 2346 2347 bdev_io = spdk_bdev_get_io(channel); 2348 2349 if (!bdev_io) { 2350 return -ENOMEM; 2351 } 2352 2353 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2354 bdev_io->internal.ch = channel; 2355 bdev_io->internal.desc = desc; 2356 bdev_io->u.bdev.offset_blocks = offset_blocks; 2357 bdev_io->u.bdev.num_blocks = num_blocks; 2358 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2359 2360 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2361 spdk_bdev_io_submit(bdev_io); 2362 return 0; 2363 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2364 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2365 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2366 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2367 _spdk_bdev_write_zero_buffer_next(bdev_io); 2368 return 0; 2369 } else { 2370 spdk_bdev_free_io(bdev_io); 2371 return -ENOTSUP; 2372 } 2373 } 2374 2375 int 2376 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2377 uint64_t offset, uint64_t nbytes, 2378 spdk_bdev_io_completion_cb cb, void *cb_arg) 2379 { 2380 uint64_t offset_blocks, num_blocks; 2381 2382 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2383 return -EINVAL; 2384 } 2385 2386 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2387 } 2388 2389 int 2390 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2391 uint64_t offset_blocks, uint64_t num_blocks, 2392 spdk_bdev_io_completion_cb cb, void *cb_arg) 2393 { 2394 struct spdk_bdev *bdev = desc->bdev; 2395 struct spdk_bdev_io *bdev_io; 2396 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2397 2398 if (!desc->write) { 2399 return -EBADF; 2400 } 2401 2402 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2403 return -EINVAL; 2404 } 2405 2406 if (num_blocks == 0) { 2407 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2408 return -EINVAL; 2409 } 2410 2411 bdev_io = spdk_bdev_get_io(channel); 2412 if (!bdev_io) { 2413 return -ENOMEM; 2414 } 2415 2416 bdev_io->internal.ch = channel; 2417 bdev_io->internal.desc = desc; 2418 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2419 2420 bdev_io->u.bdev.iovs = &bdev_io->iov; 2421 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2422 bdev_io->u.bdev.iovs[0].iov_len = 0; 2423 bdev_io->u.bdev.iovcnt = 1; 2424 2425 
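/* The single empty iovec set up above is intentional: an unmap carries no data
 * payload, but presumably some consumers walk u.bdev.iovs unconditionally, so a
 * zero-length entry is provided rather than leaving the pointer NULL. */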
bdev_io->u.bdev.offset_blocks = offset_blocks; 2426 bdev_io->u.bdev.num_blocks = num_blocks; 2427 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2428 2429 spdk_bdev_io_submit(bdev_io); 2430 return 0; 2431 } 2432 2433 int 2434 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2435 uint64_t offset, uint64_t length, 2436 spdk_bdev_io_completion_cb cb, void *cb_arg) 2437 { 2438 uint64_t offset_blocks, num_blocks; 2439 2440 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2441 return -EINVAL; 2442 } 2443 2444 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2445 } 2446 2447 int 2448 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2449 uint64_t offset_blocks, uint64_t num_blocks, 2450 spdk_bdev_io_completion_cb cb, void *cb_arg) 2451 { 2452 struct spdk_bdev *bdev = desc->bdev; 2453 struct spdk_bdev_io *bdev_io; 2454 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2455 2456 if (!desc->write) { 2457 return -EBADF; 2458 } 2459 2460 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2461 return -EINVAL; 2462 } 2463 2464 bdev_io = spdk_bdev_get_io(channel); 2465 if (!bdev_io) { 2466 return -ENOMEM; 2467 } 2468 2469 bdev_io->internal.ch = channel; 2470 bdev_io->internal.desc = desc; 2471 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2472 bdev_io->u.bdev.iovs = NULL; 2473 bdev_io->u.bdev.iovcnt = 0; 2474 bdev_io->u.bdev.offset_blocks = offset_blocks; 2475 bdev_io->u.bdev.num_blocks = num_blocks; 2476 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2477 2478 spdk_bdev_io_submit(bdev_io); 2479 return 0; 2480 } 2481 2482 static void 2483 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2484 { 2485 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2486 struct spdk_bdev_io *bdev_io; 2487 2488 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2489 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2490 spdk_bdev_io_submit_reset(bdev_io); 2491 } 2492 2493 static void 2494 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2495 { 2496 struct spdk_io_channel *ch; 2497 struct spdk_bdev_channel *channel; 2498 struct spdk_bdev_mgmt_channel *mgmt_channel; 2499 struct spdk_bdev_shared_resource *shared_resource; 2500 bdev_io_tailq_t tmp_queued; 2501 2502 TAILQ_INIT(&tmp_queued); 2503 2504 ch = spdk_io_channel_iter_get_channel(i); 2505 channel = spdk_io_channel_get_ctx(ch); 2506 shared_resource = channel->shared_resource; 2507 mgmt_channel = shared_resource->mgmt_ch; 2508 2509 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2510 2511 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2512 /* The QoS object is always valid and readable while 2513 * the channel flag is set, so the lock here should not 2514 * be necessary. We're not in the fast path though, so 2515 * just take it anyway. 
*/ 2516 pthread_mutex_lock(&channel->bdev->internal.mutex); 2517 if (channel->bdev->internal.qos->ch == channel) { 2518 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2519 } 2520 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2521 } 2522 2523 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2524 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2525 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2526 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2527 2528 spdk_for_each_channel_continue(i, 0); 2529 } 2530 2531 static void 2532 _spdk_bdev_start_reset(void *ctx) 2533 { 2534 struct spdk_bdev_channel *ch = ctx; 2535 2536 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2537 ch, _spdk_bdev_reset_dev); 2538 } 2539 2540 static void 2541 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2542 { 2543 struct spdk_bdev *bdev = ch->bdev; 2544 2545 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2546 2547 pthread_mutex_lock(&bdev->internal.mutex); 2548 if (bdev->internal.reset_in_progress == NULL) { 2549 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2550 /* 2551 * Take a channel reference for the target bdev for the life of this 2552 * reset. This guards against the channel getting destroyed while 2553 * spdk_for_each_channel() calls related to this reset IO are in 2554 * progress. We will release the reference when this reset is 2555 * completed. 2556 */ 2557 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2558 _spdk_bdev_start_reset(ch); 2559 } 2560 pthread_mutex_unlock(&bdev->internal.mutex); 2561 } 2562 2563 int 2564 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2565 spdk_bdev_io_completion_cb cb, void *cb_arg) 2566 { 2567 struct spdk_bdev *bdev = desc->bdev; 2568 struct spdk_bdev_io *bdev_io; 2569 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2570 2571 bdev_io = spdk_bdev_get_io(channel); 2572 if (!bdev_io) { 2573 return -ENOMEM; 2574 } 2575 2576 bdev_io->internal.ch = channel; 2577 bdev_io->internal.desc = desc; 2578 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2579 bdev_io->u.reset.ch_ref = NULL; 2580 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2581 2582 pthread_mutex_lock(&bdev->internal.mutex); 2583 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2584 pthread_mutex_unlock(&bdev->internal.mutex); 2585 2586 _spdk_bdev_channel_start_reset(channel); 2587 2588 return 0; 2589 } 2590 2591 void 2592 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2593 struct spdk_bdev_io_stat *stat) 2594 { 2595 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2596 2597 *stat = channel->stat; 2598 } 2599 2600 static void 2601 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2602 { 2603 void *io_device = spdk_io_channel_iter_get_io_device(i); 2604 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2605 2606 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2607 bdev_iostat_ctx->cb_arg, 0); 2608 free(bdev_iostat_ctx); 2609 } 2610 2611 static void 2612 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2613 { 2614 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2615 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2616 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2617 2618 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2619 spdk_for_each_channel_continue(i, 0); 2620 } 2621 2622 void 2623 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2624 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2625 { 2626 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2627 2628 assert(bdev != NULL); 2629 assert(stat != NULL); 2630 assert(cb != NULL); 2631 2632 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2633 if (bdev_iostat_ctx == NULL) { 2634 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2635 cb(bdev, stat, cb_arg, -ENOMEM); 2636 return; 2637 } 2638 2639 bdev_iostat_ctx->stat = stat; 2640 bdev_iostat_ctx->cb = cb; 2641 bdev_iostat_ctx->cb_arg = cb_arg; 2642 2643 /* Start with the statistics from previously deleted channels. */ 2644 pthread_mutex_lock(&bdev->internal.mutex); 2645 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2646 pthread_mutex_unlock(&bdev->internal.mutex); 2647 2648 /* Then iterate and add the statistics from each existing channel. */ 2649 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2650 _spdk_bdev_get_each_channel_stat, 2651 bdev_iostat_ctx, 2652 _spdk_bdev_get_device_stat_done); 2653 } 2654 2655 int 2656 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2657 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2658 spdk_bdev_io_completion_cb cb, void *cb_arg) 2659 { 2660 struct spdk_bdev *bdev = desc->bdev; 2661 struct spdk_bdev_io *bdev_io; 2662 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2663 2664 if (!desc->write) { 2665 return -EBADF; 2666 } 2667 2668 bdev_io = spdk_bdev_get_io(channel); 2669 if (!bdev_io) { 2670 return -ENOMEM; 2671 } 2672 2673 bdev_io->internal.ch = channel; 2674 bdev_io->internal.desc = desc; 2675 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2676 bdev_io->u.nvme_passthru.cmd = *cmd; 2677 bdev_io->u.nvme_passthru.buf = buf; 2678 bdev_io->u.nvme_passthru.nbytes = nbytes; 2679 bdev_io->u.nvme_passthru.md_buf = NULL; 2680 bdev_io->u.nvme_passthru.md_len = 0; 2681 2682 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2683 2684 spdk_bdev_io_submit(bdev_io); 2685 return 0; 2686 } 2687 2688 int 2689 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2690 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2691 spdk_bdev_io_completion_cb cb, void *cb_arg) 2692 { 2693 struct spdk_bdev *bdev = desc->bdev; 2694 struct spdk_bdev_io *bdev_io; 2695 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2696 2697 if (!desc->write) { 2698 /* 2699 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2700 * to easily determine if the command is a read or write, but for now just 2701 * do not allow io_passthru with a read-only descriptor. 
2702 */ 2703 return -EBADF; 2704 } 2705 2706 bdev_io = spdk_bdev_get_io(channel); 2707 if (!bdev_io) { 2708 return -ENOMEM; 2709 } 2710 2711 bdev_io->internal.ch = channel; 2712 bdev_io->internal.desc = desc; 2713 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2714 bdev_io->u.nvme_passthru.cmd = *cmd; 2715 bdev_io->u.nvme_passthru.buf = buf; 2716 bdev_io->u.nvme_passthru.nbytes = nbytes; 2717 bdev_io->u.nvme_passthru.md_buf = NULL; 2718 bdev_io->u.nvme_passthru.md_len = 0; 2719 2720 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2721 2722 spdk_bdev_io_submit(bdev_io); 2723 return 0; 2724 } 2725 2726 int 2727 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2728 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2729 spdk_bdev_io_completion_cb cb, void *cb_arg) 2730 { 2731 struct spdk_bdev *bdev = desc->bdev; 2732 struct spdk_bdev_io *bdev_io; 2733 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2734 2735 if (!desc->write) { 2736 /* 2737 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2738 * to easily determine if the command is a read or write, but for now just 2739 * do not allow io_passthru with a read-only descriptor. 2740 */ 2741 return -EBADF; 2742 } 2743 2744 bdev_io = spdk_bdev_get_io(channel); 2745 if (!bdev_io) { 2746 return -ENOMEM; 2747 } 2748 2749 bdev_io->internal.ch = channel; 2750 bdev_io->internal.desc = desc; 2751 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2752 bdev_io->u.nvme_passthru.cmd = *cmd; 2753 bdev_io->u.nvme_passthru.buf = buf; 2754 bdev_io->u.nvme_passthru.nbytes = nbytes; 2755 bdev_io->u.nvme_passthru.md_buf = md_buf; 2756 bdev_io->u.nvme_passthru.md_len = md_len; 2757 2758 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2759 2760 spdk_bdev_io_submit(bdev_io); 2761 return 0; 2762 } 2763 2764 int 2765 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2766 struct spdk_bdev_io_wait_entry *entry) 2767 { 2768 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2769 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2770 2771 if (bdev != entry->bdev) { 2772 SPDK_ERRLOG("bdevs do not match\n"); 2773 return -EINVAL; 2774 } 2775 2776 if (mgmt_ch->per_thread_cache_count > 0) { 2777 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2778 return -EINVAL; 2779 } 2780 2781 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2782 return 0; 2783 } 2784 2785 static void 2786 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2787 { 2788 struct spdk_bdev *bdev = bdev_ch->bdev; 2789 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2790 struct spdk_bdev_io *bdev_io; 2791 2792 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2793 /* 2794 * Allow some more I/O to complete before retrying the nomem_io queue. 2795 * Some drivers (such as nvme) cannot immediately take a new I/O in 2796 * the context of a completion, because the resources for the I/O are 2797 * not released until control returns to the bdev poller. Also, we 2798 * may require several small I/O to complete before a larger I/O 2799 * (that requires splitting) can be submitted. 
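 * The nomem_threshold consulted above is recalculated in spdk_bdev_io_complete()
 * each time an I/O completes with SPDK_BDEV_IO_STATUS_NOMEM (see further below).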
2800 */ 2801 return; 2802 } 2803 2804 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2805 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2806 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2807 bdev_io->internal.ch->io_outstanding++; 2808 shared_resource->io_outstanding++; 2809 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2810 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2811 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2812 break; 2813 } 2814 } 2815 } 2816 2817 static inline void 2818 _spdk_bdev_io_complete(void *ctx) 2819 { 2820 struct spdk_bdev_io *bdev_io = ctx; 2821 uint64_t tsc; 2822 2823 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2824 /* 2825 * Send the completion to the thread that originally submitted the I/O, 2826 * which may not be the current thread in the case of QoS. 2827 */ 2828 if (bdev_io->internal.io_submit_ch) { 2829 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2830 bdev_io->internal.io_submit_ch = NULL; 2831 } 2832 2833 /* 2834 * Defer completion to avoid potential infinite recursion if the 2835 * user's completion callback issues a new I/O. 2836 */ 2837 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2838 _spdk_bdev_io_complete, bdev_io); 2839 return; 2840 } 2841 2842 tsc = spdk_get_ticks(); 2843 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 2844 2845 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2846 switch (bdev_io->type) { 2847 case SPDK_BDEV_IO_TYPE_READ: 2848 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2849 bdev_io->internal.ch->stat.num_read_ops++; 2850 bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2851 break; 2852 case SPDK_BDEV_IO_TYPE_WRITE: 2853 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2854 bdev_io->internal.ch->stat.num_write_ops++; 2855 bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2856 break; 2857 default: 2858 break; 2859 } 2860 } 2861 2862 #ifdef SPDK_CONFIG_VTUNE 2863 uint64_t now_tsc = spdk_get_ticks(); 2864 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2865 uint64_t data[5]; 2866 2867 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2868 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2869 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2870 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2871 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
2872 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2873 2874 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2875 __itt_metadata_u64, 5, data); 2876 2877 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2878 bdev_io->internal.ch->start_tsc = now_tsc; 2879 } 2880 #endif 2881 2882 assert(bdev_io->internal.cb != NULL); 2883 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2884 2885 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2886 bdev_io->internal.caller_ctx); 2887 } 2888 2889 static void 2890 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2891 { 2892 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2893 2894 if (bdev_io->u.reset.ch_ref != NULL) { 2895 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2896 bdev_io->u.reset.ch_ref = NULL; 2897 } 2898 2899 _spdk_bdev_io_complete(bdev_io); 2900 } 2901 2902 static void 2903 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2904 { 2905 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2906 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2907 2908 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2909 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2910 _spdk_bdev_channel_start_reset(ch); 2911 } 2912 2913 spdk_for_each_channel_continue(i, 0); 2914 } 2915 2916 void 2917 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2918 { 2919 struct spdk_bdev *bdev = bdev_io->bdev; 2920 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2921 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2922 2923 bdev_io->internal.status = status; 2924 2925 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2926 bool unlock_channels = false; 2927 2928 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2929 SPDK_ERRLOG("NOMEM returned for reset\n"); 2930 } 2931 pthread_mutex_lock(&bdev->internal.mutex); 2932 if (bdev_io == bdev->internal.reset_in_progress) { 2933 bdev->internal.reset_in_progress = NULL; 2934 unlock_channels = true; 2935 } 2936 pthread_mutex_unlock(&bdev->internal.mutex); 2937 2938 if (unlock_channels) { 2939 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 2940 bdev_io, _spdk_bdev_reset_complete); 2941 return; 2942 } 2943 } else { 2944 assert(bdev_ch->io_outstanding > 0); 2945 assert(shared_resource->io_outstanding > 0); 2946 bdev_ch->io_outstanding--; 2947 shared_resource->io_outstanding--; 2948 2949 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 2950 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 2951 /* 2952 * Wait for some of the outstanding I/O to complete before we 2953 * retry any of the nomem_io. Normally we will wait for 2954 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 2955 * depth channels we will instead wait for half to complete. 
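 * For example, with a threshold count of 8: at io_outstanding = 100 the value
 * computed below is spdk_max(50, 92) = 92, i.e. wait for 8 completions, while
 * at io_outstanding = 10 it is spdk_max(5, 2) = 5, i.e. wait for half.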
2956 */ 2957 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 2958 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 2959 return; 2960 } 2961 2962 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 2963 _spdk_bdev_ch_retry_io(bdev_ch); 2964 } 2965 } 2966 2967 _spdk_bdev_io_complete(bdev_io); 2968 } 2969 2970 void 2971 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 2972 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 2973 { 2974 if (sc == SPDK_SCSI_STATUS_GOOD) { 2975 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2976 } else { 2977 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 2978 bdev_io->internal.error.scsi.sc = sc; 2979 bdev_io->internal.error.scsi.sk = sk; 2980 bdev_io->internal.error.scsi.asc = asc; 2981 bdev_io->internal.error.scsi.ascq = ascq; 2982 } 2983 2984 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2985 } 2986 2987 void 2988 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 2989 int *sc, int *sk, int *asc, int *ascq) 2990 { 2991 assert(sc != NULL); 2992 assert(sk != NULL); 2993 assert(asc != NULL); 2994 assert(ascq != NULL); 2995 2996 switch (bdev_io->internal.status) { 2997 case SPDK_BDEV_IO_STATUS_SUCCESS: 2998 *sc = SPDK_SCSI_STATUS_GOOD; 2999 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3000 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3001 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3002 break; 3003 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3004 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3005 break; 3006 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3007 *sc = bdev_io->internal.error.scsi.sc; 3008 *sk = bdev_io->internal.error.scsi.sk; 3009 *asc = bdev_io->internal.error.scsi.asc; 3010 *ascq = bdev_io->internal.error.scsi.ascq; 3011 break; 3012 default: 3013 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3014 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3015 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3016 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3017 break; 3018 } 3019 } 3020 3021 void 3022 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3023 { 3024 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3025 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3026 } else { 3027 bdev_io->internal.error.nvme.sct = sct; 3028 bdev_io->internal.error.nvme.sc = sc; 3029 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3030 } 3031 3032 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3033 } 3034 3035 void 3036 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3037 { 3038 assert(sct != NULL); 3039 assert(sc != NULL); 3040 3041 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3042 *sct = bdev_io->internal.error.nvme.sct; 3043 *sc = bdev_io->internal.error.nvme.sc; 3044 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3045 *sct = SPDK_NVME_SCT_GENERIC; 3046 *sc = SPDK_NVME_SC_SUCCESS; 3047 } else { 3048 *sct = SPDK_NVME_SCT_GENERIC; 3049 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3050 } 3051 } 3052 3053 struct spdk_thread * 3054 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3055 { 3056 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3057 } 3058 3059 static void 3060 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3061 { 3062 uint64_t min_qos_set; 3063 int i; 3064 3065 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3066 if (limits[i] != 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3067 break; 3068 } 3069 } 3070 3071 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3072 SPDK_ERRLOG("Invalid rate limits set.\n"); 3073 return; 3074 } 3075 3076 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3077 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3078 continue; 3079 } 3080 3081 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3082 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3083 } else { 3084 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3085 } 3086 3087 if (limits[i] == 0 || limits[i] % min_qos_set) { 3088 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n", 3089 limits[i], bdev->name, min_qos_set); 3090 SPDK_ERRLOG("Failed to enable QoS for bdev %s\n", bdev->name); 3091 return; 3092 } 3093 } 3094 3095 if (!bdev->internal.qos) { 3096 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3097 if (!bdev->internal.qos) { 3098 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3099 return; 3100 } 3101 } 3102 3103 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3104 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3105 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3106 bdev->name, i, limits[i]); 3107 } 3108 3109 return; 3110 } 3111 3112 static void 3113 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3114 { 3115 struct spdk_conf_section *sp = NULL; 3116 const char *val = NULL; 3117 int i = 0, j = 0; 3118 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3119 bool config_qos = false; 3120 3121 sp = spdk_conf_find_section(NULL, "QoS"); 3122 if (!sp) { 3123 return; 3124 } 3125 3126 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3127 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3128 3129 i = 0; 3130 while (true) { 3131 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3132 if (!val) { 3133 break; 3134 } 3135 3136 if (strcmp(bdev->name, val) != 0) { 3137 i++; 3138 continue; 3139 } 3140 3141 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3142 if (val) { 3143 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3144 limits[j] = strtoull(val, NULL, 10); 3145 } else { 3146 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3147 } 3148 config_qos = true; 3149 } 3150 3151 break; 3152 } 3153 3154 j++; 3155 } 3156 3157 if (config_qos == true) { 3158 _spdk_bdev_qos_config_limit(bdev, limits); 3159 } 3160 3161 return; 3162 } 3163 3164 static int 3165 spdk_bdev_init(struct spdk_bdev *bdev) 3166 { 3167 char *bdev_name; 3168 3169 assert(bdev->module != NULL); 3170 3171 if (!bdev->name) { 3172 SPDK_ERRLOG("Bdev name is NULL\n"); 3173 return -EINVAL; 3174 } 3175 3176 if (spdk_bdev_get_by_name(bdev->name)) { 3177 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3178 return -EEXIST; 3179 } 3180 3181 /* Users often register their own I/O devices using the bdev name. In 3182 * order to avoid conflicts, prepend bdev_.
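 * For example, a bdev registered under the name "Malloc0" uses the internal
 * io_device name "bdev_Malloc0".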
*/ 3183 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3184 if (!bdev_name) { 3185 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3186 return -ENOMEM; 3187 } 3188 3189 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3190 bdev->internal.measured_queue_depth = UINT64_MAX; 3191 3192 TAILQ_INIT(&bdev->internal.open_descs); 3193 3194 TAILQ_INIT(&bdev->aliases); 3195 3196 bdev->internal.reset_in_progress = NULL; 3197 3198 _spdk_bdev_qos_config(bdev); 3199 3200 spdk_io_device_register(__bdev_to_io_dev(bdev), 3201 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3202 sizeof(struct spdk_bdev_channel), 3203 bdev_name); 3204 3205 free(bdev_name); 3206 3207 pthread_mutex_init(&bdev->internal.mutex, NULL); 3208 return 0; 3209 } 3210 3211 static void 3212 spdk_bdev_destroy_cb(void *io_device) 3213 { 3214 int rc; 3215 struct spdk_bdev *bdev; 3216 spdk_bdev_unregister_cb cb_fn; 3217 void *cb_arg; 3218 3219 bdev = __bdev_from_io_dev(io_device); 3220 cb_fn = bdev->internal.unregister_cb; 3221 cb_arg = bdev->internal.unregister_ctx; 3222 3223 rc = bdev->fn_table->destruct(bdev->ctxt); 3224 if (rc < 0) { 3225 SPDK_ERRLOG("destruct failed\n"); 3226 } 3227 if (rc <= 0 && cb_fn != NULL) { 3228 cb_fn(cb_arg, rc); 3229 } 3230 } 3231 3232 3233 static void 3234 spdk_bdev_fini(struct spdk_bdev *bdev) 3235 { 3236 pthread_mutex_destroy(&bdev->internal.mutex); 3237 3238 free(bdev->internal.qos); 3239 3240 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3241 } 3242 3243 static void 3244 spdk_bdev_start(struct spdk_bdev *bdev) 3245 { 3246 struct spdk_bdev_module *module; 3247 uint32_t action; 3248 3249 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3250 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3251 3252 /* Examine configuration before initializing I/O */ 3253 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3254 if (module->examine_config) { 3255 action = module->internal.action_in_progress; 3256 module->internal.action_in_progress++; 3257 module->examine_config(bdev); 3258 if (action != module->internal.action_in_progress) { 3259 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3260 module->name); 3261 } 3262 } 3263 } 3264 3265 if (bdev->internal.claim_module) { 3266 return; 3267 } 3268 3269 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3270 if (module->examine_disk) { 3271 module->internal.action_in_progress++; 3272 module->examine_disk(bdev); 3273 } 3274 } 3275 } 3276 3277 int 3278 spdk_bdev_register(struct spdk_bdev *bdev) 3279 { 3280 int rc = spdk_bdev_init(bdev); 3281 3282 if (rc == 0) { 3283 spdk_bdev_start(bdev); 3284 } 3285 3286 return rc; 3287 } 3288 3289 int 3290 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3291 { 3292 int rc; 3293 3294 rc = spdk_bdev_init(vbdev); 3295 if (rc) { 3296 return rc; 3297 } 3298 3299 spdk_bdev_start(vbdev); 3300 return 0; 3301 } 3302 3303 void 3304 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3305 { 3306 if (bdev->internal.unregister_cb != NULL) { 3307 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3308 } 3309 } 3310 3311 static void 3312 _remove_notify(void *arg) 3313 { 3314 struct spdk_bdev_desc *desc = arg; 3315 3316 desc->remove_scheduled = false; 3317 3318 if (desc->closed) { 3319 free(desc); 3320 } else { 3321 desc->remove_cb(desc->remove_ctx); 3322 } 3323 } 3324 3325 void 3326 spdk_bdev_unregister(struct 
spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3327 { 3328 struct spdk_bdev_desc *desc, *tmp; 3329 bool do_destruct = true; 3330 struct spdk_thread *thread; 3331 3332 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3333 3334 thread = spdk_get_thread(); 3335 if (!thread) { 3336 /* The user called this from a non-SPDK thread. */ 3337 if (cb_fn != NULL) { 3338 cb_fn(cb_arg, -ENOTSUP); 3339 } 3340 return; 3341 } 3342 3343 pthread_mutex_lock(&bdev->internal.mutex); 3344 3345 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3346 bdev->internal.unregister_cb = cb_fn; 3347 bdev->internal.unregister_ctx = cb_arg; 3348 3349 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3350 if (desc->remove_cb) { 3351 do_destruct = false; 3352 /* 3353 * Defer invocation of the remove_cb to a separate message that will 3354 * run later on its thread. This ensures this context unwinds and 3355 * we don't recursively unregister this bdev again if the remove_cb 3356 * immediately closes its descriptor. 3357 */ 3358 if (!desc->remove_scheduled) { 3359 /* Avoid scheduling removal of the same descriptor multiple times. */ 3360 desc->remove_scheduled = true; 3361 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3362 } 3363 } 3364 } 3365 3366 if (!do_destruct) { 3367 pthread_mutex_unlock(&bdev->internal.mutex); 3368 return; 3369 } 3370 3371 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3372 pthread_mutex_unlock(&bdev->internal.mutex); 3373 3374 spdk_bdev_fini(bdev); 3375 } 3376 3377 int 3378 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3379 void *remove_ctx, struct spdk_bdev_desc **_desc) 3380 { 3381 struct spdk_bdev_desc *desc; 3382 struct spdk_thread *thread; 3383 3384 thread = spdk_get_thread(); 3385 if (!thread) { 3386 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3387 return -ENOTSUP; 3388 } 3389 3390 desc = calloc(1, sizeof(*desc)); 3391 if (desc == NULL) { 3392 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3393 return -ENOMEM; 3394 } 3395 3396 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3397 spdk_get_thread()); 3398 3399 pthread_mutex_lock(&bdev->internal.mutex); 3400 3401 if (write && bdev->internal.claim_module) { 3402 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3403 bdev->name, bdev->internal.claim_module->name); 3404 free(desc); 3405 pthread_mutex_unlock(&bdev->internal.mutex); 3406 return -EPERM; 3407 } 3408 3409 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3410 3411 desc->bdev = bdev; 3412 desc->thread = thread; 3413 desc->remove_cb = remove_cb; 3414 desc->remove_ctx = remove_ctx; 3415 desc->write = write; 3416 *_desc = desc; 3417 3418 pthread_mutex_unlock(&bdev->internal.mutex); 3419 3420 return 0; 3421 } 3422 3423 void 3424 spdk_bdev_close(struct spdk_bdev_desc *desc) 3425 { 3426 struct spdk_bdev *bdev = desc->bdev; 3427 bool do_unregister = false; 3428 3429 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3430 spdk_get_thread()); 3431 3432 assert(desc->thread == spdk_get_thread()); 3433 3434 pthread_mutex_lock(&bdev->internal.mutex); 3435 3436 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3437 3438 desc->closed = true; 3439 3440 if (!desc->remove_scheduled) { 3441 free(desc); 3442 } 3443 3444 /* If no more descriptors, kill QoS channel */ 3445 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3446 
SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3447 bdev->name, spdk_get_thread()); 3448 3449 if (spdk_bdev_qos_destroy(bdev)) { 3450 /* There isn't anything we can do to recover here. Just let the 3451 * old QoS poller keep running. The QoS handling won't change 3452 * cores when the user allocates a new channel, but it won't break. */ 3453 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3454 } 3455 } 3456 3457 spdk_bdev_set_qd_sampling_period(bdev, 0); 3458 3459 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3460 do_unregister = true; 3461 } 3462 pthread_mutex_unlock(&bdev->internal.mutex); 3463 3464 if (do_unregister == true) { 3465 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3466 } 3467 } 3468 3469 int 3470 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3471 struct spdk_bdev_module *module) 3472 { 3473 if (bdev->internal.claim_module != NULL) { 3474 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3475 bdev->internal.claim_module->name); 3476 return -EPERM; 3477 } 3478 3479 if (desc && !desc->write) { 3480 desc->write = true; 3481 } 3482 3483 bdev->internal.claim_module = module; 3484 return 0; 3485 } 3486 3487 void 3488 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3489 { 3490 assert(bdev->internal.claim_module != NULL); 3491 bdev->internal.claim_module = NULL; 3492 } 3493 3494 struct spdk_bdev * 3495 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3496 { 3497 return desc->bdev; 3498 } 3499 3500 void 3501 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3502 { 3503 struct iovec *iovs; 3504 int iovcnt; 3505 3506 if (bdev_io == NULL) { 3507 return; 3508 } 3509 3510 switch (bdev_io->type) { 3511 case SPDK_BDEV_IO_TYPE_READ: 3512 iovs = bdev_io->u.bdev.iovs; 3513 iovcnt = bdev_io->u.bdev.iovcnt; 3514 break; 3515 case SPDK_BDEV_IO_TYPE_WRITE: 3516 iovs = bdev_io->u.bdev.iovs; 3517 iovcnt = bdev_io->u.bdev.iovcnt; 3518 break; 3519 default: 3520 iovs = NULL; 3521 iovcnt = 0; 3522 break; 3523 } 3524 3525 if (iovp) { 3526 *iovp = iovs; 3527 } 3528 if (iovcntp) { 3529 *iovcntp = iovcnt; 3530 } 3531 } 3532 3533 void 3534 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3535 { 3536 3537 if (spdk_bdev_module_list_find(bdev_module->name)) { 3538 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3539 assert(false); 3540 } 3541 3542 if (bdev_module->async_init) { 3543 bdev_module->internal.action_in_progress = 1; 3544 } 3545 3546 /* 3547 * Modules with examine callbacks must be initialized first, so they are 3548 * ready to handle examine callbacks from later modules that will 3549 * register physical bdevs. 
3550 */ 3551 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3552 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3553 } else { 3554 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3555 } 3556 } 3557 3558 struct spdk_bdev_module * 3559 spdk_bdev_module_list_find(const char *name) 3560 { 3561 struct spdk_bdev_module *bdev_module; 3562 3563 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3564 if (strcmp(name, bdev_module->name) == 0) { 3565 break; 3566 } 3567 } 3568 3569 return bdev_module; 3570 } 3571 3572 static void 3573 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3574 { 3575 struct spdk_bdev_io *bdev_io = _bdev_io; 3576 uint64_t num_bytes, num_blocks; 3577 int rc; 3578 3579 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3580 bdev_io->u.bdev.split_remaining_num_blocks, 3581 ZERO_BUFFER_SIZE); 3582 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3583 3584 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3585 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3586 g_bdev_mgr.zero_buffer, 3587 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3588 _spdk_bdev_write_zero_buffer_done, bdev_io); 3589 if (rc == 0) { 3590 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3591 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3592 } else if (rc == -ENOMEM) { 3593 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 3594 } else { 3595 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3596 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3597 } 3598 } 3599 3600 static void 3601 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3602 { 3603 struct spdk_bdev_io *parent_io = cb_arg; 3604 3605 spdk_bdev_free_io(bdev_io); 3606 3607 if (!success) { 3608 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3609 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3610 return; 3611 } 3612 3613 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3614 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3615 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3616 return; 3617 } 3618 3619 _spdk_bdev_write_zero_buffer_next(parent_io); 3620 } 3621 3622 struct set_qos_limit_ctx { 3623 void (*cb_fn)(void *cb_arg, int status); 3624 void *cb_arg; 3625 struct spdk_bdev *bdev; 3626 }; 3627 3628 static void 3629 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3630 { 3631 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3632 ctx->bdev->internal.qos_mod_in_progress = false; 3633 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3634 3635 ctx->cb_fn(ctx->cb_arg, status); 3636 free(ctx); 3637 } 3638 3639 static void 3640 _spdk_bdev_disable_qos_done(void *cb_arg) 3641 { 3642 struct set_qos_limit_ctx *ctx = cb_arg; 3643 struct spdk_bdev *bdev = ctx->bdev; 3644 struct spdk_bdev_io *bdev_io; 3645 struct spdk_bdev_qos *qos; 3646 3647 pthread_mutex_lock(&bdev->internal.mutex); 3648 qos = bdev->internal.qos; 3649 bdev->internal.qos = NULL; 3650 pthread_mutex_unlock(&bdev->internal.mutex); 3651 3652 while (!TAILQ_EMPTY(&qos->queued)) { 3653 /* Send queued I/O back to their original thread for resubmission. 
*/ 3654 bdev_io = TAILQ_FIRST(&qos->queued); 3655 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3656 3657 if (bdev_io->internal.io_submit_ch) { 3658 /* 3659 * Channel was changed when sending it to the QoS thread - change it back 3660 * before sending it back to the original thread. 3661 */ 3662 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3663 bdev_io->internal.io_submit_ch = NULL; 3664 } 3665 3666 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3667 _spdk_bdev_io_submit, bdev_io); 3668 } 3669 3670 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3671 spdk_poller_unregister(&qos->poller); 3672 3673 free(qos); 3674 3675 _spdk_bdev_set_qos_limit_done(ctx, 0); 3676 } 3677 3678 static void 3679 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3680 { 3681 void *io_device = spdk_io_channel_iter_get_io_device(i); 3682 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3683 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3684 struct spdk_thread *thread; 3685 3686 pthread_mutex_lock(&bdev->internal.mutex); 3687 thread = bdev->internal.qos->thread; 3688 pthread_mutex_unlock(&bdev->internal.mutex); 3689 3690 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3691 } 3692 3693 static void 3694 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3695 { 3696 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3697 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3698 3699 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3700 3701 spdk_for_each_channel_continue(i, 0); 3702 } 3703 3704 static void 3705 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg) 3706 { 3707 struct set_qos_limit_ctx *ctx = cb_arg; 3708 struct spdk_bdev *bdev = ctx->bdev; 3709 3710 pthread_mutex_lock(&bdev->internal.mutex); 3711 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3712 pthread_mutex_unlock(&bdev->internal.mutex); 3713 3714 _spdk_bdev_set_qos_limit_done(ctx, 0); 3715 } 3716 3717 static void 3718 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3719 { 3720 void *io_device = spdk_io_channel_iter_get_io_device(i); 3721 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3722 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3723 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3724 3725 pthread_mutex_lock(&bdev->internal.mutex); 3726 _spdk_bdev_enable_qos(bdev, bdev_ch); 3727 pthread_mutex_unlock(&bdev->internal.mutex); 3728 spdk_for_each_channel_continue(i, 0); 3729 } 3730 3731 static void 3732 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3733 { 3734 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3735 3736 _spdk_bdev_set_qos_limit_done(ctx, status); 3737 } 3738 3739 static void 3740 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 3741 { 3742 int i; 3743 3744 assert(bdev->internal.qos != NULL); 3745 3746 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3747 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3748 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3749 3750 if (limits[i] == 0) { 3751 bdev->internal.qos->rate_limits[i].limit = 3752 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3753 } 3754 } 3755 } 3756 } 3757 3758 void 3759 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 3760 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3761 { 3762 struct set_qos_limit_ctx *ctx; 3763 uint32_t limit_set_complement; 3764 
uint64_t min_limit_per_sec; 3765 int i; 3766 bool disable_rate_limit = true; 3767 3768 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3769 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3770 continue; 3771 } 3772 3773 if (limits[i] > 0) { 3774 disable_rate_limit = false; 3775 } 3776 3777 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3778 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3779 } else { 3780 /* Change from megabyte to byte rate limit */ 3781 limits[i] = limits[i] * 1024 * 1024; 3782 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3783 } 3784 3785 limit_set_complement = limits[i] % min_limit_per_sec; 3786 if (limit_set_complement) { 3787 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 3788 limits[i], min_limit_per_sec); 3789 limits[i] += min_limit_per_sec - limit_set_complement; 3790 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 3791 } 3792 } 3793 3794 ctx = calloc(1, sizeof(*ctx)); 3795 if (ctx == NULL) { 3796 cb_fn(cb_arg, -ENOMEM); 3797 return; 3798 } 3799 3800 ctx->cb_fn = cb_fn; 3801 ctx->cb_arg = cb_arg; 3802 ctx->bdev = bdev; 3803 3804 pthread_mutex_lock(&bdev->internal.mutex); 3805 if (bdev->internal.qos_mod_in_progress) { 3806 pthread_mutex_unlock(&bdev->internal.mutex); 3807 free(ctx); 3808 cb_fn(cb_arg, -EAGAIN); 3809 return; 3810 } 3811 bdev->internal.qos_mod_in_progress = true; 3812 3813 if (disable_rate_limit == true && bdev->internal.qos) { 3814 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3815 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 3816 (bdev->internal.qos->rate_limits[i].limit > 0 && 3817 bdev->internal.qos->rate_limits[i].limit != 3818 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 3819 disable_rate_limit = false; 3820 break; 3821 } 3822 } 3823 } 3824 3825 if (disable_rate_limit == false) { 3826 if (bdev->internal.qos == NULL) { 3827 /* Enabling */ 3828 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3829 if (!bdev->internal.qos) { 3830 pthread_mutex_unlock(&bdev->internal.mutex); 3831 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3832 free(ctx); 3833 cb_fn(cb_arg, -ENOMEM); 3834 return; 3835 } 3836 3837 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3838 3839 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3840 _spdk_bdev_enable_qos_msg, ctx, 3841 _spdk_bdev_enable_qos_done); 3842 } else { 3843 /* Updating */ 3844 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3845 3846 spdk_thread_send_msg(bdev->internal.qos->thread, 3847 _spdk_bdev_update_qos_rate_limit_msg, ctx); 3848 } 3849 } else { 3850 if (bdev->internal.qos != NULL) { 3851 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3852 3853 /* Disabling */ 3854 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3855 _spdk_bdev_disable_qos_msg, ctx, 3856 _spdk_bdev_disable_qos_msg_done); 3857 } else { 3858 pthread_mutex_unlock(&bdev->internal.mutex); 3859 _spdk_bdev_set_qos_limit_done(ctx, 0); 3860 return; 3861 } 3862 } 3863 3864 pthread_mutex_unlock(&bdev->internal.mutex); 3865 } 3866 3867 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3868 3869 SPDK_TRACE_REGISTER_FN(bdev_trace) 3870 { 3871 spdk_trace_register_owner(OWNER_BDEV, 'b'); 3872 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 3873 spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV, 3874 OBJECT_BDEV_IO, 1, 0, "type: "); 3875 spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV, 3876 OBJECT_BDEV_IO, 0, 0, ""); 3877 } 3878
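/*
 * Illustrative usage sketch (comment only, not compiled into the library): how a
 * caller is expected to drive the public API implemented in this file. The bdev
 * name "Malloc0", the context struct my_ctx and the helpers my_read_done(),
 * my_submit_read() and my_start() are hypothetical and exist only for this
 * example; error handling is abbreviated, and the data buffer is assumed to be
 * allocated with the alignment reported by spdk_bdev_get_buf_align() and a
 * length that is a multiple of spdk_bdev_get_block_size().
 *
 *   struct my_ctx {
 *           struct spdk_bdev_desc           *desc;
 *           struct spdk_io_channel          *ch;
 *           void                            *buf;
 *           struct spdk_bdev_io_wait_entry  wait_entry;
 *   };
 *
 *   static void
 *   my_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *   {
 *           struct my_ctx *ctx = cb_arg;
 *
 *           spdk_bdev_free_io(bdev_io);     // always return the spdk_bdev_io
 *           spdk_put_io_channel(ctx->ch);
 *           spdk_bdev_close(ctx->desc);
 *   }
 *
 *   static void
 *   my_submit_read(void *cb_arg)
 *   {
 *           struct my_ctx *ctx = cb_arg;
 *           struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
 *           int rc;
 *
 *           rc = spdk_bdev_read(ctx->desc, ctx->ch, ctx->buf, 0, 4096,
 *                               my_read_done, ctx);
 *           if (rc == -ENOMEM) {
 *                   // No spdk_bdev_io available right now: park a wait entry
 *                   // and retry from its callback once one frees up.
 *                   ctx->wait_entry.bdev = bdev;
 *                   ctx->wait_entry.cb_fn = my_submit_read;
 *                   ctx->wait_entry.cb_arg = ctx;
 *                   spdk_bdev_queue_io_wait(bdev, ctx->ch, &ctx->wait_entry);
 *           }
 *   }
 *
 *   static void
 *   my_start(void *arg)
 *   {
 *           static struct my_ctx ctx;
 *           struct spdk_bdev *bdev = spdk_bdev_get_by_name("Malloc0");
 *
 *           if (bdev == NULL ||
 *               spdk_bdev_open(bdev, false, NULL, NULL, &ctx.desc) != 0) {
 *                   return;
 *           }
 *           ctx.ch = spdk_bdev_get_io_channel(ctx.desc);
 *           ctx.buf = spdk_dma_zmalloc(4096, spdk_bdev_get_buf_align(bdev), NULL);
 *           my_submit_read(&ctx);
 *   }
 *
 * Rate limits can similarly be changed at runtime with
 * spdk_bdev_set_qos_rate_limits(); passing 0 for a given limit type disables
 * that limit.
 */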