/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/util.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define BUF_SMALL_POOL_SIZE			8192
#define BUF_LARGE_POOL_SIZE			1024
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000
#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_SEC_TO_USEC			1000000ULL
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
#define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC	10

enum spdk_bdev_qos_type {
	SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0,
	SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT,
	SPDK_BDEV_QOS_NUM_TYPES /* Keep last */
};

static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"};

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;

	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

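/*
 * There is a single, global instance of the bdev manager. It is registered
 * as an io_device below, so every thread that touches the bdev layer gets
 * its own spdk_bdev_mgmt_channel from it.
 */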
static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb g_init_cb_fn = NULL;
static void *g_init_cb_arg = NULL;

static spdk_bdev_fini_cb g_fini_cb_fn = NULL;
static void *g_fini_cb_arg = NULL;
static struct spdk_thread *g_fini_thread = NULL;

struct spdk_bdev_qos {
	/** Rate limit, in I/O per second */
	uint64_t iops_rate_limit;

	/** Rate limit, in bytes per second */
	uint64_t byte_rate_limit;

	/** The channel that all I/O are funneled through */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and
	 *  only valid for the master channel which manages the outstanding IOs. */
	uint64_t max_ios_per_timeslice;

	/** Maximum allowed bytes to be issued in one timeslice (e.g., 1ms) and
	 *  only valid for the master channel which manages the outstanding IOs. */
	uint64_t max_byte_per_timeslice;

	/** Submitted IO in one timeslice (e.g., 1ms) */
	uint64_t io_submitted_this_timeslice;

	/** Submitted bytes in one timeslice (e.g., 1ms) */
	uint64_t byte_submitted_this_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t per_thread_cache_count;
	uint32_t bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * will queue here their IO that awaits retry. It makes it possible to retry sending
 * IO to one bdev after IO from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel *shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev *bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel *channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted through this channel and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	bdev_io_tailq_t queued_resets;

	uint32_t flags;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t start_tsc;
	uint64_t interval_tsc;
	__itt_string_handle *handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

};

struct spdk_bdev_desc {
	struct spdk_bdev *bdev;
	spdk_bdev_remove_cb_t remove_cb;
	void *remove_ctx;
	bool remove_scheduled;
	bool write;
	TAILQ_ENTRY(spdk_bdev_desc) link;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
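/*
 * Note on the +1/-1 above: the bdev layer registers each bdev as an io_device
 * using an address offset by one byte from the spdk_bdev structure itself.
 * This keeps the key distinct from any io_device a bdev module might register
 * at the bdev's own address. The two macros must remain exact inverses.
 */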

static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization.  A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf, *aligned_buf;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io->u.bdev.iovcnt == 1);

	buf = bdev_io->internal.buf;
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);

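		/*
		 * Round the raw mempool buffer up to the next 512-byte boundary
		 * before handing it to the waiting I/O (the buffer pools are sized
		 * with 512 extra bytes per element to leave room for this shift).
		 * For example, a buffer at ...0x1001 is presented as ...0x1200.
		 */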
		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, tmp->internal.buf_len);

		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		tmp->internal.buf = buf;
		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	void *buf, *aligned_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	assert(cb != NULL);
	assert(bdev_io->u.bdev.iovs != NULL);

	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
		/* Buffer already present */
		cb(bdev_io->internal.ch->channel, bdev_io);
		return;
	}

	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;
	bdev_io->internal.get_buf_cb = cb;
	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);

	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);

		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
	}
}

static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_options");
	spdk_json_write_name(w, "params");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		spdk_bdev_config_json(bdev, w);
	}

	spdk_json_write_array_end(w);
}
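/*
 * With the default options, the first entry written above would look roughly like:
 *
 *   { "method": "set_bdev_options",
 *     "params": { "bdev_io_pool_size": 65536, "bdev_io_cache_size": 256 } }
 */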

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

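/*
 * Bdev modules that perform asynchronous initialization, or asynchronous
 * examine of newly registered bdevs, bump internal.action_in_progress and
 * report completion through the two functions below; subsystem init only
 * finishes once every module's count has dropped back to zero.
 */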
void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		rc = module->module_init();
		if (rc != 0) {
			break;
		}
	}

	g_bdev_mgr.module_init_complete = true;
	return rc;
}


static void
spdk_bdev_init_failed_complete(void *cb_arg)
{
	spdk_bdev_init_complete(-1);
}

static void
spdk_bdev_init_failed(void *cb_arg)
{
	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
	 */
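	/* For example, with 4 reactor threads this works out to 8192 / (2 * 4) = 1024
	 * small buffers and 1024 / (2 * 4) = 128 large buffers cached per thread. */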
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
				 NULL);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel));

	rc = spdk_bdev_modules_init();
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
		return;
	}

	spdk_bdev_module_action_complete();
}

static void
spdk_bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    g_bdev_opts.bdev_io_pool_size);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	spdk_dma_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
}

static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
spdk_bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
	} else {
		bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred, as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last bdev in the list.  The last bdev in the list should be a bdev
	 * that has no bdevs that depend on it.
	 */
	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev_io->internal.buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static uint64_t
_spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_ADMIN:
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	default:
		return 0;
	}
}

static void
_spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io = NULL;
	struct spdk_bdev *bdev = ch->bdev;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	while (!TAILQ_EMPTY(&qos->queued)) {
		if (qos->max_ios_per_timeslice > 0 &&
		    qos->io_submitted_this_timeslice >= qos->max_ios_per_timeslice) {
			break;
		}

		if (qos->max_byte_per_timeslice > 0 &&
		    qos->byte_submitted_this_timeslice >= qos->max_byte_per_timeslice) {
			break;
		}

		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		qos->io_submitted_this_timeslice++;
		qos->byte_submitted_this_timeslice += _spdk_bdev_get_io_size_in_byte(bdev_io);
		ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev->fn_table->submit_request(ch->channel, bdev_io);
	}
}

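/*
 * Actually hand an I/O to the bdev module. If the module previously returned
 * NOMEM on this shared resource, the I/O is queued behind the earlier ones
 * rather than being submitted out of order; if the channel has QoS enabled it
 * is routed through the QoS queue, and if a reset is in progress it is failed.
 */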
static void
_spdk_bdev_io_submit(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	bdev_io->internal.submit_tsc = spdk_get_ticks();
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	bdev_io->internal.in_submit_request = true;
	if (spdk_likely(bdev_ch->flags == 0)) {
		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
			bdev->fn_table->submit_request(ch, bdev_io);
		} else {
			bdev_ch->io_outstanding--;
			shared_resource->io_outstanding--;
			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
		}
	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
		bdev_ch->io_outstanding--;
		shared_resource->io_outstanding--;
		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
		_spdk_bdev_qos_io_submit(bdev_ch);
	} else {
		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);

	assert(thread != NULL);
	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
			_spdk_bdev_io_submit(bdev_io);
		} else {
			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
			bdev_io->internal.ch = bdev->internal.qos->ch;
			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
		}
	} else {
		_spdk_bdev_io_submit(bdev_io);
	}
}

static void
spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;

	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	bdev_io->internal.in_submit_request = true;
	bdev->fn_table->submit_request(ch, bdev_io);
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
		  struct spdk_bdev *bdev, void *cb_arg,
		  spdk_bdev_io_completion_cb cb)
{
	bdev_io->bdev = bdev;
	bdev_io->internal.caller_ctx = cb_arg;
	bdev_io->internal.cb = cb;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
	bdev_io->internal.in_submit_request = false;
	bdev_io->internal.buf = NULL;
	bdev_io->internal.io_submit_ch = NULL;
}

static bool
_spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
}

bool
spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	bool supported;

	supported = _spdk_bdev_io_type_supported(bdev, io_type);

	if (!supported) {
		switch (io_type) {
		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
			/* The bdev layer will emulate write zeroes as long as write is supported. */
			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
			break;
		default:
			break;
		}
	}

	return supported;
}

int
spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	if (bdev->fn_table->dump_info_json) {
		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
	}

	return 0;
}

void
spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	assert(bdev != NULL);
	assert(w != NULL);

	if (bdev->fn_table->write_config_json) {
		bdev->fn_table->write_config_json(bdev, w);
	} else {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "name", bdev->name);
		spdk_json_write_object_end(w);
	}
}

static void
spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
{
	uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0;

	if (qos->iops_rate_limit > 0) {
		max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
					SPDK_BDEV_SEC_TO_USEC;
		qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice,
						      SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
	}

	if (qos->byte_rate_limit > 0) {
		max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
					 SPDK_BDEV_SEC_TO_USEC;
		qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice,
						       SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE);
	}
}
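/*
 * For example, an iops_rate_limit of 20000 with the 1000 usec timeslice gives
 * 20000 * 1000 / 1000000 = 20 I/O per timeslice, and a byte_rate_limit of
 * 100 MiB/s gives 104857600 * 1000 / 1000000 = 104857 bytes per timeslice.
 */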

static int
spdk_bdev_channel_poll_qos(void *arg)
{
	struct spdk_bdev_qos *qos = arg;

	/* Reset for next round of rate limiting */
	qos->io_submitted_this_timeslice = 0;

	/* More bytes sent in the last timeslice, allow less in this timeslice */
	if (qos->byte_submitted_this_timeslice > qos->max_byte_per_timeslice) {
		qos->byte_submitted_this_timeslice -= qos->max_byte_per_timeslice;
	} else {
		qos->byte_submitted_this_timeslice = 0;
	}

	_spdk_bdev_qos_io_submit(qos->ch);

	return -1;
}

static void
_spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_shared_resource *shared_resource;

	if (!ch) {
		return;
	}

	if (ch->channel) {
		spdk_put_io_channel(ch->channel);
	}

	assert(ch->io_outstanding == 0);

	shared_resource = ch->shared_resource;
	if (shared_resource) {
		assert(ch->io_outstanding == 0);
		assert(shared_resource->ref > 0);
		shared_resource->ref--;
		if (shared_resource->ref == 0) {
			assert(shared_resource->io_outstanding == 0);
			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
			free(shared_resource);
		}
	}
}

/* Caller must hold bdev->internal.mutex. */
static void
_spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_qos *qos = bdev->internal.qos;

	/* Rate limiting on this bdev enabled */
	if (qos) {
		if (qos->ch == NULL) {
			struct spdk_io_channel *io_ch;

			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
				      bdev->name, spdk_get_thread());

			/* No qos channel has been selected, so set one up */

			/* Take another reference to ch */
			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
			qos->ch = ch;

			qos->thread = spdk_io_channel_get_thread(io_ch);

			TAILQ_INIT(&qos->queued);
			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
			qos->io_submitted_this_timeslice = 0;
			qos->byte_submitted_this_timeslice = 0;

			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
							   qos,
							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
		}

		ch->flags |= BDEV_CH_QOS_ENABLED;
	}
}

static int
spdk_bdev_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct spdk_bdev_channel *ch = ctx_buf;
	struct spdk_io_channel *mgmt_io_ch;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource;

	ch->bdev = bdev;
	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
	if (!ch->channel) {
		return -1;
	}

	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
	if (!mgmt_io_ch) {
		return -1;
	}

	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
		if (shared_resource->shared_ch == ch->channel) {
			spdk_put_io_channel(mgmt_io_ch);
			shared_resource->ref++;
			break;
		}
	}

	if (shared_resource == NULL) {
		shared_resource = calloc(1, sizeof(*shared_resource));
		if (shared_resource == NULL) {
			spdk_put_io_channel(mgmt_io_ch);
			return -1;
		}

		shared_resource->mgmt_ch = mgmt_ch;
		shared_resource->io_outstanding = 0;
		TAILQ_INIT(&shared_resource->nomem_io);
		shared_resource->nomem_threshold = 0;
		shared_resource->shared_ch = ch->channel;
		shared_resource->ref = 1;
		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
	}

	memset(&ch->stat, 0, sizeof(ch->stat));
	ch->stat.ticks_rate = spdk_get_ticks_hz();
	ch->io_outstanding = 0;
	TAILQ_INIT(&ch->queued_resets);
	ch->flags = 0;
	ch->shared_resource = shared_resource;

#ifdef SPDK_CONFIG_VTUNE
	{
		char *name;
		__itt_init_ittlib(NULL, 0);
		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
		if (!name) {
			_spdk_bdev_channel_destroy_resource(ch);
			return -1;
		}
		ch->handle = __itt_string_handle_create(name);
		free(name);
		ch->start_tsc = spdk_get_ticks();
		ch->interval_tsc = spdk_get_ticks_hz() / 100;
		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
	}
#endif

	pthread_mutex_lock(&bdev->internal.mutex);
	_spdk_bdev_enable_qos(bdev, ch);
	pthread_mutex_unlock(&bdev->internal.mutex);

	return 0;
}

/*
 * Abort I/O that are waiting on a data buffer.  These types of I/O are
 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
{
	bdev_io_stailq_t tmp;
	struct spdk_bdev_io *bdev_io;

	STAILQ_INIT(&tmp);

	while (!STAILQ_EMPTY(queue)) {
		bdev_io = STAILQ_FIRST(queue);
		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
		if (bdev_io->internal.ch == ch) {
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		} else {
			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
		}
	}

	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
}

/*
 * Abort I/O that are queued waiting for submission.  These types of I/O are
 * linked using the spdk_bdev_io link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io, *tmp;

	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
		if (bdev_io->internal.ch == ch) {
			TAILQ_REMOVE(queue, bdev_io, internal.link);
			/*
			 * spdk_bdev_io_complete() assumes that the completed I/O had
			 * been submitted to the bdev module.  Since in this case it
			 * hadn't, bump io_outstanding to account for the decrement
			 * that spdk_bdev_io_complete() will do.
			 */
			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
				ch->io_outstanding++;
				ch->shared_resource->io_outstanding++;
			}
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		}
	}
}

static void
spdk_bdev_qos_channel_destroy(void *cb_arg)
{
	struct spdk_bdev_qos *qos = cb_arg;

	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
	spdk_poller_unregister(&qos->poller);

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);

	free(qos);
}

static int
spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
{
	/*
	 * Cleanly shutting down the QoS poller is tricky, because
	 * during the asynchronous operation the user could open
	 * a new descriptor and create a new channel, spawning
	 * a new QoS poller.
	 *
	 * The strategy is to create a new QoS structure here and swap it
	 * in. The shutdown path then continues to refer to the old one
	 * until it completes and then releases it.
	 */
	struct spdk_bdev_qos *new_qos, *old_qos;

	old_qos = bdev->internal.qos;

	new_qos = calloc(1, sizeof(*new_qos));
	if (!new_qos) {
		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
		return -ENOMEM;
	}

	/* Copy the old QoS data into the newly allocated structure */
	memcpy(new_qos, old_qos, sizeof(*new_qos));

	/* Zero out the key parts of the QoS structure */
	new_qos->ch = NULL;
	new_qos->thread = NULL;
	new_qos->max_ios_per_timeslice = 0;
	new_qos->max_byte_per_timeslice = 0;
	new_qos->io_submitted_this_timeslice = 0;
	new_qos->byte_submitted_this_timeslice = 0;
	new_qos->poller = NULL;
	TAILQ_INIT(&new_qos->queued);

	bdev->internal.qos = new_qos;

	if (old_qos->thread == NULL) {
		free(old_qos);
	} else {
		spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
				     old_qos);
	}

	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
	 * been destroyed yet.  The destruction path will end up waiting for the final
	 * channel to be put before it releases resources. */

	return 0;
}

static void
_spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
{
	total->bytes_read += add->bytes_read;
	total->num_read_ops += add->num_read_ops;
	total->bytes_written += add->bytes_written;
	total->num_write_ops += add->num_write_ops;
	total->read_latency_ticks += add->read_latency_ticks;
	total->write_latency_ticks += add->write_latency_ticks;
}

static void
spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel *ch = ctx_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
		      spdk_get_thread());

	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
	pthread_mutex_lock(&ch->bdev->internal.mutex);
	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
	pthread_mutex_unlock(&ch->bdev->internal.mutex);

	mgmt_ch = shared_resource->mgmt_ch;

	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);

	_spdk_bdev_channel_destroy_resource(ch);
}

int
spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	if (alias == NULL) {
		SPDK_ERRLOG("Empty alias passed\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(alias)) {
		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
		return -EEXIST;
	}

	tmp = calloc(1, sizeof(*tmp));
	if (tmp == NULL) {
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	tmp->alias = strdup(alias);
	if (tmp->alias == NULL) {
		free(tmp);
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);

	return 0;
}

int
spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (strcmp(alias, tmp->alias) == 0) {
			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
			free(tmp->alias);
			free(tmp);
			return 0;
		}
	}

	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);

	return -ENOENT;
}

void
spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *p, *tmp;

	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
		TAILQ_REMOVE(&bdev->aliases, p, tailq);
		free(p->alias);
		free(p);
	}
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
{
	return bdev->product_name;
}

const struct spdk_bdev_aliases_list *
spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
{
	return &bdev->aliases;
}

uint32_t
spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
{
	return bdev->blocklen;
}

uint64_t
spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
{
	return bdev->blockcnt;
}

uint64_t
spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
{
	uint64_t iops_rate_limit = 0;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos) {
		iops_rate_limit = bdev->internal.qos->iops_rate_limit;
	}
	pthread_mutex_unlock(&bdev->internal.mutex);

	return iops_rate_limit;
}

size_t
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
{
	/* TODO: push this logic down to the bdev modules */
	if (bdev->need_aligned_buffer) {
		return bdev->blocklen;
	}

	return 1;
}

uint32_t
spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
{
	return bdev->optimal_io_boundary;
}

bool
spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
{
	return bdev->write_cache;
}

const struct spdk_uuid *
spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
{
	return &bdev->uuid;
}

uint64_t
spdk_bdev_get_qd(const struct spdk_bdev *bdev)
{
	return bdev->internal.measured_queue_depth;
}

uint64_t
spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev)
{
	return bdev->internal.period;
}

uint64_t
spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev)
{
	return bdev->internal.weighted_io_time;
}

uint64_t
spdk_bdev_get_io_time(const struct spdk_bdev *bdev)
{
	return bdev->internal.io_time;
}

static void
_calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);

	bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth;

	if (bdev->internal.measured_queue_depth) {
		bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth;
	}
}

static void
_calculate_measured_qd(struct spdk_io_channel_iter *i)
{
	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch);

	bdev->internal.temporary_queue_depth += ch->io_outstanding;
	spdk_for_each_channel_continue(i, 0);
}

static int
spdk_bdev_calculate_measured_queue_depth(void *ctx)
{
	struct spdk_bdev *bdev = ctx;
	bdev->internal.temporary_queue_depth = 0;
	spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev,
			      _calculate_measured_qd_cpl);
	return 0;
}

void
spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period)
{
	bdev->internal.period = period;

	if (bdev->internal.qd_poller != NULL) {
		spdk_poller_unregister(&bdev->internal.qd_poller);
		bdev->internal.measured_queue_depth = UINT64_MAX;
	}

	if (period != 0) {
		bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev,
					   period);
	}
}

int
spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
{
	int ret;

	pthread_mutex_lock(&bdev->internal.mutex);

	/* bdev has open descriptors */
	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
	    bdev->blockcnt > size) {
		ret = -EBUSY;
	} else {
		bdev->blockcnt = size;
		ret = 0;
	}

	pthread_mutex_unlock(&bdev->internal.mutex);

	return ret;
}

/*
 * Convert I/O offset and length from bytes to blocks.
 *
 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
 */
static uint64_t
spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
			  uint64_t num_bytes, uint64_t *num_blocks)
{
	uint32_t block_size = bdev->blocklen;

	*offset_blocks = offset_bytes / block_size;
	*num_blocks = num_bytes / block_size;

	return (offset_bytes % block_size) | (num_bytes % block_size);
}
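/*
 * For example, with a 512-byte block size, offset_bytes = 4096 and
 * num_bytes = 8192 yield offset_blocks = 8, num_blocks = 16 and a return
 * value of 0; an unaligned offset such as 4097 makes the return value
 * non-zero, and the callers below reject the request with -EINVAL.
 */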

static bool
spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
{
	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
	 * has been an overflow and hence the offset has been wrapped around */
	if (offset_blocks + num_blocks < offset_blocks) {
		return false;
	}

	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
	if (offset_blocks + num_blocks > bdev->blockcnt) {
		return false;
	}

	return true;
}

int
spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
	       void *buf, uint64_t offset, uint64_t nbytes,
	       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
		      spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		return -ENOMEM;
	}

	bdev_io->internal.ch = channel;
	bdev_io->internal.desc = desc;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.bdev.iovs = &bdev_io->iov;
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
	bdev_io->u.bdev.iovcnt = 1;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		struct iovec *iov, int iovcnt,
		uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
}

int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   struct iovec *iov, int iovcnt,
			   uint64_t offset_blocks, uint64_t num_blocks,
			   spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		return -ENOMEM;
	}

	bdev_io->internal.ch = channel;
	bdev_io->internal.desc = desc;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.bdev.iovs = iov;
	bdev_io->u.bdev.iovcnt = iovcnt;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		void *buf, uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
		       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		return -ENOMEM;
	}

	bdev_io->internal.ch = channel;
	bdev_io->internal.desc = desc;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
	bdev_io->u.bdev.iovs = &bdev_io->iov;
	bdev_io->u.bdev.iovs[0].iov_base = buf;
	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
	bdev_io->u.bdev.iovcnt = 1;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		 struct iovec *iov, int iovcnt,
		 uint64_t offset, uint64_t len,
		 spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			struct iovec *iov, int iovcnt,
			uint64_t offset_blocks, uint64_t num_blocks,
			spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!desc->write) {
		return -EBADF;
	}

(!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1966 return -EINVAL; 1967 } 1968 1969 bdev_io = spdk_bdev_get_io(channel); 1970 if (!bdev_io) { 1971 return -ENOMEM; 1972 } 1973 1974 bdev_io->internal.ch = channel; 1975 bdev_io->internal.desc = desc; 1976 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 1977 bdev_io->u.bdev.iovs = iov; 1978 bdev_io->u.bdev.iovcnt = iovcnt; 1979 bdev_io->u.bdev.num_blocks = num_blocks; 1980 bdev_io->u.bdev.offset_blocks = offset_blocks; 1981 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1982 1983 spdk_bdev_io_submit(bdev_io); 1984 return 0; 1985 } 1986 1987 int 1988 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1989 uint64_t offset, uint64_t len, 1990 spdk_bdev_io_completion_cb cb, void *cb_arg) 1991 { 1992 uint64_t offset_blocks, num_blocks; 1993 1994 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 1995 return -EINVAL; 1996 } 1997 1998 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 1999 } 2000 2001 int 2002 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2003 uint64_t offset_blocks, uint64_t num_blocks, 2004 spdk_bdev_io_completion_cb cb, void *cb_arg) 2005 { 2006 struct spdk_bdev *bdev = desc->bdev; 2007 struct spdk_bdev_io *bdev_io; 2008 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2009 uint64_t len; 2010 bool split_request = false; 2011 2012 if (!desc->write) { 2013 return -EBADF; 2014 } 2015 2016 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2017 return -EINVAL; 2018 } 2019 2020 bdev_io = spdk_bdev_get_io(channel); 2021 2022 if (!bdev_io) { 2023 return -ENOMEM; 2024 } 2025 2026 bdev_io->internal.ch = channel; 2027 bdev_io->internal.desc = desc; 2028 bdev_io->u.bdev.offset_blocks = offset_blocks; 2029 2030 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2031 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2032 bdev_io->u.bdev.num_blocks = num_blocks; 2033 bdev_io->u.bdev.iovs = NULL; 2034 bdev_io->u.bdev.iovcnt = 0; 2035 2036 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2037 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2038 2039 len = spdk_bdev_get_block_size(bdev) * num_blocks; 2040 2041 if (len > ZERO_BUFFER_SIZE) { 2042 split_request = true; 2043 len = ZERO_BUFFER_SIZE; 2044 } 2045 2046 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2047 bdev_io->u.bdev.iovs = &bdev_io->iov; 2048 bdev_io->u.bdev.iovs[0].iov_base = g_bdev_mgr.zero_buffer; 2049 bdev_io->u.bdev.iovs[0].iov_len = len; 2050 bdev_io->u.bdev.iovcnt = 1; 2051 bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev); 2052 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks; 2053 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks; 2054 } else { 2055 spdk_bdev_free_io(bdev_io); 2056 return -ENOTSUP; 2057 } 2058 2059 if (split_request) { 2060 bdev_io->u.bdev.stored_user_cb = cb; 2061 spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split); 2062 } else { 2063 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2064 } 2065 spdk_bdev_io_submit(bdev_io); 2066 return 0; 2067 } 2068 2069 int 2070 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2071 uint64_t offset, uint64_t nbytes, 2072 spdk_bdev_io_completion_cb cb, void *cb_arg) 2073 { 2074 uint64_t offset_blocks, num_blocks; 2075 2076 if 
(spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2077 return -EINVAL; 2078 } 2079 2080 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2081 } 2082 2083 int 2084 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2085 uint64_t offset_blocks, uint64_t num_blocks, 2086 spdk_bdev_io_completion_cb cb, void *cb_arg) 2087 { 2088 struct spdk_bdev *bdev = desc->bdev; 2089 struct spdk_bdev_io *bdev_io; 2090 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2091 2092 if (!desc->write) { 2093 return -EBADF; 2094 } 2095 2096 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2097 return -EINVAL; 2098 } 2099 2100 if (num_blocks == 0) { 2101 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2102 return -EINVAL; 2103 } 2104 2105 bdev_io = spdk_bdev_get_io(channel); 2106 if (!bdev_io) { 2107 return -ENOMEM; 2108 } 2109 2110 bdev_io->internal.ch = channel; 2111 bdev_io->internal.desc = desc; 2112 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2113 2114 bdev_io->u.bdev.iovs = &bdev_io->iov; 2115 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2116 bdev_io->u.bdev.iovs[0].iov_len = 0; 2117 bdev_io->u.bdev.iovcnt = 1; 2118 2119 bdev_io->u.bdev.offset_blocks = offset_blocks; 2120 bdev_io->u.bdev.num_blocks = num_blocks; 2121 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2122 2123 spdk_bdev_io_submit(bdev_io); 2124 return 0; 2125 } 2126 2127 int 2128 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2129 uint64_t offset, uint64_t length, 2130 spdk_bdev_io_completion_cb cb, void *cb_arg) 2131 { 2132 uint64_t offset_blocks, num_blocks; 2133 2134 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2135 return -EINVAL; 2136 } 2137 2138 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2139 } 2140 2141 int 2142 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2143 uint64_t offset_blocks, uint64_t num_blocks, 2144 spdk_bdev_io_completion_cb cb, void *cb_arg) 2145 { 2146 struct spdk_bdev *bdev = desc->bdev; 2147 struct spdk_bdev_io *bdev_io; 2148 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2149 2150 if (!desc->write) { 2151 return -EBADF; 2152 } 2153 2154 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2155 return -EINVAL; 2156 } 2157 2158 bdev_io = spdk_bdev_get_io(channel); 2159 if (!bdev_io) { 2160 return -ENOMEM; 2161 } 2162 2163 bdev_io->internal.ch = channel; 2164 bdev_io->internal.desc = desc; 2165 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2166 bdev_io->u.bdev.iovs = NULL; 2167 bdev_io->u.bdev.iovcnt = 0; 2168 bdev_io->u.bdev.offset_blocks = offset_blocks; 2169 bdev_io->u.bdev.num_blocks = num_blocks; 2170 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2171 2172 spdk_bdev_io_submit(bdev_io); 2173 return 0; 2174 } 2175 2176 static void 2177 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2178 { 2179 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2180 struct spdk_bdev_io *bdev_io; 2181 2182 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2183 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2184 spdk_bdev_io_submit_reset(bdev_io); 2185 } 2186 2187 static void 2188 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2189 { 2190 struct spdk_io_channel *ch; 2191 struct spdk_bdev_channel *channel; 2192 struct spdk_bdev_mgmt_channel *mgmt_channel; 2193 struct spdk_bdev_shared_resource 
*shared_resource; 2194 bdev_io_tailq_t tmp_queued; 2195 2196 TAILQ_INIT(&tmp_queued); 2197 2198 ch = spdk_io_channel_iter_get_channel(i); 2199 channel = spdk_io_channel_get_ctx(ch); 2200 shared_resource = channel->shared_resource; 2201 mgmt_channel = shared_resource->mgmt_ch; 2202 2203 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2204 2205 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2206 /* The QoS object is always valid and readable while 2207 * the channel flag is set, so the lock here should not 2208 * be necessary. We're not in the fast path though, so 2209 * just take it anyway. */ 2210 pthread_mutex_lock(&channel->bdev->internal.mutex); 2211 if (channel->bdev->internal.qos->ch == channel) { 2212 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2213 } 2214 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2215 } 2216 2217 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2218 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2219 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2220 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2221 2222 spdk_for_each_channel_continue(i, 0); 2223 } 2224 2225 static void 2226 _spdk_bdev_start_reset(void *ctx) 2227 { 2228 struct spdk_bdev_channel *ch = ctx; 2229 2230 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2231 ch, _spdk_bdev_reset_dev); 2232 } 2233 2234 static void 2235 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2236 { 2237 struct spdk_bdev *bdev = ch->bdev; 2238 2239 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2240 2241 pthread_mutex_lock(&bdev->internal.mutex); 2242 if (bdev->internal.reset_in_progress == NULL) { 2243 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2244 /* 2245 * Take a channel reference for the target bdev for the life of this 2246 * reset. This guards against the channel getting destroyed while 2247 * spdk_for_each_channel() calls related to this reset IO are in 2248 * progress. We will release the reference when this reset is 2249 * completed. 
2250 */ 2251 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2252 _spdk_bdev_start_reset(ch); 2253 } 2254 pthread_mutex_unlock(&bdev->internal.mutex); 2255 } 2256 2257 int 2258 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2259 spdk_bdev_io_completion_cb cb, void *cb_arg) 2260 { 2261 struct spdk_bdev *bdev = desc->bdev; 2262 struct spdk_bdev_io *bdev_io; 2263 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2264 2265 bdev_io = spdk_bdev_get_io(channel); 2266 if (!bdev_io) { 2267 return -ENOMEM; 2268 } 2269 2270 bdev_io->internal.ch = channel; 2271 bdev_io->internal.desc = desc; 2272 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2273 bdev_io->u.reset.ch_ref = NULL; 2274 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2275 2276 pthread_mutex_lock(&bdev->internal.mutex); 2277 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2278 pthread_mutex_unlock(&bdev->internal.mutex); 2279 2280 _spdk_bdev_channel_start_reset(channel); 2281 2282 return 0; 2283 } 2284 2285 void 2286 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2287 struct spdk_bdev_io_stat *stat) 2288 { 2289 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2290 2291 *stat = channel->stat; 2292 } 2293 2294 static void 2295 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2296 { 2297 void *io_device = spdk_io_channel_iter_get_io_device(i); 2298 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2299 2300 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2301 bdev_iostat_ctx->cb_arg, 0); 2302 free(bdev_iostat_ctx); 2303 } 2304 2305 static void 2306 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2307 { 2308 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2309 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2310 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2311 2312 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2313 spdk_for_each_channel_continue(i, 0); 2314 } 2315 2316 void 2317 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2318 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2319 { 2320 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2321 2322 assert(bdev != NULL); 2323 assert(stat != NULL); 2324 assert(cb != NULL); 2325 2326 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2327 if (bdev_iostat_ctx == NULL) { 2328 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2329 cb(bdev, stat, cb_arg, -ENOMEM); 2330 return; 2331 } 2332 2333 bdev_iostat_ctx->stat = stat; 2334 bdev_iostat_ctx->cb = cb; 2335 bdev_iostat_ctx->cb_arg = cb_arg; 2336 2337 /* Start with the statistics from previously deleted channels. */ 2338 pthread_mutex_lock(&bdev->internal.mutex); 2339 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2340 pthread_mutex_unlock(&bdev->internal.mutex); 2341 2342 /* Then iterate and add the statistics from each existing channel. 
*/ 2343 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2344 _spdk_bdev_get_each_channel_stat, 2345 bdev_iostat_ctx, 2346 _spdk_bdev_get_device_stat_done); 2347 } 2348 2349 int 2350 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2351 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2352 spdk_bdev_io_completion_cb cb, void *cb_arg) 2353 { 2354 struct spdk_bdev *bdev = desc->bdev; 2355 struct spdk_bdev_io *bdev_io; 2356 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2357 2358 if (!desc->write) { 2359 return -EBADF; 2360 } 2361 2362 bdev_io = spdk_bdev_get_io(channel); 2363 if (!bdev_io) { 2364 return -ENOMEM; 2365 } 2366 2367 bdev_io->internal.ch = channel; 2368 bdev_io->internal.desc = desc; 2369 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2370 bdev_io->u.nvme_passthru.cmd = *cmd; 2371 bdev_io->u.nvme_passthru.buf = buf; 2372 bdev_io->u.nvme_passthru.nbytes = nbytes; 2373 bdev_io->u.nvme_passthru.md_buf = NULL; 2374 bdev_io->u.nvme_passthru.md_len = 0; 2375 2376 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2377 2378 spdk_bdev_io_submit(bdev_io); 2379 return 0; 2380 } 2381 2382 int 2383 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2384 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2385 spdk_bdev_io_completion_cb cb, void *cb_arg) 2386 { 2387 struct spdk_bdev *bdev = desc->bdev; 2388 struct spdk_bdev_io *bdev_io; 2389 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2390 2391 if (!desc->write) { 2392 /* 2393 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2394 * to easily determine if the command is a read or write, but for now just 2395 * do not allow io_passthru with a read-only descriptor. 2396 */ 2397 return -EBADF; 2398 } 2399 2400 bdev_io = spdk_bdev_get_io(channel); 2401 if (!bdev_io) { 2402 return -ENOMEM; 2403 } 2404 2405 bdev_io->internal.ch = channel; 2406 bdev_io->internal.desc = desc; 2407 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2408 bdev_io->u.nvme_passthru.cmd = *cmd; 2409 bdev_io->u.nvme_passthru.buf = buf; 2410 bdev_io->u.nvme_passthru.nbytes = nbytes; 2411 bdev_io->u.nvme_passthru.md_buf = NULL; 2412 bdev_io->u.nvme_passthru.md_len = 0; 2413 2414 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2415 2416 spdk_bdev_io_submit(bdev_io); 2417 return 0; 2418 } 2419 2420 int 2421 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2422 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2423 spdk_bdev_io_completion_cb cb, void *cb_arg) 2424 { 2425 struct spdk_bdev *bdev = desc->bdev; 2426 struct spdk_bdev_io *bdev_io; 2427 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2428 2429 if (!desc->write) { 2430 /* 2431 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2432 * to easily determine if the command is a read or write, but for now just 2433 * do not allow io_passthru with a read-only descriptor. 
2434 */ 2435 return -EBADF; 2436 } 2437 2438 bdev_io = spdk_bdev_get_io(channel); 2439 if (!bdev_io) { 2440 return -ENOMEM; 2441 } 2442 2443 bdev_io->internal.ch = channel; 2444 bdev_io->internal.desc = desc; 2445 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2446 bdev_io->u.nvme_passthru.cmd = *cmd; 2447 bdev_io->u.nvme_passthru.buf = buf; 2448 bdev_io->u.nvme_passthru.nbytes = nbytes; 2449 bdev_io->u.nvme_passthru.md_buf = md_buf; 2450 bdev_io->u.nvme_passthru.md_len = md_len; 2451 2452 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2453 2454 spdk_bdev_io_submit(bdev_io); 2455 return 0; 2456 } 2457 2458 int 2459 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2460 struct spdk_bdev_io_wait_entry *entry) 2461 { 2462 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2463 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2464 2465 if (bdev != entry->bdev) { 2466 SPDK_ERRLOG("bdevs do not match\n"); 2467 return -EINVAL; 2468 } 2469 2470 if (mgmt_ch->per_thread_cache_count > 0) { 2471 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2472 return -EINVAL; 2473 } 2474 2475 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2476 return 0; 2477 } 2478 2479 static void 2480 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2481 { 2482 struct spdk_bdev *bdev = bdev_ch->bdev; 2483 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2484 struct spdk_bdev_io *bdev_io; 2485 2486 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2487 /* 2488 * Allow some more I/O to complete before retrying the nomem_io queue. 2489 * Some drivers (such as nvme) cannot immediately take a new I/O in 2490 * the context of a completion, because the resources for the I/O are 2491 * not released until control returns to the bdev poller. Also, we 2492 * may require several small I/O to complete before a larger I/O 2493 * (that requires splitting) can be submitted. 2494 */ 2495 return; 2496 } 2497 2498 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2499 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2500 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2501 bdev_io->internal.ch->io_outstanding++; 2502 shared_resource->io_outstanding++; 2503 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2504 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2505 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2506 break; 2507 } 2508 } 2509 } 2510 2511 static inline void 2512 _spdk_bdev_io_complete(void *ctx) 2513 { 2514 struct spdk_bdev_io *bdev_io = ctx; 2515 2516 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2517 /* 2518 * Send the completion to the thread that originally submitted the I/O, 2519 * which may not be the current thread in the case of QoS. 2520 */ 2521 if (bdev_io->internal.io_submit_ch) { 2522 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2523 bdev_io->internal.io_submit_ch = NULL; 2524 } 2525 2526 /* 2527 * Defer completion to avoid potential infinite recursion if the 2528 * user's completion callback issues a new I/O. 
2529 */ 2530 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2531 _spdk_bdev_io_complete, bdev_io); 2532 return; 2533 } 2534 2535 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2536 switch (bdev_io->type) { 2537 case SPDK_BDEV_IO_TYPE_READ: 2538 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2539 bdev_io->internal.ch->stat.num_read_ops++; 2540 bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2541 break; 2542 case SPDK_BDEV_IO_TYPE_WRITE: 2543 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2544 bdev_io->internal.ch->stat.num_write_ops++; 2545 bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2546 break; 2547 default: 2548 break; 2549 } 2550 } 2551 2552 #ifdef SPDK_CONFIG_VTUNE 2553 uint64_t now_tsc = spdk_get_ticks(); 2554 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2555 uint64_t data[5]; 2556 2557 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2558 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2559 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2560 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2561 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 2562 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2563 2564 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2565 __itt_metadata_u64, 5, data); 2566 2567 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2568 bdev_io->internal.ch->start_tsc = now_tsc; 2569 } 2570 #endif 2571 2572 assert(bdev_io->internal.cb != NULL); 2573 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2574 2575 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2576 bdev_io->internal.caller_ctx); 2577 } 2578 2579 static void 2580 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2581 { 2582 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2583 2584 if (bdev_io->u.reset.ch_ref != NULL) { 2585 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2586 bdev_io->u.reset.ch_ref = NULL; 2587 } 2588 2589 _spdk_bdev_io_complete(bdev_io); 2590 } 2591 2592 static void 2593 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2594 { 2595 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2596 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2597 2598 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2599 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2600 _spdk_bdev_channel_start_reset(ch); 2601 } 2602 2603 spdk_for_each_channel_continue(i, 0); 2604 } 2605 2606 void 2607 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2608 { 2609 struct spdk_bdev *bdev = bdev_io->bdev; 2610 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2611 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2612 2613 bdev_io->internal.status = status; 2614 2615 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2616 bool unlock_channels = false; 2617 2618 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2619 SPDK_ERRLOG("NOMEM returned for reset\n"); 2620 } 
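/* If this reset is the one the bdev is currently tracking as in progress,
 * clear it and unfreeze every channel before completing; otherwise the reset
 * simply completes below like any other I/O. */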
2621 pthread_mutex_lock(&bdev->internal.mutex); 2622 if (bdev_io == bdev->internal.reset_in_progress) { 2623 bdev->internal.reset_in_progress = NULL; 2624 unlock_channels = true; 2625 } 2626 pthread_mutex_unlock(&bdev->internal.mutex); 2627 2628 if (unlock_channels) { 2629 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 2630 bdev_io, _spdk_bdev_reset_complete); 2631 return; 2632 } 2633 } else { 2634 assert(bdev_ch->io_outstanding > 0); 2635 assert(shared_resource->io_outstanding > 0); 2636 bdev_ch->io_outstanding--; 2637 shared_resource->io_outstanding--; 2638 2639 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 2640 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 2641 /* 2642 * Wait for some of the outstanding I/O to complete before we 2643 * retry any of the nomem_io. Normally we will wait for 2644 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 2645 * depth channels we will instead wait for half to complete. 2646 */ 2647 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 2648 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 2649 return; 2650 } 2651 2652 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 2653 _spdk_bdev_ch_retry_io(bdev_ch); 2654 } 2655 } 2656 2657 _spdk_bdev_io_complete(bdev_io); 2658 } 2659 2660 void 2661 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 2662 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 2663 { 2664 if (sc == SPDK_SCSI_STATUS_GOOD) { 2665 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2666 } else { 2667 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 2668 bdev_io->internal.error.scsi.sc = sc; 2669 bdev_io->internal.error.scsi.sk = sk; 2670 bdev_io->internal.error.scsi.asc = asc; 2671 bdev_io->internal.error.scsi.ascq = ascq; 2672 } 2673 2674 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2675 } 2676 2677 void 2678 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 2679 int *sc, int *sk, int *asc, int *ascq) 2680 { 2681 assert(sc != NULL); 2682 assert(sk != NULL); 2683 assert(asc != NULL); 2684 assert(ascq != NULL); 2685 2686 switch (bdev_io->internal.status) { 2687 case SPDK_BDEV_IO_STATUS_SUCCESS: 2688 *sc = SPDK_SCSI_STATUS_GOOD; 2689 *sk = SPDK_SCSI_SENSE_NO_SENSE; 2690 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2691 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2692 break; 2693 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 2694 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 2695 break; 2696 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 2697 *sc = bdev_io->internal.error.scsi.sc; 2698 *sk = bdev_io->internal.error.scsi.sk; 2699 *asc = bdev_io->internal.error.scsi.asc; 2700 *ascq = bdev_io->internal.error.scsi.ascq; 2701 break; 2702 default: 2703 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 2704 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 2705 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2706 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2707 break; 2708 } 2709 } 2710 2711 void 2712 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 2713 { 2714 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 2715 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2716 } else { 2717 bdev_io->internal.error.nvme.sct = sct; 2718 bdev_io->internal.error.nvme.sc = sc; 2719 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 2720 } 2721 2722 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2723 } 2724 2725 
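/*
 * Illustrative sketch, not part of the bdev implementation above: a caller-side
 * pattern for handling the -ENOMEM return from the submission functions in this
 * file by registering an spdk_bdev_io_wait_entry with spdk_bdev_queue_io_wait().
 * The example_* names and the guard macro are hypothetical and exist only for
 * this example; the wait entry's bdev, cb_fn and cb_arg fields come from
 * spdk/bdev.h. Guarded out so it is never compiled into the library.
 */
#ifdef SPDK_BDEV_IO_WAIT_EXAMPLE
struct example_read_request {
	struct spdk_bdev_desc		*desc;
	struct spdk_io_channel		*ch;
	void				*buf;
	uint64_t			offset_blocks;
	uint64_t			num_blocks;
	struct spdk_bdev_io_wait_entry	wait_entry;
};

static void
example_read_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	/* Consume the data here, then return the spdk_bdev_io to the pool. */
	spdk_bdev_free_io(bdev_io);
}

static void
example_submit_read(void *arg)
{
	struct example_read_request *req = arg;
	int rc;

	rc = spdk_bdev_read_blocks(req->desc, req->ch, req->buf,
				   req->offset_blocks, req->num_blocks,
				   example_read_complete, req);
	if (rc == -ENOMEM) {
		/* No spdk_bdev_io was available. Queueing is valid here because
		 * -ENOMEM implies the per-thread cache is empty; ask to be called
		 * back on this thread once an I/O is freed, then resubmit. */
		req->wait_entry.bdev = spdk_bdev_desc_get_bdev(req->desc);
		req->wait_entry.cb_fn = example_submit_read;
		req->wait_entry.cb_arg = req;
		spdk_bdev_queue_io_wait(req->wait_entry.bdev, req->ch, &req->wait_entry);
	}
}
#endif /* SPDK_BDEV_IO_WAIT_EXAMPLE */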
void 2726 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 2727 { 2728 assert(sct != NULL); 2729 assert(sc != NULL); 2730 2731 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 2732 *sct = bdev_io->internal.error.nvme.sct; 2733 *sc = bdev_io->internal.error.nvme.sc; 2734 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2735 *sct = SPDK_NVME_SCT_GENERIC; 2736 *sc = SPDK_NVME_SC_SUCCESS; 2737 } else { 2738 *sct = SPDK_NVME_SCT_GENERIC; 2739 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 2740 } 2741 } 2742 2743 struct spdk_thread * 2744 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 2745 { 2746 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 2747 } 2748
2749 static void 2750 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set, 2751 enum spdk_bdev_qos_type qos_type) 2752 { 2753 uint64_t min_qos_set = 0; 2754 2755 switch (qos_type) { 2756 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 2757 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 2758 break; 2759 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2760 min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC; 2761 break; 2762 default: 2763 SPDK_ERRLOG("Unsupported QoS type.\n"); 2764 return; 2765 } 2766 2767 if (qos_set % min_qos_set) { 2768 SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n", 2769 qos_set, bdev->name, min_qos_set); 2770 SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name); 2771 return; 2772 } 2773 2774 if (!bdev->internal.qos) { 2775 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 2776 if (!bdev->internal.qos) { 2777 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 2778 return; 2779 } 2780 } 2781 2782 switch (qos_type) { 2783 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 2784 bdev->internal.qos->iops_rate_limit = qos_set; 2785 break; 2786 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2787 bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024; 2788 break; 2789 default: 2790 break; 2791 } 2792 2793 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n", 2794 bdev->name, qos_type, qos_set); 2795 2796 return; 2797 } 2798
2799 static void 2800 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 2801 { 2802 struct spdk_conf_section *sp = NULL; 2803 const char *val = NULL; 2804 uint64_t qos_set = 0; 2805 int i = 0, j = 0; 2806 2807 sp = spdk_conf_find_section(NULL, "QoS"); 2808 if (!sp) { 2809 return; 2810 } 2811 2812 while (j < SPDK_BDEV_QOS_NUM_TYPES) { 2813 i = 0; 2814 while (true) { 2815 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0); 2816 if (!val) { 2817 break; 2818 } 2819 2820 if (strcmp(bdev->name, val) != 0) { 2821 i++; 2822 continue; 2823 } 2824 2825 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1); 2826 if (val) { 2827 qos_set = strtoull(val, NULL, 10); 2828 _spdk_bdev_qos_config_type(bdev, qos_set, j); 2829 } 2830 2831 break; 2832 } 2833 2834 j++; 2835 } 2836 2837 return; 2838 } 2839
2840 static int 2841 spdk_bdev_init(struct spdk_bdev *bdev) 2842 { 2843 assert(bdev->module != NULL); 2844 2845 if (!bdev->name) { 2846 SPDK_ERRLOG("Bdev name is NULL\n"); 2847 return -EINVAL; 2848 } 2849 2850 if (spdk_bdev_get_by_name(bdev->name)) { 2851 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 2852 return -EEXIST; 2853 } 2854 2855 bdev->internal.status = SPDK_BDEV_STATUS_READY; 2856 bdev->internal.measured_queue_depth = UINT64_MAX; 2857 2858 TAILQ_INIT(&bdev->internal.open_descs); 2859 2860 TAILQ_INIT(&bdev->aliases); 2861 2862 bdev->internal.reset_in_progress 
= NULL; 2863 2864 _spdk_bdev_qos_config(bdev); 2865 2866 spdk_io_device_register(__bdev_to_io_dev(bdev), 2867 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 2868 sizeof(struct spdk_bdev_channel)); 2869 2870 pthread_mutex_init(&bdev->internal.mutex, NULL); 2871 return 0; 2872 } 2873 2874 static void 2875 spdk_bdev_destroy_cb(void *io_device) 2876 { 2877 int rc; 2878 struct spdk_bdev *bdev; 2879 spdk_bdev_unregister_cb cb_fn; 2880 void *cb_arg; 2881 2882 bdev = __bdev_from_io_dev(io_device); 2883 cb_fn = bdev->internal.unregister_cb; 2884 cb_arg = bdev->internal.unregister_ctx; 2885 2886 rc = bdev->fn_table->destruct(bdev->ctxt); 2887 if (rc < 0) { 2888 SPDK_ERRLOG("destruct failed\n"); 2889 } 2890 if (rc <= 0 && cb_fn != NULL) { 2891 cb_fn(cb_arg, rc); 2892 } 2893 } 2894 2895 2896 static void 2897 spdk_bdev_fini(struct spdk_bdev *bdev) 2898 { 2899 pthread_mutex_destroy(&bdev->internal.mutex); 2900 2901 free(bdev->internal.qos); 2902 2903 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 2904 } 2905 2906 static void 2907 spdk_bdev_start(struct spdk_bdev *bdev) 2908 { 2909 struct spdk_bdev_module *module; 2910 uint32_t action; 2911 2912 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 2913 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 2914 2915 /* Examine configuration before initializing I/O */ 2916 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 2917 if (module->examine_config) { 2918 action = module->internal.action_in_progress; 2919 module->internal.action_in_progress++; 2920 module->examine_config(bdev); 2921 if (action != module->internal.action_in_progress) { 2922 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 2923 module->name); 2924 } 2925 } 2926 } 2927 2928 if (bdev->internal.claim_module) { 2929 return; 2930 } 2931 2932 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 2933 if (module->examine_disk) { 2934 module->internal.action_in_progress++; 2935 module->examine_disk(bdev); 2936 } 2937 } 2938 } 2939 2940 int 2941 spdk_bdev_register(struct spdk_bdev *bdev) 2942 { 2943 int rc = spdk_bdev_init(bdev); 2944 2945 if (rc == 0) { 2946 spdk_bdev_start(bdev); 2947 } 2948 2949 return rc; 2950 } 2951 2952 int 2953 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 2954 { 2955 int rc; 2956 2957 rc = spdk_bdev_init(vbdev); 2958 if (rc) { 2959 return rc; 2960 } 2961 2962 spdk_bdev_start(vbdev); 2963 return 0; 2964 } 2965 2966 void 2967 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 2968 { 2969 if (bdev->internal.unregister_cb != NULL) { 2970 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 2971 } 2972 } 2973 2974 static void 2975 _remove_notify(void *arg) 2976 { 2977 struct spdk_bdev_desc *desc = arg; 2978 2979 desc->remove_cb(desc->remove_ctx); 2980 } 2981 2982 void 2983 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 2984 { 2985 struct spdk_bdev_desc *desc, *tmp; 2986 bool do_destruct = true; 2987 struct spdk_thread *thread; 2988 2989 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 2990 2991 thread = spdk_get_thread(); 2992 if (!thread) { 2993 /* The user called this from a non-SPDK thread. 
*/ 2994 if (cb_fn != NULL) { 2995 cb_fn(cb_arg, -ENOTSUP); 2996 } 2997 return; 2998 } 2999 3000 pthread_mutex_lock(&bdev->internal.mutex); 3001 3002 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3003 bdev->internal.unregister_cb = cb_fn; 3004 bdev->internal.unregister_ctx = cb_arg; 3005 3006 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3007 if (desc->remove_cb) { 3008 do_destruct = false; 3009 /* 3010 * Defer invocation of the remove_cb to a separate message that will 3011 * run later on this thread. This ensures this context unwinds and 3012 * we don't recursively unregister this bdev again if the remove_cb 3013 * immediately closes its descriptor. 3014 */ 3015 if (!desc->remove_scheduled) { 3016 /* Avoid scheduling removal of the same descriptor multiple times. */ 3017 desc->remove_scheduled = true; 3018 spdk_thread_send_msg(thread, _remove_notify, desc); 3019 } 3020 } 3021 } 3022 3023 if (!do_destruct) { 3024 pthread_mutex_unlock(&bdev->internal.mutex); 3025 return; 3026 } 3027 3028 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3029 pthread_mutex_unlock(&bdev->internal.mutex); 3030 3031 spdk_bdev_fini(bdev); 3032 } 3033 3034 int 3035 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3036 void *remove_ctx, struct spdk_bdev_desc **_desc) 3037 { 3038 struct spdk_bdev_desc *desc; 3039 3040 desc = calloc(1, sizeof(*desc)); 3041 if (desc == NULL) { 3042 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3043 return -ENOMEM; 3044 } 3045 3046 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3047 spdk_get_thread()); 3048 3049 pthread_mutex_lock(&bdev->internal.mutex); 3050 3051 if (write && bdev->internal.claim_module) { 3052 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3053 bdev->name, bdev->internal.claim_module->name); 3054 free(desc); 3055 pthread_mutex_unlock(&bdev->internal.mutex); 3056 return -EPERM; 3057 } 3058 3059 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3060 3061 desc->bdev = bdev; 3062 desc->remove_cb = remove_cb; 3063 desc->remove_ctx = remove_ctx; 3064 desc->write = write; 3065 *_desc = desc; 3066 3067 pthread_mutex_unlock(&bdev->internal.mutex); 3068 3069 return 0; 3070 } 3071 3072 void 3073 spdk_bdev_close(struct spdk_bdev_desc *desc) 3074 { 3075 struct spdk_bdev *bdev = desc->bdev; 3076 bool do_unregister = false; 3077 3078 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3079 spdk_get_thread()); 3080 3081 pthread_mutex_lock(&bdev->internal.mutex); 3082 3083 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3084 free(desc); 3085 3086 /* If no more descriptors, kill QoS channel */ 3087 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3088 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3089 bdev->name, spdk_get_thread()); 3090 3091 if (spdk_bdev_qos_destroy(bdev)) { 3092 /* There isn't anything we can do to recover here. Just let the 3093 * old QoS poller keep running. The QoS handling won't change 3094 * cores when the user allocates a new channel, but it won't break. */ 3095 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 3096 } 3097 } 3098 3099 spdk_bdev_set_qd_sampling_period(bdev, 0); 3100 3101 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3102 do_unregister = true; 3103 } 3104 pthread_mutex_unlock(&bdev->internal.mutex); 3105 3106 if (do_unregister == true) { 3107 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3108 } 3109 } 3110 3111 int 3112 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3113 struct spdk_bdev_module *module) 3114 { 3115 if (bdev->internal.claim_module != NULL) { 3116 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3117 bdev->internal.claim_module->name); 3118 return -EPERM; 3119 } 3120 3121 if (desc && !desc->write) { 3122 desc->write = true; 3123 } 3124 3125 bdev->internal.claim_module = module; 3126 return 0; 3127 } 3128 3129 void 3130 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3131 { 3132 assert(bdev->internal.claim_module != NULL); 3133 bdev->internal.claim_module = NULL; 3134 } 3135 3136 struct spdk_bdev * 3137 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3138 { 3139 return desc->bdev; 3140 } 3141 3142 void 3143 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3144 { 3145 struct iovec *iovs; 3146 int iovcnt; 3147 3148 if (bdev_io == NULL) { 3149 return; 3150 } 3151 3152 switch (bdev_io->type) { 3153 case SPDK_BDEV_IO_TYPE_READ: 3154 iovs = bdev_io->u.bdev.iovs; 3155 iovcnt = bdev_io->u.bdev.iovcnt; 3156 break; 3157 case SPDK_BDEV_IO_TYPE_WRITE: 3158 iovs = bdev_io->u.bdev.iovs; 3159 iovcnt = bdev_io->u.bdev.iovcnt; 3160 break; 3161 default: 3162 iovs = NULL; 3163 iovcnt = 0; 3164 break; 3165 } 3166 3167 if (iovp) { 3168 *iovp = iovs; 3169 } 3170 if (iovcntp) { 3171 *iovcntp = iovcnt; 3172 } 3173 } 3174 3175 void 3176 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3177 { 3178 3179 if (spdk_bdev_module_list_find(bdev_module->name)) { 3180 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3181 assert(false); 3182 } 3183 3184 if (bdev_module->async_init) { 3185 bdev_module->internal.action_in_progress = 1; 3186 } 3187 3188 /* 3189 * Modules with examine callbacks must be initialized first, so they are 3190 * ready to handle examine callbacks from later modules that will 3191 * register physical bdevs. 3192 */ 3193 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3194 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3195 } else { 3196 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3197 } 3198 } 3199 3200 struct spdk_bdev_module * 3201 spdk_bdev_module_list_find(const char *name) 3202 { 3203 struct spdk_bdev_module *bdev_module; 3204 3205 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3206 if (strcmp(name, bdev_module->name) == 0) { 3207 break; 3208 } 3209 } 3210 3211 return bdev_module; 3212 } 3213 3214 static void 3215 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3216 { 3217 uint64_t len; 3218 3219 if (!success) { 3220 bdev_io->internal.cb = bdev_io->u.bdev.stored_user_cb; 3221 _spdk_bdev_io_complete(bdev_io); 3222 return; 3223 } 3224 3225 /* no need to perform the error checking from write_zeroes_blocks because this request already passed those checks. 
*/ 3226 len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks, 3227 ZERO_BUFFER_SIZE); 3228 3229 bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks; 3230 bdev_io->u.bdev.iovs[0].iov_len = len; 3231 bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev); 3232 bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks; 3233 bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks; 3234 3235 /* if this round completes the i/o, change the callback to be the original user callback */ 3236 if (bdev_io->u.bdev.split_remaining_num_blocks == 0) { 3237 spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb); 3238 } else { 3239 spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split); 3240 } 3241 spdk_bdev_io_submit(bdev_io); 3242 } 3243 3244 struct set_qos_limit_ctx { 3245 void (*cb_fn)(void *cb_arg, int status); 3246 void *cb_arg; 3247 struct spdk_bdev *bdev; 3248 }; 3249 3250 static void 3251 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3252 { 3253 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3254 ctx->bdev->internal.qos_mod_in_progress = false; 3255 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3256 3257 ctx->cb_fn(ctx->cb_arg, status); 3258 free(ctx); 3259 } 3260 3261 static void 3262 _spdk_bdev_disable_qos_done(void *cb_arg) 3263 { 3264 struct set_qos_limit_ctx *ctx = cb_arg; 3265 struct spdk_bdev *bdev = ctx->bdev; 3266 struct spdk_bdev_io *bdev_io; 3267 struct spdk_bdev_qos *qos; 3268 3269 pthread_mutex_lock(&bdev->internal.mutex); 3270 qos = bdev->internal.qos; 3271 bdev->internal.qos = NULL; 3272 pthread_mutex_unlock(&bdev->internal.mutex); 3273 3274 while (!TAILQ_EMPTY(&qos->queued)) { 3275 /* Send queued I/O back to their original thread for resubmission. */ 3276 bdev_io = TAILQ_FIRST(&qos->queued); 3277 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3278 3279 if (bdev_io->internal.io_submit_ch) { 3280 /* 3281 * Channel was changed when sending it to the QoS thread - change it back 3282 * before sending it back to the original thread. 
3283 */ 3284 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3285 bdev_io->internal.io_submit_ch = NULL; 3286 } 3287 3288 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3289 _spdk_bdev_io_submit, bdev_io); 3290 } 3291 3292 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3293 spdk_poller_unregister(&qos->poller); 3294 3295 free(qos); 3296 3297 _spdk_bdev_set_qos_limit_done(ctx, 0); 3298 } 3299 3300 static void 3301 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3302 { 3303 void *io_device = spdk_io_channel_iter_get_io_device(i); 3304 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3305 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3306 struct spdk_thread *thread; 3307 3308 pthread_mutex_lock(&bdev->internal.mutex); 3309 thread = bdev->internal.qos->thread; 3310 pthread_mutex_unlock(&bdev->internal.mutex); 3311 3312 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3313 } 3314 3315 static void 3316 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3317 { 3318 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3319 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3320 3321 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3322 3323 spdk_for_each_channel_continue(i, 0); 3324 } 3325 3326 static void 3327 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg) 3328 { 3329 struct set_qos_limit_ctx *ctx = cb_arg; 3330 struct spdk_bdev *bdev = ctx->bdev; 3331 3332 pthread_mutex_lock(&bdev->internal.mutex); 3333 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3334 pthread_mutex_unlock(&bdev->internal.mutex); 3335 3336 _spdk_bdev_set_qos_limit_done(ctx, 0); 3337 } 3338 3339 static void 3340 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3341 { 3342 void *io_device = spdk_io_channel_iter_get_io_device(i); 3343 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3344 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3345 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3346 3347 pthread_mutex_lock(&bdev->internal.mutex); 3348 _spdk_bdev_enable_qos(bdev, bdev_ch); 3349 pthread_mutex_unlock(&bdev->internal.mutex); 3350 spdk_for_each_channel_continue(i, 0); 3351 } 3352 3353 static void 3354 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3355 { 3356 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3357 3358 _spdk_bdev_set_qos_limit_done(ctx, status); 3359 } 3360 3361 void 3362 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec, 3363 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3364 { 3365 struct set_qos_limit_ctx *ctx; 3366 3367 if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) { 3368 SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n", 3369 ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC); 3370 cb_fn(cb_arg, -EINVAL); 3371 return; 3372 } 3373 3374 ctx = calloc(1, sizeof(*ctx)); 3375 if (ctx == NULL) { 3376 cb_fn(cb_arg, -ENOMEM); 3377 return; 3378 } 3379 3380 ctx->cb_fn = cb_fn; 3381 ctx->cb_arg = cb_arg; 3382 ctx->bdev = bdev; 3383 3384 pthread_mutex_lock(&bdev->internal.mutex); 3385 if (bdev->internal.qos_mod_in_progress) { 3386 pthread_mutex_unlock(&bdev->internal.mutex); 3387 free(ctx); 3388 cb_fn(cb_arg, -EAGAIN); 3389 return; 3390 } 3391 bdev->internal.qos_mod_in_progress = true; 3392 3393 if (ios_per_sec > 0) { 3394 if (bdev->internal.qos == NULL) { 3395 /* Enabling */ 3396 
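/* First-time enable: allocate the QoS object while holding the bdev mutex;
 * spdk_for_each_channel() below then lets every existing channel pick up
 * the new limit. */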
bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3397 if (!bdev->internal.qos) { 3398 pthread_mutex_unlock(&bdev->internal.mutex); 3399 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3400 free(ctx); 3401 cb_fn(cb_arg, -ENOMEM); 3402 return; 3403 } 3404 3405 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3406 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3407 _spdk_bdev_enable_qos_msg, ctx, 3408 _spdk_bdev_enable_qos_done); 3409 } else { 3410 /* Updating */ 3411 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3412 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx); 3413 } 3414 } else { 3415 if (bdev->internal.qos != NULL) { 3416 /* Disabling */ 3417 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3418 _spdk_bdev_disable_qos_msg, ctx, 3419 _spdk_bdev_disable_qos_msg_done); 3420 } else { 3421 pthread_mutex_unlock(&bdev->internal.mutex); 3422 _spdk_bdev_set_qos_limit_done(ctx, 0); 3423 return; 3424 } 3425 } 3426 3427 pthread_mutex_unlock(&bdev->internal.mutex); 3428 } 3429 3430 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3431
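/*
 * Illustrative sketch, not part of the implementation above: enabling a rate
 * limit from an SPDK thread with spdk_bdev_set_qos_limit_iops(). The example_*
 * names, the guard macro, and the 20000 IOPS value are hypothetical; per the
 * check above, the limit must be a non-zero multiple of
 * SPDK_BDEV_QOS_MIN_IOS_PER_SEC, or zero to disable QoS. Guarded out so it is
 * never compiled into the library.
 */
#ifdef SPDK_BDEV_QOS_EXAMPLE
static void
example_qos_limit_done(void *cb_arg, int status)
{
	if (status != 0) {
		SPDK_ERRLOG("Setting QoS limit failed: %d\n", status);
	}
}

static void
example_enable_qos(const char *bdev_name)
{
	struct spdk_bdev *bdev = spdk_bdev_get_by_name(bdev_name);

	if (bdev == NULL) {
		SPDK_ERRLOG("bdev %s not found\n", bdev_name);
		return;
	}

	/* 20000 is a multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC (10000). */
	spdk_bdev_set_qos_limit_iops(bdev, 20000, example_qos_limit_done, NULL);
}
#endif /* SPDK_BDEV_QOS_EXAMPLE */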