/*-
 *   BSD LICENSE
 *
 *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/util.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define BUF_SMALL_POOL_SIZE			8192
#define BUF_LARGE_POOL_SIZE			1024
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000
#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_SEC_TO_USEC			1000000ULL
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
#define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC	10

enum spdk_bdev_qos_type {
	SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0,
	SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT,
	SPDK_BDEV_QOS_NUM_TYPES /* Keep last */
};

static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"};

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;

	TAILQ_HEAD(, spdk_bdev) bdevs;

	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb g_init_cb_fn = NULL;
static void *g_init_cb_arg = NULL;

static spdk_bdev_fini_cb g_fini_cb_fn = NULL;
static void *g_fini_cb_arg = NULL;
static struct spdk_thread *g_fini_thread = NULL;

struct spdk_bdev_qos {
	/** Rate limit, in I/O per second */
	uint64_t iops_rate_limit;

	/** Rate limit, in byte per second */
	uint64_t byte_rate_limit;

	/** The channel that all I/O are funneled through */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and
	 *  only valid for the master channel which manages the outstanding IOs. */
	uint64_t max_ios_per_timeslice;

	/** Maximum allowed bytes to be issued in one timeslice (e.g., 1ms) and
	 *  only valid for the master channel which manages the outstanding IOs. */
	uint64_t max_byte_per_timeslice;

	/** Submitted IO in one timeslice (e.g., 1ms) */
	uint64_t io_submitted_this_timeslice;

	/** Submitted byte in one timeslice (e.g., 1ms) */
	uint64_t byte_submitted_this_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache. Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t per_thread_cache_count;
	uint32_t bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * will queue their I/O here while it awaits retry. This makes it possible to retry
 * sending I/O to one bdev after I/O from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
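	 * (This is typically set when a NOMEM completion is observed, so that
	 * retries are held back until enough outstanding I/O have completed.)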
	 */
	uint64_t nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel *shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev *bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel *channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted through this channel and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	bdev_io_tailq_t queued_resets;

	uint32_t flags;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t start_tsc;
	uint64_t interval_tsc;
	__itt_string_handle *handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

};

struct spdk_bdev_desc {
	struct spdk_bdev *bdev;
	spdk_bdev_remove_cb_t remove_cb;
	void *remove_ctx;
	bool remove_scheduled;
	bool write;
	TAILQ_ENTRY(spdk_bdev_desc) link;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization. A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
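	 *
	 * For example (illustrative numbers only): with bdev_io_cache_size = 256 and
	 * 3 threads, the pool must hold at least 256 * (3 + 1) = 1024 bdev_io structures.
	 *
	 * Illustrative caller-side usage of these getters/setters (a sketch, not part
	 * of this file; all values are arbitrary):
	 *
	 *     struct spdk_bdev_opts opts;
	 *
	 *     spdk_bdev_get_opts(&opts);
	 *     opts.bdev_io_pool_size = 64 * 1024;
	 *     spdk_bdev_set_opts(&opts);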
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

static void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	assert(bdev_io->internal.get_buf_cb != NULL);
	assert(buf != NULL);
	assert(bdev_io->u.bdev.iovs != NULL);

	bdev_io->internal.buf = buf;
	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->internal.buf_len;
	bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io->u.bdev.iovcnt == 1);

	buf = bdev_io->internal.buf;
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
		spdk_mempool_put(pool, buf);
	} else {
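		/* Another bdev_io is already waiting for a buffer of this size class;
		 * hand the freed buffer straight to it instead of returning the buffer
		 * to the mempool. */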
		tmp = STAILQ_FIRST(stailq);
		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		spdk_bdev_io_set_buf(tmp, buf);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	void *buf = NULL;
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	assert(cb != NULL);
	assert(bdev_io->u.bdev.iovs != NULL);

	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
		/* Buffer already present */
		cb(bdev_io->internal.ch->channel, bdev_io);
		return;
	}

	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;
	bdev_io->internal.get_buf_cb = cb;
	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);

	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		spdk_bdev_io_set_buf(bdev_io, buf);
	}
}

static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_options");
	spdk_json_write_name(w, "params");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		spdk_bdev_config_json(bdev, w);
	}

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved.
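	 * The cache is filled from the global bdev_io pool, up to bdev_io_cache_size
	 * entries; spdk_bdev_set_opts() sizes the pool so every thread can do this.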
	 */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->init_complete) {
			m->init_complete();
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		rc = module->module_init();
		if (rc != 0) {
			break;
		}
	}

	g_bdev_mgr.module_init_complete = true;
	return rc;
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
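	 *
	 * For example (illustrative numbers only): with BUF_SMALL_POOL_SIZE = 8192 and
	 * 4 threads, each per-thread cache is capped at 8192 / (2 * 4) = 1024 buffers.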
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
				 NULL);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel));

	rc = spdk_bdev_modules_init();
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	spdk_bdev_module_action_complete();
}

static void
spdk_bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    g_bdev_opts.bdev_io_pool_size);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	spdk_dma_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
}

static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static void
spdk_bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
	} else {
		bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later.
			 * We must save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred, as we might be in the middle of some
		 * context (like bdev part free) that will use this bdev (or private bdev driver
		 * ctx data) after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the first bdev in the list.
	 *
	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
	 * calling the remove_cb of the descriptors first.
	 *
	 * Once this bdev and all of its open descriptors have been cleaned up, this function
	 * will be called again via the unregister completion callback to continue the cleanup
	 * process with the next bdev.
	 */
	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
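		 *
		 * An illustrative caller-side retry pattern (a sketch, not part of this
		 * file; it assumes the spdk_bdev_queue_io_wait() helper declared in
		 * spdk/bdev.h and caller-owned variables/entry):
		 *
		 *     if (spdk_bdev_read_blocks(desc, io_ch, buf, off, num, cb, ctx) == -ENOMEM) {
		 *             entry->bdev = bdev;
		 *             entry->cb_fn = retry_read;
		 *             entry->cb_arg = ctx;
		 *             spdk_bdev_queue_io_wait(bdev, io_ch, entry);
		 *     }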
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev_io->internal.buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static uint64_t
_spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_ADMIN:
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	default:
		return 0;
	}
}

static void
_spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io = NULL;
	struct spdk_bdev *bdev = ch->bdev;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	while (!TAILQ_EMPTY(&qos->queued)) {
		if (qos->max_ios_per_timeslice > 0 &&
		    qos->io_submitted_this_timeslice >= qos->max_ios_per_timeslice) {
			break;
		}

		if (qos->max_byte_per_timeslice > 0 &&
		    qos->byte_submitted_this_timeslice >= qos->max_byte_per_timeslice) {
			break;
		}

		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		qos->io_submitted_this_timeslice++;
		qos->byte_submitted_this_timeslice += _spdk_bdev_get_io_size_in_byte(bdev_io);
		ch->io_outstanding++;
		shared_resource->io_outstanding++;
		bdev->fn_table->submit_request(ch->channel, bdev_io);
	}
}

static void
_spdk_bdev_io_submit(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;

	bdev_io->internal.submit_tsc = spdk_get_ticks();
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	bdev_io->internal.in_submit_request = true;
	if (spdk_likely(bdev_ch->flags == 0)) {
		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
			bdev->fn_table->submit_request(ch, bdev_io);
		} else {
			bdev_ch->io_outstanding--;
			shared_resource->io_outstanding--;
			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
		}
	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
		bdev_ch->io_outstanding--;
		shared_resource->io_outstanding--;
		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
		_spdk_bdev_qos_io_submit(bdev_ch);
	} else {
		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);

	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
		if (thread == bdev->internal.qos->thread) {
			_spdk_bdev_io_submit(bdev_io);
		} else {
			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
			bdev_io->internal.ch = bdev->internal.qos->ch;
			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
		}
	} else {
		_spdk_bdev_io_submit(bdev_io);
	}
}

static void
spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;

	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	bdev_io->internal.in_submit_request = true;
	bdev->fn_table->submit_request(ch, bdev_io);
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
		  struct spdk_bdev *bdev, void *cb_arg,
		  spdk_bdev_io_completion_cb cb)
{
	bdev_io->bdev = bdev;
	bdev_io->internal.caller_ctx = cb_arg;
	bdev_io->internal.cb = cb;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
	bdev_io->internal.in_submit_request = false;
	bdev_io->internal.buf = NULL;
	bdev_io->internal.io_submit_ch = NULL;
}

static bool
_spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
}

bool
spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	bool supported;

	supported = _spdk_bdev_io_type_supported(bdev, io_type);

	if (!supported) {
		switch (io_type) {
		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
			/* The bdev layer will emulate write zeroes as long as write is supported.
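			 * (spdk_bdev_write_zeroes_blocks() below implements this by issuing
			 * regular writes from the zero buffer allocated at init time.)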
			 */
			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
			break;
		default:
			break;
		}
	}

	return supported;
}

int
spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	if (bdev->fn_table->dump_info_json) {
		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
	}

	return 0;
}

void
spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	assert(bdev != NULL);
	assert(w != NULL);

	if (bdev->fn_table->write_config_json) {
		bdev->fn_table->write_config_json(bdev, w);
	} else {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "name", bdev->name);
		spdk_json_write_object_end(w);
	}
}

static void
spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
{
	uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0;

	if (qos->iops_rate_limit > 0) {
		max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
					SPDK_BDEV_SEC_TO_USEC;
		qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice,
						      SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
	}

	if (qos->byte_rate_limit > 0) {
		max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
					 SPDK_BDEV_SEC_TO_USEC;
		qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice,
						       SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE);
	}
}

static int
spdk_bdev_channel_poll_qos(void *arg)
{
	struct spdk_bdev_qos *qos = arg;

	/* Reset for next round of rate limiting */
	qos->io_submitted_this_timeslice = 0;
	qos->byte_submitted_this_timeslice = 0;

	_spdk_bdev_qos_io_submit(qos->ch);

	return -1;
}

static void
_spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_shared_resource *shared_resource;

	if (!ch) {
		return;
	}

	if (ch->channel) {
		spdk_put_io_channel(ch->channel);
	}

	assert(ch->io_outstanding == 0);

	shared_resource = ch->shared_resource;
	if (shared_resource) {
		assert(ch->io_outstanding == 0);
		assert(shared_resource->ref > 0);
		shared_resource->ref--;
		if (shared_resource->ref == 0) {
			assert(shared_resource->io_outstanding == 0);
			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
			free(shared_resource);
		}
	}
}

/* Caller must hold bdev->internal.mutex.
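 *
 * The first channel that sees QoS enabled on the bdev becomes the QoS channel
 * and registers the per-timeslice poller; later channels funnel their
 * QoS-limited I/O to that channel.
 * For example (illustrative numbers only): a limit of 10000 IOPS with a 1000 us
 * timeslice allows at most 10000 * 1000 / 1000000 = 10 I/O per timeslice.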
 */
static int
_spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_qos *qos = bdev->internal.qos;

	/* Rate limiting on this bdev enabled */
	if (qos) {
		if (qos->ch == NULL) {
			struct spdk_io_channel *io_ch;

			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
				      bdev->name, spdk_get_thread());

			/* No qos channel has been selected, so set one up */

			/* Take another reference to ch */
			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
			qos->ch = ch;

			qos->thread = spdk_io_channel_get_thread(io_ch);

			TAILQ_INIT(&qos->queued);
			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
			qos->io_submitted_this_timeslice = 0;
			qos->byte_submitted_this_timeslice = 0;

			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
							   qos,
							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
		}

		ch->flags |= BDEV_CH_QOS_ENABLED;
	}

	return 0;
}

static int
spdk_bdev_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
	struct spdk_bdev_channel *ch = ctx_buf;
	struct spdk_io_channel *mgmt_io_ch;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource;

	ch->bdev = bdev;
	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
	if (!ch->channel) {
		return -1;
	}

	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
	if (!mgmt_io_ch) {
		return -1;
	}

	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
		if (shared_resource->shared_ch == ch->channel) {
			spdk_put_io_channel(mgmt_io_ch);
			shared_resource->ref++;
			break;
		}
	}

	if (shared_resource == NULL) {
		shared_resource = calloc(1, sizeof(*shared_resource));
		if (shared_resource == NULL) {
			spdk_put_io_channel(mgmt_io_ch);
			return -1;
		}

		shared_resource->mgmt_ch = mgmt_ch;
		shared_resource->io_outstanding = 0;
		TAILQ_INIT(&shared_resource->nomem_io);
		shared_resource->nomem_threshold = 0;
		shared_resource->shared_ch = ch->channel;
		shared_resource->ref = 1;
		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
	}

	memset(&ch->stat, 0, sizeof(ch->stat));
	ch->stat.ticks_rate = spdk_get_ticks_hz();
	ch->io_outstanding = 0;
	TAILQ_INIT(&ch->queued_resets);
	ch->flags = 0;
	ch->shared_resource = shared_resource;

#ifdef SPDK_CONFIG_VTUNE
	{
		char *name;
		__itt_init_ittlib(NULL, 0);
		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
		if (!name) {
			_spdk_bdev_channel_destroy_resource(ch);
			return -1;
		}
		ch->handle = __itt_string_handle_create(name);
		free(name);
		ch->start_tsc = spdk_get_ticks();
		ch->interval_tsc = spdk_get_ticks_hz() / 100;
		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
	}
#endif

	pthread_mutex_lock(&bdev->internal.mutex);

	if (_spdk_bdev_enable_qos(bdev, ch)) {
		_spdk_bdev_channel_destroy_resource(ch);
		pthread_mutex_unlock(&bdev->internal.mutex);
		return -1;
	}

	pthread_mutex_unlock(&bdev->internal.mutex);

	return 0;
}

/*
 * Abort I/O that are waiting on a data buffer.
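 * Only entries that belong to the given channel are completed (as failed);
 * entries for other channels are preserved on the queue.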
 * These types of I/O are linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
{
	bdev_io_stailq_t tmp;
	struct spdk_bdev_io *bdev_io;

	STAILQ_INIT(&tmp);

	while (!STAILQ_EMPTY(queue)) {
		bdev_io = STAILQ_FIRST(queue);
		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
		if (bdev_io->internal.ch == ch) {
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		} else {
			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
		}
	}

	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
}

/*
 * Abort I/O that are queued waiting for submission. These types of I/O are
 * linked using the spdk_bdev_io link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io, *tmp;

	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
		if (bdev_io->internal.ch == ch) {
			TAILQ_REMOVE(queue, bdev_io, internal.link);
			/*
			 * spdk_bdev_io_complete() assumes that the completed I/O had
			 * been submitted to the bdev module. Since in this case it
			 * hadn't, bump io_outstanding to account for the decrement
			 * that spdk_bdev_io_complete() will do.
			 */
			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
				ch->io_outstanding++;
				ch->shared_resource->io_outstanding++;
			}
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		}
	}
}

static void
spdk_bdev_qos_channel_destroy(void *cb_arg)
{
	struct spdk_bdev_qos *qos = cb_arg;

	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
	spdk_poller_unregister(&qos->poller);

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);

	free(qos);
}

static int
spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
{
	/*
	 * Cleanly shutting down the QoS poller is tricky, because
	 * during the asynchronous operation the user could open
	 * a new descriptor and create a new channel, spawning
	 * a new QoS poller.
	 *
	 * The strategy is to create a new QoS structure here and swap it
	 * in. The shutdown path then continues to refer to the old one
	 * until it completes and then releases it.
	 */
	struct spdk_bdev_qos *new_qos, *old_qos;

	old_qos = bdev->internal.qos;

	new_qos = calloc(1, sizeof(*new_qos));
	if (!new_qos) {
		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
		return -ENOMEM;
	}

	/* Copy the old QoS data into the newly allocated structure */
	memcpy(new_qos, old_qos, sizeof(*new_qos));

	/* Zero out the key parts of the QoS structure */
	new_qos->ch = NULL;
	new_qos->thread = NULL;
	new_qos->max_ios_per_timeslice = 0;
	new_qos->max_byte_per_timeslice = 0;
	new_qos->io_submitted_this_timeslice = 0;
	new_qos->byte_submitted_this_timeslice = 0;
	new_qos->poller = NULL;
	TAILQ_INIT(&new_qos->queued);

	bdev->internal.qos = new_qos;

	spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
			     old_qos);

	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
	 * been destroyed yet. The destruction path will end up waiting for the final
	 * channel to be put before it releases resources.
	 */

	return 0;
}

static void
spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel *ch = ctx_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
		      spdk_get_thread());

	mgmt_ch = shared_resource->mgmt_ch;

	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);

	_spdk_bdev_channel_destroy_resource(ch);
}

int
spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	if (alias == NULL) {
		SPDK_ERRLOG("Empty alias passed\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(alias)) {
		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
		return -EEXIST;
	}

	tmp = calloc(1, sizeof(*tmp));
	if (tmp == NULL) {
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	tmp->alias = strdup(alias);
	if (tmp->alias == NULL) {
		free(tmp);
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);

	return 0;
}

int
spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (strcmp(alias, tmp->alias) == 0) {
			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
			free(tmp->alias);
			free(tmp);
			return 0;
		}
	}

	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);

	return -ENOENT;
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
{
	return bdev->product_name;
}

const struct spdk_bdev_aliases_list *
spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
{
	return &bdev->aliases;
}

uint32_t
spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
{
	return bdev->blocklen;
}

uint64_t
spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
{
	return bdev->blockcnt;
}

uint64_t
spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
{
	uint64_t iops_rate_limit = 0;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos) {
		iops_rate_limit = bdev->internal.qos->iops_rate_limit;
	}
	pthread_mutex_unlock(&bdev->internal.mutex);

	return iops_rate_limit;
}

size_t
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
{
	/* TODO: push this logic down to the bdev modules */
	if (bdev->need_aligned_buffer) {
		return bdev->blocklen;
	}

	return 1;
}

uint32_t
spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
{
	return bdev->optimal_io_boundary;
}

bool
spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
{
	return bdev->write_cache;
}

const struct spdk_uuid *
spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
{
	return &bdev->uuid;
}

int
spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
{
	int ret;

	pthread_mutex_lock(&bdev->internal.mutex);

	/* bdev has open descriptors */
	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
	    bdev->blockcnt > size) {
		ret = -EBUSY;
	} else {
		bdev->blockcnt = size;
		ret = 0;
	}

	pthread_mutex_unlock(&bdev->internal.mutex);

	return ret;
}

/*
 * Convert I/O offset and length from bytes to blocks.
 *
 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
 */
static uint64_t
spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
			  uint64_t num_bytes, uint64_t *num_blocks)
{
	uint32_t block_size = bdev->blocklen;

	*offset_blocks = offset_bytes / block_size;
	*num_blocks = num_bytes / block_size;

	return (offset_bytes % block_size) | (num_bytes % block_size);
}

static bool
spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
{
	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
	 * has been an overflow and hence the offset has been wrapped around */
	if (offset_blocks + num_blocks < offset_blocks) {
		return false;
	}

	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
	if (offset_blocks + num_blocks > bdev->blockcnt) {
		return false;
	}

	return true;
}

int
spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
	       void *buf, uint64_t offset, uint64_t nbytes,
	       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
		      spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		return -ENOMEM;
	}

	bdev_io->internal.ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.bdev.iov.iov_base = buf;
	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
	bdev_io->u.bdev.iovcnt = 1;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		struct iovec *iov, int iovcnt,
		uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
}

int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   struct iovec *iov, int iovcnt,
			   uint64_t offset_blocks, uint64_t num_blocks,
			   spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		return -ENOMEM;
	}

	bdev_io->internal.ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.bdev.iovs = iov;
	bdev_io->u.bdev.iovcnt = iovcnt;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		void *buf, uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
		       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		return -ENOMEM;
	}

	bdev_io->internal.ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
	bdev_io->u.bdev.iov.iov_base = buf;
	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
	bdev_io->u.bdev.iovcnt = 1;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		 struct iovec *iov, int iovcnt,
		 uint64_t offset, uint64_t len,
		 spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			struct iovec *iov, int iovcnt,
			uint64_t offset_blocks, uint64_t num_blocks,
			spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io(channel);
	if (!bdev_io) {
		return -ENOMEM;
	}

	bdev_io->internal.ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
	bdev_io->u.bdev.iovs = iov;
	bdev_io->u.bdev.iovcnt = iovcnt;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		       uint64_t offset, uint64_t len,
		       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			      uint64_t offset_blocks, uint64_t num_blocks,
			      spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	uint64_t len;
	bool split_request = false;

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io(channel);

	if (!bdev_io) {
		return -ENOMEM;
	}

	bdev_io->internal.ch = channel;
	bdev_io->u.bdev.offset_blocks = offset_blocks;

	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
		bdev_io->u.bdev.num_blocks = num_blocks;
		bdev_io->u.bdev.iovs = NULL;
		bdev_io->u.bdev.iovcnt = 0;

	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);

		len = spdk_bdev_get_block_size(bdev) * num_blocks;

		if (len > ZERO_BUFFER_SIZE) {
			split_request = true;
			len = ZERO_BUFFER_SIZE;
		}

		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
		bdev_io->u.bdev.iov.iov_len = len;
		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
		bdev_io->u.bdev.iovcnt = 1;
		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
	} else {
		spdk_bdev_free_io(bdev_io);
		return -ENOTSUP;
	}

	if (split_request) {
		bdev_io->u.bdev.stored_user_cb = cb;
		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
	} else {
		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
	}
	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
struct spdk_io_channel *ch, 1932 uint64_t offset, uint64_t nbytes, 1933 spdk_bdev_io_completion_cb cb, void *cb_arg) 1934 { 1935 uint64_t offset_blocks, num_blocks; 1936 1937 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1938 return -EINVAL; 1939 } 1940 1941 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 1942 } 1943 1944 int 1945 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1946 uint64_t offset_blocks, uint64_t num_blocks, 1947 spdk_bdev_io_completion_cb cb, void *cb_arg) 1948 { 1949 struct spdk_bdev *bdev = desc->bdev; 1950 struct spdk_bdev_io *bdev_io; 1951 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1952 1953 if (!desc->write) { 1954 return -EBADF; 1955 } 1956 1957 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1958 return -EINVAL; 1959 } 1960 1961 if (num_blocks == 0) { 1962 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 1963 return -EINVAL; 1964 } 1965 1966 bdev_io = spdk_bdev_get_io(channel); 1967 if (!bdev_io) { 1968 return -ENOMEM; 1969 } 1970 1971 bdev_io->internal.ch = channel; 1972 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 1973 bdev_io->u.bdev.iov.iov_base = NULL; 1974 bdev_io->u.bdev.iov.iov_len = 0; 1975 bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov; 1976 bdev_io->u.bdev.iovcnt = 1; 1977 bdev_io->u.bdev.offset_blocks = offset_blocks; 1978 bdev_io->u.bdev.num_blocks = num_blocks; 1979 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1980 1981 spdk_bdev_io_submit(bdev_io); 1982 return 0; 1983 } 1984 1985 int 1986 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1987 uint64_t offset, uint64_t length, 1988 spdk_bdev_io_completion_cb cb, void *cb_arg) 1989 { 1990 uint64_t offset_blocks, num_blocks; 1991 1992 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 1993 return -EINVAL; 1994 } 1995 1996 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 1997 } 1998 1999 int 2000 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2001 uint64_t offset_blocks, uint64_t num_blocks, 2002 spdk_bdev_io_completion_cb cb, void *cb_arg) 2003 { 2004 struct spdk_bdev *bdev = desc->bdev; 2005 struct spdk_bdev_io *bdev_io; 2006 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2007 2008 if (!desc->write) { 2009 return -EBADF; 2010 } 2011 2012 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2013 return -EINVAL; 2014 } 2015 2016 bdev_io = spdk_bdev_get_io(channel); 2017 if (!bdev_io) { 2018 return -ENOMEM; 2019 } 2020 2021 bdev_io->internal.ch = channel; 2022 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2023 bdev_io->u.bdev.iovs = NULL; 2024 bdev_io->u.bdev.iovcnt = 0; 2025 bdev_io->u.bdev.offset_blocks = offset_blocks; 2026 bdev_io->u.bdev.num_blocks = num_blocks; 2027 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2028 2029 spdk_bdev_io_submit(bdev_io); 2030 return 0; 2031 } 2032 2033 static void 2034 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2035 { 2036 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2037 struct spdk_bdev_io *bdev_io; 2038 2039 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2040 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2041 spdk_bdev_io_submit_reset(bdev_io); 2042 } 2043 2044 static void 2045 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2046 { 2047 struct spdk_io_channel *ch; 2048 struct spdk_bdev_channel *channel; 2049 
struct spdk_bdev_mgmt_channel *mgmt_channel; 2050 struct spdk_bdev_shared_resource *shared_resource; 2051 bdev_io_tailq_t tmp_queued; 2052 2053 TAILQ_INIT(&tmp_queued); 2054 2055 ch = spdk_io_channel_iter_get_channel(i); 2056 channel = spdk_io_channel_get_ctx(ch); 2057 shared_resource = channel->shared_resource; 2058 mgmt_channel = shared_resource->mgmt_ch; 2059 2060 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2061 2062 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2063 /* The QoS object is always valid and readable while 2064 * the channel flag is set, so the lock here should not 2065 * be necessary. We're not in the fast path though, so 2066 * just take it anyway. */ 2067 pthread_mutex_lock(&channel->bdev->internal.mutex); 2068 if (channel->bdev->internal.qos->ch == channel) { 2069 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2070 } 2071 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2072 } 2073 2074 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2075 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2076 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2077 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2078 2079 spdk_for_each_channel_continue(i, 0); 2080 } 2081 2082 static void 2083 _spdk_bdev_start_reset(void *ctx) 2084 { 2085 struct spdk_bdev_channel *ch = ctx; 2086 2087 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2088 ch, _spdk_bdev_reset_dev); 2089 } 2090 2091 static void 2092 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2093 { 2094 struct spdk_bdev *bdev = ch->bdev; 2095 2096 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2097 2098 pthread_mutex_lock(&bdev->internal.mutex); 2099 if (bdev->internal.reset_in_progress == NULL) { 2100 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2101 /* 2102 * Take a channel reference for the target bdev for the life of this 2103 * reset. This guards against the channel getting destroyed while 2104 * spdk_for_each_channel() calls related to this reset IO are in 2105 * progress. We will release the reference when this reset is 2106 * completed. 
2107 */ 2108 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2109 _spdk_bdev_start_reset(ch); 2110 } 2111 pthread_mutex_unlock(&bdev->internal.mutex); 2112 } 2113 2114 int 2115 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2116 spdk_bdev_io_completion_cb cb, void *cb_arg) 2117 { 2118 struct spdk_bdev *bdev = desc->bdev; 2119 struct spdk_bdev_io *bdev_io; 2120 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2121 2122 bdev_io = spdk_bdev_get_io(channel); 2123 if (!bdev_io) { 2124 return -ENOMEM; 2125 } 2126 2127 bdev_io->internal.ch = channel; 2128 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2129 bdev_io->u.reset.ch_ref = NULL; 2130 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2131 2132 pthread_mutex_lock(&bdev->internal.mutex); 2133 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2134 pthread_mutex_unlock(&bdev->internal.mutex); 2135 2136 _spdk_bdev_channel_start_reset(channel); 2137 2138 return 0; 2139 } 2140 2141 void 2142 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2143 struct spdk_bdev_io_stat *stat) 2144 { 2145 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2146 2147 *stat = channel->stat; 2148 } 2149 2150 static void 2151 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2152 { 2153 void *io_device = spdk_io_channel_iter_get_io_device(i); 2154 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2155 2156 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2157 bdev_iostat_ctx->cb_arg, 0); 2158 free(bdev_iostat_ctx); 2159 } 2160 2161 static void 2162 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2163 { 2164 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2165 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2166 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2167 2168 bdev_iostat_ctx->stat->bytes_read += channel->stat.bytes_read; 2169 bdev_iostat_ctx->stat->num_read_ops += channel->stat.num_read_ops; 2170 bdev_iostat_ctx->stat->bytes_written += channel->stat.bytes_written; 2171 bdev_iostat_ctx->stat->num_write_ops += channel->stat.num_write_ops; 2172 2173 spdk_for_each_channel_continue(i, 0); 2174 } 2175 2176 void 2177 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2178 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2179 { 2180 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2181 2182 assert(bdev != NULL); 2183 assert(stat != NULL); 2184 assert(cb != NULL); 2185 2186 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2187 if (bdev_iostat_ctx == NULL) { 2188 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2189 cb(bdev, stat, cb_arg, -ENOMEM); 2190 return; 2191 } 2192 2193 bdev_iostat_ctx->stat = stat; 2194 bdev_iostat_ctx->cb = cb; 2195 bdev_iostat_ctx->cb_arg = cb_arg; 2196 2197 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2198 _spdk_bdev_get_each_channel_stat, 2199 bdev_iostat_ctx, 2200 _spdk_bdev_get_device_stat_done); 2201 } 2202 2203 int 2204 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2205 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2206 spdk_bdev_io_completion_cb cb, void *cb_arg) 2207 { 2208 struct spdk_bdev *bdev = desc->bdev; 2209 struct spdk_bdev_io *bdev_io; 2210 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2211 
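	/* Admin passthru commands can change controller state, so only allow them
	 * on descriptors that were opened for writing (same policy as the NVMe
	 * I/O passthru variants below). */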
2212 if (!desc->write) { 2213 return -EBADF; 2214 } 2215 2216 bdev_io = spdk_bdev_get_io(channel); 2217 if (!bdev_io) { 2218 return -ENOMEM; 2219 } 2220 2221 bdev_io->internal.ch = channel; 2222 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2223 bdev_io->u.nvme_passthru.cmd = *cmd; 2224 bdev_io->u.nvme_passthru.buf = buf; 2225 bdev_io->u.nvme_passthru.nbytes = nbytes; 2226 bdev_io->u.nvme_passthru.md_buf = NULL; 2227 bdev_io->u.nvme_passthru.md_len = 0; 2228 2229 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2230 2231 spdk_bdev_io_submit(bdev_io); 2232 return 0; 2233 } 2234 2235 int 2236 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2237 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2238 spdk_bdev_io_completion_cb cb, void *cb_arg) 2239 { 2240 struct spdk_bdev *bdev = desc->bdev; 2241 struct spdk_bdev_io *bdev_io; 2242 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2243 2244 if (!desc->write) { 2245 /* 2246 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2247 * to easily determine if the command is a read or write, but for now just 2248 * do not allow io_passthru with a read-only descriptor. 2249 */ 2250 return -EBADF; 2251 } 2252 2253 bdev_io = spdk_bdev_get_io(channel); 2254 if (!bdev_io) { 2255 return -ENOMEM; 2256 } 2257 2258 bdev_io->internal.ch = channel; 2259 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2260 bdev_io->u.nvme_passthru.cmd = *cmd; 2261 bdev_io->u.nvme_passthru.buf = buf; 2262 bdev_io->u.nvme_passthru.nbytes = nbytes; 2263 bdev_io->u.nvme_passthru.md_buf = NULL; 2264 bdev_io->u.nvme_passthru.md_len = 0; 2265 2266 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2267 2268 spdk_bdev_io_submit(bdev_io); 2269 return 0; 2270 } 2271 2272 int 2273 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2274 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2275 spdk_bdev_io_completion_cb cb, void *cb_arg) 2276 { 2277 struct spdk_bdev *bdev = desc->bdev; 2278 struct spdk_bdev_io *bdev_io; 2279 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2280 2281 if (!desc->write) { 2282 /* 2283 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2284 * to easily determine if the command is a read or write, but for now just 2285 * do not allow io_passthru with a read-only descriptor. 
2286 */ 2287 return -EBADF; 2288 } 2289 2290 bdev_io = spdk_bdev_get_io(channel); 2291 if (!bdev_io) { 2292 return -ENOMEM; 2293 } 2294 2295 bdev_io->internal.ch = channel; 2296 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2297 bdev_io->u.nvme_passthru.cmd = *cmd; 2298 bdev_io->u.nvme_passthru.buf = buf; 2299 bdev_io->u.nvme_passthru.nbytes = nbytes; 2300 bdev_io->u.nvme_passthru.md_buf = md_buf; 2301 bdev_io->u.nvme_passthru.md_len = md_len; 2302 2303 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2304 2305 spdk_bdev_io_submit(bdev_io); 2306 return 0; 2307 } 2308 2309 int 2310 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2311 struct spdk_bdev_io_wait_entry *entry) 2312 { 2313 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2314 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2315 2316 if (bdev != entry->bdev) { 2317 SPDK_ERRLOG("bdevs do not match\n"); 2318 return -EINVAL; 2319 } 2320 2321 if (mgmt_ch->per_thread_cache_count > 0) { 2322 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2323 return -EINVAL; 2324 } 2325 2326 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2327 return 0; 2328 } 2329 2330 static void 2331 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2332 { 2333 struct spdk_bdev *bdev = bdev_ch->bdev; 2334 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2335 struct spdk_bdev_io *bdev_io; 2336 2337 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2338 /* 2339 * Allow some more I/O to complete before retrying the nomem_io queue. 2340 * Some drivers (such as nvme) cannot immediately take a new I/O in 2341 * the context of a completion, because the resources for the I/O are 2342 * not released until control returns to the bdev poller. Also, we 2343 * may require several small I/O to complete before a larger I/O 2344 * (that requires splitting) can be submitted. 2345 */ 2346 return; 2347 } 2348 2349 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2350 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2351 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2352 bdev_io->internal.ch->io_outstanding++; 2353 shared_resource->io_outstanding++; 2354 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2355 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2356 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2357 break; 2358 } 2359 } 2360 } 2361 2362 static inline void 2363 _spdk_bdev_io_complete(void *ctx) 2364 { 2365 struct spdk_bdev_io *bdev_io = ctx; 2366 2367 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2368 /* 2369 * Send the completion to the thread that originally submitted the I/O, 2370 * which may not be the current thread in the case of QoS. 2371 */ 2372 if (bdev_io->internal.io_submit_ch) { 2373 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2374 bdev_io->internal.io_submit_ch = NULL; 2375 } 2376 2377 /* 2378 * Defer completion to avoid potential infinite recursion if the 2379 * user's completion callback issues a new I/O. 
2380 */ 2381 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2382 _spdk_bdev_io_complete, bdev_io); 2383 return; 2384 } 2385 2386 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2387 switch (bdev_io->type) { 2388 case SPDK_BDEV_IO_TYPE_READ: 2389 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2390 bdev_io->internal.ch->stat.num_read_ops++; 2391 bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2392 break; 2393 case SPDK_BDEV_IO_TYPE_WRITE: 2394 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2395 bdev_io->internal.ch->stat.num_write_ops++; 2396 bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2397 break; 2398 default: 2399 break; 2400 } 2401 } 2402 2403 #ifdef SPDK_CONFIG_VTUNE 2404 uint64_t now_tsc = spdk_get_ticks(); 2405 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2406 uint64_t data[5]; 2407 2408 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2409 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2410 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2411 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2412 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 2413 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2414 2415 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2416 __itt_metadata_u64, 5, data); 2417 2418 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2419 bdev_io->internal.ch->start_tsc = now_tsc; 2420 } 2421 #endif 2422 2423 assert(bdev_io->internal.cb != NULL); 2424 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2425 2426 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2427 bdev_io->internal.caller_ctx); 2428 } 2429 2430 static void 2431 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2432 { 2433 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2434 2435 if (bdev_io->u.reset.ch_ref != NULL) { 2436 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2437 bdev_io->u.reset.ch_ref = NULL; 2438 } 2439 2440 _spdk_bdev_io_complete(bdev_io); 2441 } 2442 2443 static void 2444 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2445 { 2446 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2447 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2448 2449 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2450 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2451 _spdk_bdev_channel_start_reset(ch); 2452 } 2453 2454 spdk_for_each_channel_continue(i, 0); 2455 } 2456 2457 void 2458 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2459 { 2460 struct spdk_bdev *bdev = bdev_io->bdev; 2461 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2462 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2463 2464 bdev_io->internal.status = status; 2465 2466 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2467 bool unlock_channels = false; 2468 2469 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2470 SPDK_ERRLOG("NOMEM returned for reset\n"); 2471 } 
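		/* Only the reset tracked as reset_in_progress below unfreezes the
		 * channels; any other reset I/O completing here simply falls through
		 * to its completion callback. */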
2472 pthread_mutex_lock(&bdev->internal.mutex); 2473 if (bdev_io == bdev->internal.reset_in_progress) { 2474 bdev->internal.reset_in_progress = NULL; 2475 unlock_channels = true; 2476 } 2477 pthread_mutex_unlock(&bdev->internal.mutex); 2478 2479 if (unlock_channels) { 2480 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 2481 bdev_io, _spdk_bdev_reset_complete); 2482 return; 2483 } 2484 } else { 2485 assert(bdev_ch->io_outstanding > 0); 2486 assert(shared_resource->io_outstanding > 0); 2487 bdev_ch->io_outstanding--; 2488 shared_resource->io_outstanding--; 2489 2490 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 2491 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 2492 /* 2493 * Wait for some of the outstanding I/O to complete before we 2494 * retry any of the nomem_io. Normally we will wait for 2495 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 2496 * depth channels we will instead wait for half to complete. 2497 */ 2498 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 2499 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 2500 return; 2501 } 2502 2503 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 2504 _spdk_bdev_ch_retry_io(bdev_ch); 2505 } 2506 } 2507 2508 _spdk_bdev_io_complete(bdev_io); 2509 } 2510 2511 void 2512 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 2513 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 2514 { 2515 if (sc == SPDK_SCSI_STATUS_GOOD) { 2516 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2517 } else { 2518 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 2519 bdev_io->internal.error.scsi.sc = sc; 2520 bdev_io->internal.error.scsi.sk = sk; 2521 bdev_io->internal.error.scsi.asc = asc; 2522 bdev_io->internal.error.scsi.ascq = ascq; 2523 } 2524 2525 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2526 } 2527 2528 void 2529 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 2530 int *sc, int *sk, int *asc, int *ascq) 2531 { 2532 assert(sc != NULL); 2533 assert(sk != NULL); 2534 assert(asc != NULL); 2535 assert(ascq != NULL); 2536 2537 switch (bdev_io->internal.status) { 2538 case SPDK_BDEV_IO_STATUS_SUCCESS: 2539 *sc = SPDK_SCSI_STATUS_GOOD; 2540 *sk = SPDK_SCSI_SENSE_NO_SENSE; 2541 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2542 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2543 break; 2544 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 2545 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 2546 break; 2547 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 2548 *sc = bdev_io->internal.error.scsi.sc; 2549 *sk = bdev_io->internal.error.scsi.sk; 2550 *asc = bdev_io->internal.error.scsi.asc; 2551 *ascq = bdev_io->internal.error.scsi.ascq; 2552 break; 2553 default: 2554 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 2555 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 2556 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2557 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2558 break; 2559 } 2560 } 2561 2562 void 2563 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 2564 { 2565 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 2566 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2567 } else { 2568 bdev_io->internal.error.nvme.sct = sct; 2569 bdev_io->internal.error.nvme.sc = sc; 2570 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 2571 } 2572 2573 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2574 } 2575 2576 
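/*
 * Usage sketch (illustrative only, not part of this file): a backend that
 * receives an NVMe completion can forward the status codes through
 * spdk_bdev_io_complete_nvme_status() above instead of collapsing them to a
 * generic failure.  The callback name and context handling are assumptions.
 *
 *	static void
 *	example_nvme_cpl_cb(void *ctx, const struct spdk_nvme_cpl *cpl)
 *	{
 *		struct spdk_bdev_io *bdev_io = ctx;
 *
 *		spdk_bdev_io_complete_nvme_status(bdev_io, cpl->status.sct, cpl->status.sc);
 *	}
 */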
void
spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
{
	assert(sct != NULL);
	assert(sc != NULL);

	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
		*sct = bdev_io->internal.error.nvme.sct;
		*sc = bdev_io->internal.error.nvme.sc;
	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_SUCCESS;
	} else {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
	}
}

struct spdk_thread *
spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
{
	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
}

static void
_spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set,
			   enum spdk_bdev_qos_type qos_type)
{
	uint64_t min_qos_set = 0;

	switch (qos_type) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
		break;
	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
		min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC;
		break;
	default:
		SPDK_ERRLOG("Unsupported QoS type.\n");
		return;
	}

	if (qos_set % min_qos_set) {
		SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n",
			    qos_set, bdev->name, min_qos_set);
		SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name);
		return;
	}

	if (!bdev->internal.qos) {
		bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
		if (!bdev->internal.qos) {
			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
			return;
		}
	}

	switch (qos_type) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		bdev->internal.qos->iops_rate_limit = qos_set;
		break;
	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
		bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024;
		break;
	default:
		break;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
		      bdev->name, qos_type, qos_set);
}
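/*
 * Read per-bdev rate limits from the "[QoS]" section of the configuration
 * file, if one is present.  A hypothetical section (the bdev names are
 * placeholders) would look like:
 *
 *	[QoS]
 *	  # <type> <bdev name> <IO/s, multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC>
 *	  Limit_IOPS Malloc0 20000
 *	  # <type> <bdev name> <MB/s, multiple of SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC>
 *	  Limit_BWPS Nvme0n1 100
 */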
static void
_spdk_bdev_qos_config(struct spdk_bdev *bdev)
{
	struct spdk_conf_section *sp = NULL;
	const char *val = NULL;
	uint64_t qos_set = 0;
	int i = 0, j = 0;

	sp = spdk_conf_find_section(NULL, "QoS");
	if (!sp) {
		return;
	}

	while (j < SPDK_BDEV_QOS_NUM_TYPES) {
		i = 0;
		while (true) {
			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0);
			if (!val) {
				break;
			}

			if (strcmp(bdev->name, val) != 0) {
				i++;
				continue;
			}

			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1);
			if (val) {
				qos_set = strtoull(val, NULL, 10);
				_spdk_bdev_qos_config_type(bdev, qos_set, j);
			}

			break;
		}

		j++;
	}
}

static int
spdk_bdev_init(struct spdk_bdev *bdev)
{
	assert(bdev->module != NULL);

	if (!bdev->name) {
		SPDK_ERRLOG("Bdev name is NULL\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(bdev->name)) {
		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
		return -EEXIST;
	}

	bdev->internal.status = SPDK_BDEV_STATUS_READY;

	TAILQ_INIT(&bdev->internal.open_descs);

	TAILQ_INIT(&bdev->aliases);

	bdev->internal.reset_in_progress = NULL;

	_spdk_bdev_qos_config(bdev);

	spdk_io_device_register(__bdev_to_io_dev(bdev),
				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
				sizeof(struct spdk_bdev_channel));

	pthread_mutex_init(&bdev->internal.mutex, NULL);
	return 0;
}

static void
spdk_bdev_destroy_cb(void *io_device)
{
	int rc;
	struct spdk_bdev *bdev;
	spdk_bdev_unregister_cb cb_fn;
	void *cb_arg;

	bdev = __bdev_from_io_dev(io_device);
	cb_fn = bdev->internal.unregister_cb;
	cb_arg = bdev->internal.unregister_ctx;

	rc = bdev->fn_table->destruct(bdev->ctxt);
	if (rc < 0) {
		SPDK_ERRLOG("destruct failed\n");
	}
	if (rc <= 0 && cb_fn != NULL) {
		cb_fn(cb_arg, rc);
	}
}

static void
spdk_bdev_fini(struct spdk_bdev *bdev)
{
	pthread_mutex_destroy(&bdev->internal.mutex);

	free(bdev->internal.qos);

	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
}

static void
spdk_bdev_start(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module *module;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (module->examine) {
			module->internal.action_in_progress++;
			module->examine(bdev);
		}
	}
}

int
spdk_bdev_register(struct spdk_bdev *bdev)
{
	int rc = spdk_bdev_init(bdev);

	if (rc == 0) {
		spdk_bdev_start(bdev);
	}

	return rc;
}
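/*
 * Usage sketch (illustrative only): a minimal backend registering a bdev via
 * spdk_bdev_register() above.  "example_disk", example_fn_table and
 * example_if are hypothetical names and error handling is omitted; a real
 * module also fills in its function table and module structure first.
 *
 *	static struct spdk_bdev g_example_bdev;
 *
 *	static int
 *	example_disk_create(void)
 *	{
 *		g_example_bdev.name = "example_disk";
 *		g_example_bdev.product_name = "Example Disk";
 *		g_example_bdev.blocklen = 512;
 *		g_example_bdev.blockcnt = 1024 * 1024;
 *		g_example_bdev.ctxt = NULL;
 *		g_example_bdev.fn_table = &example_fn_table;
 *		g_example_bdev.module = &example_if;
 *
 *		return spdk_bdev_register(&g_example_bdev);
 *	}
 */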
static void
spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev)
{
	struct spdk_bdev **bdevs;
	struct spdk_bdev *base;
	size_t i, j, k;
	bool found;

	/* Iterate over base bdevs to remove vbdev from them. */
	for (i = 0; i < vbdev->internal.base_bdevs_cnt; i++) {
		found = false;
		base = vbdev->internal.base_bdevs[i];

		for (j = 0; j < base->vbdevs_cnt; j++) {
			if (base->vbdevs[j] != vbdev) {
				continue;
			}

			for (k = j; k + 1 < base->vbdevs_cnt; k++) {
				base->vbdevs[k] = base->vbdevs[k + 1];
			}

			base->vbdevs_cnt--;
			if (base->vbdevs_cnt > 0) {
				bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0]));
				/* It would be odd if shrinking a memory block failed. */
				assert(bdevs);
				base->vbdevs = bdevs;
			} else {
				free(base->vbdevs);
				base->vbdevs = NULL;
			}

			found = true;
			break;
		}

		if (!found) {
			SPDK_WARNLOG("Bdev '%s' is not a base bdev of '%s'.\n", base->name, vbdev->name);
		}
	}

	free(vbdev->internal.base_bdevs);
	vbdev->internal.base_bdevs = NULL;
	vbdev->internal.base_bdevs_cnt = 0;
}

static int
spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt)
{
	struct spdk_bdev **vbdevs;
	struct spdk_bdev *base;
	size_t i;

	/* Adding base bdevs isn't supported (yet?). */
	assert(vbdev->internal.base_bdevs_cnt == 0);

	vbdev->internal.base_bdevs = malloc(cnt * sizeof(vbdev->internal.base_bdevs[0]));
	if (!vbdev->internal.base_bdevs) {
		SPDK_ERRLOG("%s - malloc() failed\n", vbdev->name);
		return -ENOMEM;
	}

	memcpy(vbdev->internal.base_bdevs, base_bdevs, cnt * sizeof(vbdev->internal.base_bdevs[0]));
	vbdev->internal.base_bdevs_cnt = cnt;

	/* Iterate over base bdevs to add this vbdev to them. */
	for (i = 0; i < cnt; i++) {
		base = vbdev->internal.base_bdevs[i];

		assert(base != NULL);
		assert(base->internal.claim_module != NULL);

		vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0]));
		if (!vbdevs) {
			SPDK_ERRLOG("%s - realloc() failed\n", base->name);
			spdk_vbdev_remove_base_bdevs(vbdev);
			return -ENOMEM;
		}

		vbdevs[base->vbdevs_cnt] = vbdev;
		base->vbdevs = vbdevs;
		base->vbdevs_cnt++;
	}

	return 0;
}

int
spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
{
	int rc;

	rc = spdk_bdev_init(vbdev);
	if (rc) {
		return rc;
	}

	if (base_bdev_count == 0) {
		spdk_bdev_start(vbdev);
		return 0;
	}

	rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count);
	if (rc) {
		spdk_bdev_fini(vbdev);
		return rc;
	}

	spdk_bdev_start(vbdev);
	return 0;
}

void
spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
{
	if (bdev->internal.unregister_cb != NULL) {
		bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
	}
}

static void
_remove_notify(void *arg)
{
	struct spdk_bdev_desc *desc = arg;

	desc->remove_cb(desc->remove_ctx);
}

void
spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_desc *desc, *tmp;
	bool do_destruct = true;
	struct spdk_thread *thread;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);

	thread = spdk_get_thread();
	if (!thread) {
		/* The user called this from a non-SPDK thread. */
		if (cb_fn != NULL) {
			cb_fn(cb_arg, -ENOTSUP);
		}
		return;
	}

	pthread_mutex_lock(&bdev->internal.mutex);

	spdk_vbdev_remove_base_bdevs(bdev);

	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
	bdev->internal.unregister_cb = cb_fn;
	bdev->internal.unregister_ctx = cb_arg;

	TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
		if (desc->remove_cb) {
			do_destruct = false;
			/*
			 * Defer invocation of the remove_cb to a separate message that will
			 * run later on this thread.  This ensures this context unwinds and
			 * we don't recursively unregister this bdev again if the remove_cb
			 * immediately closes its descriptor.
			 */
			if (!desc->remove_scheduled) {
				/* Avoid scheduling removal of the same descriptor multiple times.
*/ 2951 desc->remove_scheduled = true; 2952 spdk_thread_send_msg(thread, _remove_notify, desc); 2953 } 2954 } 2955 } 2956 2957 if (!do_destruct) { 2958 pthread_mutex_unlock(&bdev->internal.mutex); 2959 return; 2960 } 2961 2962 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 2963 pthread_mutex_unlock(&bdev->internal.mutex); 2964 2965 spdk_bdev_fini(bdev); 2966 } 2967 2968 int 2969 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 2970 void *remove_ctx, struct spdk_bdev_desc **_desc) 2971 { 2972 struct spdk_bdev_desc *desc; 2973 2974 desc = calloc(1, sizeof(*desc)); 2975 if (desc == NULL) { 2976 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 2977 return -ENOMEM; 2978 } 2979 2980 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 2981 spdk_get_thread()); 2982 2983 pthread_mutex_lock(&bdev->internal.mutex); 2984 2985 if (write && bdev->internal.claim_module) { 2986 SPDK_ERRLOG("Could not open %s - already claimed\n", bdev->name); 2987 free(desc); 2988 pthread_mutex_unlock(&bdev->internal.mutex); 2989 return -EPERM; 2990 } 2991 2992 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 2993 2994 desc->bdev = bdev; 2995 desc->remove_cb = remove_cb; 2996 desc->remove_ctx = remove_ctx; 2997 desc->write = write; 2998 *_desc = desc; 2999 3000 pthread_mutex_unlock(&bdev->internal.mutex); 3001 3002 return 0; 3003 } 3004 3005 void 3006 spdk_bdev_close(struct spdk_bdev_desc *desc) 3007 { 3008 struct spdk_bdev *bdev = desc->bdev; 3009 bool do_unregister = false; 3010 3011 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3012 spdk_get_thread()); 3013 3014 pthread_mutex_lock(&bdev->internal.mutex); 3015 3016 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3017 free(desc); 3018 3019 /* If no more descriptors, kill QoS channel */ 3020 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3021 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3022 bdev->name, spdk_get_thread()); 3023 3024 if (spdk_bdev_qos_destroy(bdev)) { 3025 /* There isn't anything we can do to recover here. Just let the 3026 * old QoS poller keep running. The QoS handling won't change 3027 * cores when the user allocates a new channel, but it won't break. */ 3028 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 3029 } 3030 } 3031 3032 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3033 do_unregister = true; 3034 } 3035 pthread_mutex_unlock(&bdev->internal.mutex); 3036 3037 if (do_unregister == true) { 3038 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3039 } 3040 } 3041 3042 int 3043 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3044 struct spdk_bdev_module *module) 3045 { 3046 if (bdev->internal.claim_module != NULL) { 3047 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3048 bdev->internal.claim_module->name); 3049 return -EPERM; 3050 } 3051 3052 if (desc && !desc->write) { 3053 desc->write = true; 3054 } 3055 3056 bdev->internal.claim_module = module; 3057 return 0; 3058 } 3059 3060 void 3061 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3062 { 3063 assert(bdev->internal.claim_module != NULL); 3064 bdev->internal.claim_module = NULL; 3065 } 3066 3067 struct spdk_bdev * 3068 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3069 { 3070 return desc->bdev; 3071 } 3072 3073 void 3074 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3075 { 3076 struct iovec *iovs; 3077 int iovcnt; 3078 3079 if (bdev_io == NULL) { 3080 return; 3081 } 3082 3083 switch (bdev_io->type) { 3084 case SPDK_BDEV_IO_TYPE_READ: 3085 iovs = bdev_io->u.bdev.iovs; 3086 iovcnt = bdev_io->u.bdev.iovcnt; 3087 break; 3088 case SPDK_BDEV_IO_TYPE_WRITE: 3089 iovs = bdev_io->u.bdev.iovs; 3090 iovcnt = bdev_io->u.bdev.iovcnt; 3091 break; 3092 default: 3093 iovs = NULL; 3094 iovcnt = 0; 3095 break; 3096 } 3097 3098 if (iovp) { 3099 *iovp = iovs; 3100 } 3101 if (iovcntp) { 3102 *iovcntp = iovcnt; 3103 } 3104 } 3105 3106 void 3107 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3108 { 3109 3110 if (spdk_bdev_module_list_find(bdev_module->name)) { 3111 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3112 assert(false); 3113 } 3114 3115 if (bdev_module->async_init) { 3116 bdev_module->internal.action_in_progress = 1; 3117 } 3118 3119 /* 3120 * Modules with examine callbacks must be initialized first, so they are 3121 * ready to handle examine callbacks from later modules that will 3122 * register physical bdevs. 3123 */ 3124 if (bdev_module->examine != NULL) { 3125 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3126 } else { 3127 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3128 } 3129 } 3130 3131 struct spdk_bdev_module * 3132 spdk_bdev_module_list_find(const char *name) 3133 { 3134 struct spdk_bdev_module *bdev_module; 3135 3136 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3137 if (strcmp(name, bdev_module->name) == 0) { 3138 break; 3139 } 3140 } 3141 3142 return bdev_module; 3143 } 3144 3145 static void 3146 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3147 { 3148 uint64_t len; 3149 3150 if (!success) { 3151 bdev_io->internal.cb = bdev_io->u.bdev.stored_user_cb; 3152 _spdk_bdev_io_complete(bdev_io); 3153 return; 3154 } 3155 3156 /* no need to perform the error checking from write_zeroes_blocks because this request already passed those checks. 
*/ 3157 len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks, 3158 ZERO_BUFFER_SIZE); 3159 3160 bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks; 3161 bdev_io->u.bdev.iov.iov_len = len; 3162 bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev); 3163 bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks; 3164 bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks; 3165 3166 /* if this round completes the i/o, change the callback to be the original user callback */ 3167 if (bdev_io->u.bdev.split_remaining_num_blocks == 0) { 3168 spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb); 3169 } else { 3170 spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split); 3171 } 3172 spdk_bdev_io_submit(bdev_io); 3173 } 3174 3175 struct set_qos_limit_ctx { 3176 void (*cb_fn)(void *cb_arg, int status); 3177 void *cb_arg; 3178 struct spdk_bdev *bdev; 3179 }; 3180 3181 static void 3182 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3183 { 3184 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3185 ctx->bdev->internal.qos_mod_in_progress = false; 3186 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3187 3188 ctx->cb_fn(ctx->cb_arg, status); 3189 free(ctx); 3190 } 3191 3192 static void 3193 _spdk_bdev_disable_qos_done(void *cb_arg) 3194 { 3195 struct set_qos_limit_ctx *ctx = cb_arg; 3196 struct spdk_bdev *bdev = ctx->bdev; 3197 struct spdk_bdev_qos *qos; 3198 3199 pthread_mutex_lock(&bdev->internal.mutex); 3200 qos = bdev->internal.qos; 3201 bdev->internal.qos = NULL; 3202 pthread_mutex_unlock(&bdev->internal.mutex); 3203 3204 _spdk_bdev_abort_queued_io(&qos->queued, qos->ch); 3205 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3206 spdk_poller_unregister(&qos->poller); 3207 3208 free(qos); 3209 3210 _spdk_bdev_set_qos_limit_done(ctx, 0); 3211 } 3212 3213 static void 3214 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3215 { 3216 void *io_device = spdk_io_channel_iter_get_io_device(i); 3217 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3218 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3219 struct spdk_thread *thread; 3220 3221 pthread_mutex_lock(&bdev->internal.mutex); 3222 thread = bdev->internal.qos->thread; 3223 pthread_mutex_unlock(&bdev->internal.mutex); 3224 3225 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3226 } 3227 3228 static void 3229 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3230 { 3231 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3232 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3233 3234 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3235 3236 spdk_for_each_channel_continue(i, 0); 3237 } 3238 3239 static void 3240 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg) 3241 { 3242 struct set_qos_limit_ctx *ctx = cb_arg; 3243 struct spdk_bdev *bdev = ctx->bdev; 3244 3245 pthread_mutex_lock(&bdev->internal.mutex); 3246 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3247 pthread_mutex_unlock(&bdev->internal.mutex); 3248 3249 _spdk_bdev_set_qos_limit_done(ctx, 0); 3250 } 3251 3252 static void 3253 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3254 { 3255 void *io_device = spdk_io_channel_iter_get_io_device(i); 3256 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3257 struct spdk_io_channel *ch = 
spdk_io_channel_iter_get_channel(i); 3258 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3259 int rc; 3260 3261 pthread_mutex_lock(&bdev->internal.mutex); 3262 rc = _spdk_bdev_enable_qos(bdev, bdev_ch); 3263 pthread_mutex_unlock(&bdev->internal.mutex); 3264 spdk_for_each_channel_continue(i, rc); 3265 } 3266 3267 static void 3268 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3269 { 3270 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3271 3272 _spdk_bdev_set_qos_limit_done(ctx, status); 3273 } 3274 3275 void 3276 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec, 3277 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3278 { 3279 struct set_qos_limit_ctx *ctx; 3280 3281 if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) { 3282 SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n", 3283 ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC); 3284 cb_fn(cb_arg, -EINVAL); 3285 return; 3286 } 3287 3288 ctx = calloc(1, sizeof(*ctx)); 3289 if (ctx == NULL) { 3290 cb_fn(cb_arg, -ENOMEM); 3291 return; 3292 } 3293 3294 ctx->cb_fn = cb_fn; 3295 ctx->cb_arg = cb_arg; 3296 ctx->bdev = bdev; 3297 3298 pthread_mutex_lock(&bdev->internal.mutex); 3299 if (bdev->internal.qos_mod_in_progress) { 3300 pthread_mutex_unlock(&bdev->internal.mutex); 3301 free(ctx); 3302 cb_fn(cb_arg, -EAGAIN); 3303 return; 3304 } 3305 bdev->internal.qos_mod_in_progress = true; 3306 3307 if (ios_per_sec > 0) { 3308 if (bdev->internal.qos == NULL) { 3309 /* Enabling */ 3310 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3311 if (!bdev->internal.qos) { 3312 pthread_mutex_unlock(&bdev->internal.mutex); 3313 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3314 free(ctx); 3315 cb_fn(cb_arg, -ENOMEM); 3316 return; 3317 } 3318 3319 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3320 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3321 _spdk_bdev_enable_qos_msg, ctx, 3322 _spdk_bdev_enable_qos_done); 3323 } else { 3324 /* Updating */ 3325 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3326 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx); 3327 } 3328 } else { 3329 if (bdev->internal.qos != NULL) { 3330 /* Disabling */ 3331 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3332 _spdk_bdev_disable_qos_msg, ctx, 3333 _spdk_bdev_disable_qos_msg_done); 3334 } else { 3335 pthread_mutex_unlock(&bdev->internal.mutex); 3336 _spdk_bdev_set_qos_limit_done(ctx, 0); 3337 return; 3338 } 3339 } 3340 3341 pthread_mutex_unlock(&bdev->internal.mutex); 3342 } 3343 3344 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3345
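/*
 * Usage sketch (illustrative only): enabling an IOPS limit at runtime from
 * application code via spdk_bdev_set_qos_limit_iops() above.  "Malloc0" and
 * the callback names are placeholders; the limit must be 0 (disable) or a
 * multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC.
 *
 *	static void
 *	example_qos_done(void *cb_arg, int status)
 *	{
 *		if (status != 0) {
 *			SPDK_ERRLOG("Setting QoS limit failed: %d\n", status);
 *		}
 *	}
 *
 *	static void
 *	example_enable_qos(void)
 *	{
 *		struct spdk_bdev *bdev = spdk_bdev_get_by_name("Malloc0");
 *
 *		if (bdev != NULL) {
 *			spdk_bdev_set_qos_limit_iops(bdev, 20000, example_qos_done, NULL);
 *		}
 *	}
 */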