1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>. 5 * Copyright (c) Intel Corporation. 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * * Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * * Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * * Neither the name of Intel Corporation nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 25 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 26 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 33 */ 34 35 #include "spdk/stdinc.h" 36 37 #include "spdk/bdev.h" 38 #include "spdk/conf.h" 39 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/util.h" 48 49 #include "spdk/bdev_module.h" 50 #include "spdk_internal/log.h" 51 #include "spdk/string.h" 52 53 #ifdef SPDK_CONFIG_VTUNE 54 #include "ittnotify.h" 55 #include "ittnotify_types.h" 56 int __itt_init_ittlib(const char *, __itt_group_id); 57 #endif 58 59 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 60 #define SPDK_BDEV_IO_CACHE_SIZE 256 61 #define BUF_SMALL_POOL_SIZE 8192 62 #define BUF_LARGE_POOL_SIZE 1024 63 #define NOMEM_THRESHOLD_COUNT 8 64 #define ZERO_BUFFER_SIZE 0x100000 65 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 66 #define SPDK_BDEV_SEC_TO_USEC 1000000ULL 67 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 68 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 69 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 70 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC 10 71 72 enum spdk_bdev_qos_type { 73 SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0, 74 SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT, 75 SPDK_BDEV_QOS_NUM_TYPES /* Keep last */ 76 }; 77 78 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"}; 79 80 struct spdk_bdev_mgr { 81 struct spdk_mempool *bdev_io_pool; 82 83 struct spdk_mempool *buf_small_pool; 84 struct spdk_mempool *buf_large_pool; 85 86 void *zero_buffer; 87 88 TAILQ_HEAD(, spdk_bdev_module) bdev_modules; 89 90 TAILQ_HEAD(, spdk_bdev) bdevs; 91 92 bool init_complete; 93 bool module_init_complete; 94 95 #ifdef SPDK_CONFIG_VTUNE 96 __itt_domain *domain; 97 #endif 98 }; 99 100 static struct spdk_bdev_mgr g_bdev_mgr = { 101 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 
.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb g_init_cb_fn = NULL;
static void *g_init_cb_arg = NULL;

static spdk_bdev_fini_cb g_fini_cb_fn = NULL;
static void *g_fini_cb_arg = NULL;
static struct spdk_thread *g_fini_thread = NULL;

struct spdk_bdev_qos {
	/** Rate limit, in I/O per second */
	uint64_t iops_rate_limit;

	/** Rate limit, in bytes per second */
	uint64_t byte_rate_limit;

	/** The channel that all I/O are funneled through */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and
	 *  only valid for the master channel which manages the outstanding IOs. */
	uint64_t max_ios_per_timeslice;

	/** Maximum allowed bytes to be issued in one timeslice (e.g., 1ms) and
	 *  only valid for the master channel which manages the outstanding IOs. */
	uint64_t max_byte_per_timeslice;

	/** Submitted IOs in one timeslice (e.g., 1ms) */
	uint64_t io_submitted_this_timeslice;

	/** Submitted bytes in one timeslice (e.g., 1ms) */
	uint64_t byte_submitted_this_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t per_thread_cache_count;
	uint32_t bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue;
};

/*
 * Per-module (or per-io_device) data.  Multiple bdevs built on the same io_device
 * queue their I/O awaiting retry here.  This makes it possible to retry sending
 * I/O to one bdev after I/O from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	/*
	 * Queue of I/O awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
195 */ 196 uint64_t nomem_threshold; 197 198 /* I/O channel allocated by a bdev module */ 199 struct spdk_io_channel *shared_ch; 200 201 /* Refcount of bdev channels using this resource */ 202 uint32_t ref; 203 204 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 205 }; 206 207 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 208 #define BDEV_CH_QOS_ENABLED (1 << 1) 209 210 struct spdk_bdev_channel { 211 struct spdk_bdev *bdev; 212 213 /* The channel for the underlying device */ 214 struct spdk_io_channel *channel; 215 216 /* Per io_device per thread data */ 217 struct spdk_bdev_shared_resource *shared_resource; 218 219 struct spdk_bdev_io_stat stat; 220 221 /* 222 * Count of I/O submitted through this channel and waiting for completion. 223 * Incremented before submit_request() is called on an spdk_bdev_io. 224 */ 225 uint64_t io_outstanding; 226 227 bdev_io_tailq_t queued_resets; 228 229 uint32_t flags; 230 231 #ifdef SPDK_CONFIG_VTUNE 232 uint64_t start_tsc; 233 uint64_t interval_tsc; 234 __itt_string_handle *handle; 235 struct spdk_bdev_io_stat prev_stat; 236 #endif 237 238 }; 239 240 struct spdk_bdev_desc { 241 struct spdk_bdev *bdev; 242 spdk_bdev_remove_cb_t remove_cb; 243 void *remove_ctx; 244 bool remove_scheduled; 245 bool write; 246 TAILQ_ENTRY(spdk_bdev_desc) link; 247 }; 248 249 struct spdk_bdev_iostat_ctx { 250 struct spdk_bdev_io_stat *stat; 251 spdk_bdev_get_device_stat_cb cb; 252 void *cb_arg; 253 }; 254 255 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 256 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 257 258 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 259 260 void 261 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 262 { 263 *opts = g_bdev_opts; 264 } 265 266 int 267 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 268 { 269 uint32_t min_pool_size; 270 271 /* 272 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 273 * initialization. A second mgmt_ch will be created on the same thread when the application starts 274 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
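	 * As a worked example (thread count assumed for illustration): with the
	 * default bdev_io_cache_size of 256 and an application running 3 SPDK
	 * threads, min_pool_size below is 256 * (3 + 1) = 1024, so bdev_io_pool_size
	 * must be at least 1024 for this call to succeed.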
275 */ 276 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 277 if (opts->bdev_io_pool_size < min_pool_size) { 278 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 279 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 280 spdk_thread_get_count()); 281 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 282 return -1; 283 } 284 285 g_bdev_opts = *opts; 286 return 0; 287 } 288 289 struct spdk_bdev * 290 spdk_bdev_first(void) 291 { 292 struct spdk_bdev *bdev; 293 294 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 295 if (bdev) { 296 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 297 } 298 299 return bdev; 300 } 301 302 struct spdk_bdev * 303 spdk_bdev_next(struct spdk_bdev *prev) 304 { 305 struct spdk_bdev *bdev; 306 307 bdev = TAILQ_NEXT(prev, internal.link); 308 if (bdev) { 309 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 310 } 311 312 return bdev; 313 } 314 315 static struct spdk_bdev * 316 _bdev_next_leaf(struct spdk_bdev *bdev) 317 { 318 while (bdev != NULL) { 319 if (bdev->internal.claim_module == NULL) { 320 return bdev; 321 } else { 322 bdev = TAILQ_NEXT(bdev, internal.link); 323 } 324 } 325 326 return bdev; 327 } 328 329 struct spdk_bdev * 330 spdk_bdev_first_leaf(void) 331 { 332 struct spdk_bdev *bdev; 333 334 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 335 336 if (bdev) { 337 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 338 } 339 340 return bdev; 341 } 342 343 struct spdk_bdev * 344 spdk_bdev_next_leaf(struct spdk_bdev *prev) 345 { 346 struct spdk_bdev *bdev; 347 348 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 349 350 if (bdev) { 351 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 352 } 353 354 return bdev; 355 } 356 357 struct spdk_bdev * 358 spdk_bdev_get_by_name(const char *bdev_name) 359 { 360 struct spdk_bdev_alias *tmp; 361 struct spdk_bdev *bdev = spdk_bdev_first(); 362 363 while (bdev != NULL) { 364 if (strcmp(bdev_name, bdev->name) == 0) { 365 return bdev; 366 } 367 368 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 369 if (strcmp(bdev_name, tmp->alias) == 0) { 370 return bdev; 371 } 372 } 373 374 bdev = spdk_bdev_next(bdev); 375 } 376 377 return NULL; 378 } 379 380 size_t 381 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 382 { 383 struct iovec **iovs; 384 int *iovcnt; 385 void *aligned_buf; 386 387 iovs = &bdev_io->u.bdev.iovs; 388 iovcnt = &bdev_io->u.bdev.iovcnt; 389 390 if (*iovs == NULL || *iovcnt == 0) { 391 *iovs = &bdev_io->iov; 392 *iovcnt = 1; 393 } 394 395 if (buf != NULL) { 396 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 397 len = len - ((uintptr_t)aligned_buf - (uintptr_t)buf); 398 } else { 399 aligned_buf = NULL; 400 assert(len == 0); 401 } 402 403 (*iovs)[0].iov_base = aligned_buf; 404 (*iovs)[0].iov_len = len; 405 406 return len; 407 } 408 409 static void 410 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 411 { 412 struct spdk_mempool *pool; 413 struct spdk_bdev_io *tmp; 414 void *buf; 415 bdev_io_stailq_t *stailq; 416 struct spdk_bdev_mgmt_channel *ch; 417 size_t len; 418 419 assert(bdev_io->u.bdev.iovcnt == 1); 420 421 buf = bdev_io->internal.buf; 422 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 423 424 if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 425 pool = g_bdev_mgr.buf_small_pool; 426 stailq = &ch->need_buf_small; 
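		/*
		 * Both buffer pools are created 512 bytes larger than their nominal
		 * maximum (see spdk_bdev_initialize()) so that spdk_bdev_io_set_buf()
		 * can round the payload start up to a 512-byte boundary and still
		 * have the full requested length available.
		 */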
427 len = SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512; 428 } else { 429 pool = g_bdev_mgr.buf_large_pool; 430 stailq = &ch->need_buf_large; 431 len = SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512; 432 } 433 434 if (STAILQ_EMPTY(stailq)) { 435 spdk_mempool_put(pool, buf); 436 } else { 437 tmp = STAILQ_FIRST(stailq); 438 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 439 len = spdk_bdev_io_set_buf(tmp, buf, len); 440 if (len < tmp->internal.buf_len) { 441 SPDK_ERRLOG("Unable to use buffer due to alignment\n"); 442 spdk_mempool_put(pool, buf); 443 spdk_bdev_io_set_buf(tmp, NULL, 0); 444 return; 445 } 446 tmp->internal.buf = buf; 447 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 448 } 449 } 450 451 void 452 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 453 { 454 struct spdk_mempool *pool; 455 bdev_io_stailq_t *stailq; 456 void *buf = NULL; 457 struct spdk_bdev_mgmt_channel *mgmt_ch; 458 size_t buf_len; 459 460 assert(cb != NULL); 461 assert(bdev_io->u.bdev.iovs != NULL); 462 463 if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) { 464 /* Buffer already present */ 465 cb(bdev_io->internal.ch->channel, bdev_io); 466 return; 467 } 468 469 assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE); 470 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 471 472 bdev_io->internal.buf_len = len; 473 bdev_io->internal.get_buf_cb = cb; 474 if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 475 pool = g_bdev_mgr.buf_small_pool; 476 stailq = &mgmt_ch->need_buf_small; 477 buf_len = SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512; 478 } else { 479 pool = g_bdev_mgr.buf_large_pool; 480 stailq = &mgmt_ch->need_buf_large; 481 buf_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512; 482 } 483 484 buf = spdk_mempool_get(pool); 485 486 if (!buf) { 487 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 488 } else { 489 size_t aligned_len; 490 491 aligned_len = spdk_bdev_io_set_buf(bdev_io, buf, buf_len); 492 if (aligned_len < len) { 493 SPDK_ERRLOG("Unable to use buffer after alignment calculations.\n"); 494 spdk_mempool_put(pool, buf); 495 spdk_bdev_io_set_buf(bdev_io, NULL, 0); 496 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 497 return; 498 } 499 500 bdev_io->internal.buf = buf; 501 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 502 } 503 } 504 505 static int 506 spdk_bdev_module_get_max_ctx_size(void) 507 { 508 struct spdk_bdev_module *bdev_module; 509 int max_bdev_module_size = 0; 510 511 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 512 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 513 max_bdev_module_size = bdev_module->get_ctx_size(); 514 } 515 } 516 517 return max_bdev_module_size; 518 } 519 520 void 521 spdk_bdev_config_text(FILE *fp) 522 { 523 struct spdk_bdev_module *bdev_module; 524 525 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 526 if (bdev_module->config_text) { 527 bdev_module->config_text(fp); 528 } 529 } 530 } 531 532 void 533 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 534 { 535 struct spdk_bdev_module *bdev_module; 536 struct spdk_bdev *bdev; 537 538 assert(w != NULL); 539 540 spdk_json_write_array_begin(w); 541 542 spdk_json_write_object_begin(w); 543 spdk_json_write_named_string(w, "method", "set_bdev_options"); 544 spdk_json_write_name(w, "params"); 545 spdk_json_write_object_begin(w); 546 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 547 spdk_json_write_named_uint32(w, "bdev_io_cache_size", 
				     g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		spdk_bdev_config_json(bdev, w);
	}

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->init_complete) {
			m->init_complete();
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * if the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
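	 * (Each in-progress action is counted in internal.action_in_progress;
	 * modules report completion through spdk_bdev_module_init_done() or
	 * spdk_bdev_module_examine_done() below, which decrement the counter
	 * and re-run this check.)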
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		rc = module->module_init();
		if (rc != 0) {
			break;
		}
	}

	g_bdev_mgr.module_init_complete = true;
	return rc;
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
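	 * For example (thread count assumed for illustration): with 4 SPDK threads,
	 * cache_size below is BUF_SMALL_POOL_SIZE / (2 * 4) = 8192 / 8 = 1024 cached
	 * small buffers per thread, i.e. at most half of the pool across all threads.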
766 */ 767 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 768 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 769 770 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 771 BUF_SMALL_POOL_SIZE, 772 SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512, 773 cache_size, 774 SPDK_ENV_SOCKET_ID_ANY); 775 if (!g_bdev_mgr.buf_small_pool) { 776 SPDK_ERRLOG("create rbuf small pool failed\n"); 777 spdk_bdev_init_complete(-1); 778 return; 779 } 780 781 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 782 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 783 784 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 785 BUF_LARGE_POOL_SIZE, 786 SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512, 787 cache_size, 788 SPDK_ENV_SOCKET_ID_ANY); 789 if (!g_bdev_mgr.buf_large_pool) { 790 SPDK_ERRLOG("create rbuf large pool failed\n"); 791 spdk_bdev_init_complete(-1); 792 return; 793 } 794 795 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 796 NULL); 797 if (!g_bdev_mgr.zero_buffer) { 798 SPDK_ERRLOG("create bdev zero buffer failed\n"); 799 spdk_bdev_init_complete(-1); 800 return; 801 } 802 803 #ifdef SPDK_CONFIG_VTUNE 804 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 805 #endif 806 807 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 808 spdk_bdev_mgmt_channel_destroy, 809 sizeof(struct spdk_bdev_mgmt_channel)); 810 811 rc = spdk_bdev_modules_init(); 812 if (rc != 0) { 813 SPDK_ERRLOG("bdev modules init failed\n"); 814 spdk_bdev_init_complete(-1); 815 return; 816 } 817 818 spdk_bdev_module_action_complete(); 819 } 820 821 static void 822 spdk_bdev_mgr_unregister_cb(void *io_device) 823 { 824 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 825 826 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 827 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 828 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 829 g_bdev_opts.bdev_io_pool_size); 830 } 831 832 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 833 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 834 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 835 BUF_SMALL_POOL_SIZE); 836 assert(false); 837 } 838 839 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 840 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 841 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 842 BUF_LARGE_POOL_SIZE); 843 assert(false); 844 } 845 846 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 847 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 848 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 849 spdk_dma_free(g_bdev_mgr.zero_buffer); 850 851 cb_fn(g_fini_cb_arg); 852 g_fini_cb_fn = NULL; 853 g_fini_cb_arg = NULL; 854 } 855 856 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 857 858 static void 859 spdk_bdev_module_finish_iter(void *arg) 860 { 861 struct spdk_bdev_module *bdev_module; 862 863 /* Start iterating from the last touched module */ 864 if (!g_resume_bdev_module) { 865 bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules); 866 } else { 867 bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq); 868 } 869 870 while (bdev_module) { 871 if (bdev_module->async_fini) { 872 /* Save our place so we can resume later. 
We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_NEXT(bdev_module, internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and continue
		 * with the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some
		 * context (like bdev part free) that will use this bdev (or private bdev driver
		 * ctx data) after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the first bdev in the list.
	 *
	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
	 * calling the remove_cb of the descriptors first.
	 *
	 * Once this bdev and all of its open descriptors have been cleaned up, this function
	 * will be called again via the unregister completion callback to continue the cleanup
	 * process with the next bdev.
	 */
	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
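		 * Callers that get NULL back here see -ENOMEM from the submission
		 * functions and can wait on an spdk_bdev_io_wait_entry; the entries on
		 * io_wait_queue are serviced in spdk_bdev_free_io() as bdev_ios are
		 * returned to the per-thread cache.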
975 */ 976 bdev_io = NULL; 977 } else { 978 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 979 } 980 981 return bdev_io; 982 } 983 984 void 985 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 986 { 987 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 988 989 assert(bdev_io != NULL); 990 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 991 992 if (bdev_io->internal.buf != NULL) { 993 spdk_bdev_io_put_buf(bdev_io); 994 } 995 996 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 997 ch->per_thread_cache_count++; 998 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 999 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1000 struct spdk_bdev_io_wait_entry *entry; 1001 1002 entry = TAILQ_FIRST(&ch->io_wait_queue); 1003 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1004 entry->cb_fn(entry->cb_arg); 1005 } 1006 } else { 1007 /* We should never have a full cache with entries on the io wait queue. */ 1008 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1009 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1010 } 1011 } 1012 1013 static uint64_t 1014 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1015 { 1016 struct spdk_bdev *bdev = bdev_io->bdev; 1017 1018 switch (bdev_io->type) { 1019 case SPDK_BDEV_IO_TYPE_NVME_ADMIN: 1020 case SPDK_BDEV_IO_TYPE_NVME_IO: 1021 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1022 return bdev_io->u.nvme_passthru.nbytes; 1023 case SPDK_BDEV_IO_TYPE_READ: 1024 case SPDK_BDEV_IO_TYPE_WRITE: 1025 case SPDK_BDEV_IO_TYPE_UNMAP: 1026 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1027 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1028 default: 1029 return 0; 1030 } 1031 } 1032 1033 static void 1034 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch) 1035 { 1036 struct spdk_bdev_io *bdev_io = NULL; 1037 struct spdk_bdev *bdev = ch->bdev; 1038 struct spdk_bdev_qos *qos = bdev->internal.qos; 1039 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1040 1041 while (!TAILQ_EMPTY(&qos->queued)) { 1042 if (qos->max_ios_per_timeslice > 0 && 1043 qos->io_submitted_this_timeslice >= qos->max_ios_per_timeslice) { 1044 break; 1045 } 1046 1047 if (qos->max_byte_per_timeslice > 0 && 1048 qos->byte_submitted_this_timeslice >= qos->max_byte_per_timeslice) { 1049 break; 1050 } 1051 1052 bdev_io = TAILQ_FIRST(&qos->queued); 1053 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1054 qos->io_submitted_this_timeslice++; 1055 qos->byte_submitted_this_timeslice += _spdk_bdev_get_io_size_in_byte(bdev_io); 1056 ch->io_outstanding++; 1057 shared_resource->io_outstanding++; 1058 bdev->fn_table->submit_request(ch->channel, bdev_io); 1059 } 1060 } 1061 1062 static void 1063 _spdk_bdev_io_submit(void *ctx) 1064 { 1065 struct spdk_bdev_io *bdev_io = ctx; 1066 struct spdk_bdev *bdev = bdev_io->bdev; 1067 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1068 struct spdk_io_channel *ch = bdev_ch->channel; 1069 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1070 1071 bdev_io->internal.submit_tsc = spdk_get_ticks(); 1072 bdev_ch->io_outstanding++; 1073 shared_resource->io_outstanding++; 1074 bdev_io->internal.in_submit_request = true; 1075 if (spdk_likely(bdev_ch->flags == 0)) { 1076 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1077 bdev->fn_table->submit_request(ch, bdev_io); 1078 } else { 1079 bdev_ch->io_outstanding--; 1080 shared_resource->io_outstanding--; 1081 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, 
bdev_io, internal.link); 1082 } 1083 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1084 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1085 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1086 bdev_ch->io_outstanding--; 1087 shared_resource->io_outstanding--; 1088 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1089 _spdk_bdev_qos_io_submit(bdev_ch); 1090 } else { 1091 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1092 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1093 } 1094 bdev_io->internal.in_submit_request = false; 1095 } 1096 1097 static void 1098 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1099 { 1100 struct spdk_bdev *bdev = bdev_io->bdev; 1101 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1102 1103 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1104 1105 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1106 if (thread == bdev->internal.qos->thread) { 1107 _spdk_bdev_io_submit(bdev_io); 1108 } else { 1109 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1110 bdev_io->internal.ch = bdev->internal.qos->ch; 1111 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1112 } 1113 } else { 1114 _spdk_bdev_io_submit(bdev_io); 1115 } 1116 } 1117 1118 static void 1119 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1120 { 1121 struct spdk_bdev *bdev = bdev_io->bdev; 1122 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1123 struct spdk_io_channel *ch = bdev_ch->channel; 1124 1125 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1126 1127 bdev_io->internal.in_submit_request = true; 1128 bdev->fn_table->submit_request(ch, bdev_io); 1129 bdev_io->internal.in_submit_request = false; 1130 } 1131 1132 static void 1133 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1134 struct spdk_bdev *bdev, void *cb_arg, 1135 spdk_bdev_io_completion_cb cb) 1136 { 1137 bdev_io->bdev = bdev; 1138 bdev_io->internal.caller_ctx = cb_arg; 1139 bdev_io->internal.cb = cb; 1140 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1141 bdev_io->internal.in_submit_request = false; 1142 bdev_io->internal.buf = NULL; 1143 bdev_io->internal.io_submit_ch = NULL; 1144 } 1145 1146 static bool 1147 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1148 { 1149 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1150 } 1151 1152 bool 1153 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1154 { 1155 bool supported; 1156 1157 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1158 1159 if (!supported) { 1160 switch (io_type) { 1161 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1162 /* The bdev layer will emulate write zeroes as long as write is supported. 
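			 * (See spdk_bdev_write_zeroes_blocks() below, which falls back to
			 * plain writes of g_bdev_mgr.zero_buffer, split into ZERO_BUFFER_SIZE
			 * chunks, when the module only supports WRITE.)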
*/ 1163 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1164 break; 1165 default: 1166 break; 1167 } 1168 } 1169 1170 return supported; 1171 } 1172 1173 int 1174 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1175 { 1176 if (bdev->fn_table->dump_info_json) { 1177 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1178 } 1179 1180 return 0; 1181 } 1182 1183 void 1184 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1185 { 1186 assert(bdev != NULL); 1187 assert(w != NULL); 1188 1189 if (bdev->fn_table->write_config_json) { 1190 bdev->fn_table->write_config_json(bdev, w); 1191 } else { 1192 spdk_json_write_object_begin(w); 1193 spdk_json_write_named_string(w, "name", bdev->name); 1194 spdk_json_write_object_end(w); 1195 } 1196 } 1197 1198 static void 1199 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1200 { 1201 uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0; 1202 1203 if (qos->iops_rate_limit > 0) { 1204 max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / 1205 SPDK_BDEV_SEC_TO_USEC; 1206 qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice, 1207 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE); 1208 } 1209 1210 if (qos->byte_rate_limit > 0) { 1211 max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / 1212 SPDK_BDEV_SEC_TO_USEC; 1213 qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice, 1214 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE); 1215 } 1216 } 1217 1218 static int 1219 spdk_bdev_channel_poll_qos(void *arg) 1220 { 1221 struct spdk_bdev_qos *qos = arg; 1222 1223 /* Reset for next round of rate limiting */ 1224 qos->io_submitted_this_timeslice = 0; 1225 qos->byte_submitted_this_timeslice = 0; 1226 1227 _spdk_bdev_qos_io_submit(qos->ch); 1228 1229 return -1; 1230 } 1231 1232 static void 1233 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1234 { 1235 struct spdk_bdev_shared_resource *shared_resource; 1236 1237 if (!ch) { 1238 return; 1239 } 1240 1241 if (ch->channel) { 1242 spdk_put_io_channel(ch->channel); 1243 } 1244 1245 assert(ch->io_outstanding == 0); 1246 1247 shared_resource = ch->shared_resource; 1248 if (shared_resource) { 1249 assert(ch->io_outstanding == 0); 1250 assert(shared_resource->ref > 0); 1251 shared_resource->ref--; 1252 if (shared_resource->ref == 0) { 1253 assert(shared_resource->io_outstanding == 0); 1254 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1255 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1256 free(shared_resource); 1257 } 1258 } 1259 } 1260 1261 /* Caller must hold bdev->internal.mutex. 
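 * The QoS pointer and channel assignment touched below are shared across
 * threads; spdk_bdev_channel_create() takes the mutex before calling this
 * helper.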
*/ 1262 static int 1263 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1264 { 1265 struct spdk_bdev_qos *qos = bdev->internal.qos; 1266 1267 /* Rate limiting on this bdev enabled */ 1268 if (qos) { 1269 if (qos->ch == NULL) { 1270 struct spdk_io_channel *io_ch; 1271 1272 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1273 bdev->name, spdk_get_thread()); 1274 1275 /* No qos channel has been selected, so set one up */ 1276 1277 /* Take another reference to ch */ 1278 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1279 qos->ch = ch; 1280 1281 qos->thread = spdk_io_channel_get_thread(io_ch); 1282 1283 TAILQ_INIT(&qos->queued); 1284 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1285 qos->io_submitted_this_timeslice = 0; 1286 qos->byte_submitted_this_timeslice = 0; 1287 1288 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1289 qos, 1290 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1291 } 1292 1293 ch->flags |= BDEV_CH_QOS_ENABLED; 1294 } 1295 1296 return 0; 1297 } 1298 1299 static int 1300 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1301 { 1302 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1303 struct spdk_bdev_channel *ch = ctx_buf; 1304 struct spdk_io_channel *mgmt_io_ch; 1305 struct spdk_bdev_mgmt_channel *mgmt_ch; 1306 struct spdk_bdev_shared_resource *shared_resource; 1307 1308 ch->bdev = bdev; 1309 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1310 if (!ch->channel) { 1311 return -1; 1312 } 1313 1314 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1315 if (!mgmt_io_ch) { 1316 return -1; 1317 } 1318 1319 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1320 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1321 if (shared_resource->shared_ch == ch->channel) { 1322 spdk_put_io_channel(mgmt_io_ch); 1323 shared_resource->ref++; 1324 break; 1325 } 1326 } 1327 1328 if (shared_resource == NULL) { 1329 shared_resource = calloc(1, sizeof(*shared_resource)); 1330 if (shared_resource == NULL) { 1331 spdk_put_io_channel(mgmt_io_ch); 1332 return -1; 1333 } 1334 1335 shared_resource->mgmt_ch = mgmt_ch; 1336 shared_resource->io_outstanding = 0; 1337 TAILQ_INIT(&shared_resource->nomem_io); 1338 shared_resource->nomem_threshold = 0; 1339 shared_resource->shared_ch = ch->channel; 1340 shared_resource->ref = 1; 1341 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1342 } 1343 1344 memset(&ch->stat, 0, sizeof(ch->stat)); 1345 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1346 ch->io_outstanding = 0; 1347 TAILQ_INIT(&ch->queued_resets); 1348 ch->flags = 0; 1349 ch->shared_resource = shared_resource; 1350 1351 #ifdef SPDK_CONFIG_VTUNE 1352 { 1353 char *name; 1354 __itt_init_ittlib(NULL, 0); 1355 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1356 if (!name) { 1357 _spdk_bdev_channel_destroy_resource(ch); 1358 return -1; 1359 } 1360 ch->handle = __itt_string_handle_create(name); 1361 free(name); 1362 ch->start_tsc = spdk_get_ticks(); 1363 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1364 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1365 } 1366 #endif 1367 1368 pthread_mutex_lock(&bdev->internal.mutex); 1369 1370 if (_spdk_bdev_enable_qos(bdev, ch)) { 1371 _spdk_bdev_channel_destroy_resource(ch); 1372 pthread_mutex_unlock(&bdev->internal.mutex); 1373 return -1; 1374 } 1375 1376 pthread_mutex_unlock(&bdev->internal.mutex); 1377 1378 return 0; 1379 } 1380 1381 /* 1382 * Abort I/O that are waiting on a data buffer. 
These types of I/O are 1383 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1384 */ 1385 static void 1386 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1387 { 1388 bdev_io_stailq_t tmp; 1389 struct spdk_bdev_io *bdev_io; 1390 1391 STAILQ_INIT(&tmp); 1392 1393 while (!STAILQ_EMPTY(queue)) { 1394 bdev_io = STAILQ_FIRST(queue); 1395 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1396 if (bdev_io->internal.ch == ch) { 1397 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1398 } else { 1399 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1400 } 1401 } 1402 1403 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1404 } 1405 1406 /* 1407 * Abort I/O that are queued waiting for submission. These types of I/O are 1408 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1409 */ 1410 static void 1411 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1412 { 1413 struct spdk_bdev_io *bdev_io, *tmp; 1414 1415 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1416 if (bdev_io->internal.ch == ch) { 1417 TAILQ_REMOVE(queue, bdev_io, internal.link); 1418 /* 1419 * spdk_bdev_io_complete() assumes that the completed I/O had 1420 * been submitted to the bdev module. Since in this case it 1421 * hadn't, bump io_outstanding to account for the decrement 1422 * that spdk_bdev_io_complete() will do. 1423 */ 1424 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1425 ch->io_outstanding++; 1426 ch->shared_resource->io_outstanding++; 1427 } 1428 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1429 } 1430 } 1431 } 1432 1433 static void 1434 spdk_bdev_qos_channel_destroy(void *cb_arg) 1435 { 1436 struct spdk_bdev_qos *qos = cb_arg; 1437 1438 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1439 spdk_poller_unregister(&qos->poller); 1440 1441 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1442 1443 free(qos); 1444 } 1445 1446 static int 1447 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1448 { 1449 /* 1450 * Cleanly shutting down the QoS poller is tricky, because 1451 * during the asynchronous operation the user could open 1452 * a new descriptor and create a new channel, spawning 1453 * a new QoS poller. 1454 * 1455 * The strategy is to create a new QoS structure here and swap it 1456 * in. The shutdown path then continues to refer to the old one 1457 * until it completes and then releases it. 1458 */ 1459 struct spdk_bdev_qos *new_qos, *old_qos; 1460 1461 old_qos = bdev->internal.qos; 1462 1463 new_qos = calloc(1, sizeof(*new_qos)); 1464 if (!new_qos) { 1465 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1466 return -ENOMEM; 1467 } 1468 1469 /* Copy the old QoS data into the newly allocated structure */ 1470 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1471 1472 /* Zero out the key parts of the QoS structure */ 1473 new_qos->ch = NULL; 1474 new_qos->thread = NULL; 1475 new_qos->max_ios_per_timeslice = 0; 1476 new_qos->max_byte_per_timeslice = 0; 1477 new_qos->io_submitted_this_timeslice = 0; 1478 new_qos->byte_submitted_this_timeslice = 0; 1479 new_qos->poller = NULL; 1480 TAILQ_INIT(&new_qos->queued); 1481 1482 bdev->internal.qos = new_qos; 1483 1484 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1485 old_qos); 1486 1487 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 1488 * been destroyed yet. The destruction path will end up waiting for the final 1489 * channel to be put before it releases resources. 
 */

	return 0;
}

static void
spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel *ch = ctx_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
		      spdk_get_thread());

	mgmt_ch = shared_resource->mgmt_ch;

	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);

	_spdk_bdev_channel_destroy_resource(ch);
}

int
spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	if (alias == NULL) {
		SPDK_ERRLOG("Empty alias passed\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(alias)) {
		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
		return -EEXIST;
	}

	tmp = calloc(1, sizeof(*tmp));
	if (tmp == NULL) {
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	tmp->alias = strdup(alias);
	if (tmp->alias == NULL) {
		free(tmp);
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);

	return 0;
}

int
spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (strcmp(alias, tmp->alias) == 0) {
			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
			free(tmp->alias);
			free(tmp);
			return 0;
		}
	}

	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);

	return -ENOENT;
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
{
	return bdev->product_name;
}

const struct spdk_bdev_aliases_list *
spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
{
	return &bdev->aliases;
}

uint32_t
spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
{
	return bdev->blocklen;
}

uint64_t
spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
{
	return bdev->blockcnt;
}

uint64_t
spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
{
	uint64_t iops_rate_limit = 0;

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos) {
		iops_rate_limit = bdev->internal.qos->iops_rate_limit;
	}
	pthread_mutex_unlock(&bdev->internal.mutex);

	return iops_rate_limit;
}

size_t
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
{
	/* TODO: push this logic down to the bdev modules */
	if (bdev->need_aligned_buffer) {
		return bdev->blocklen;
	}

	return 1;
}

uint32_t
spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
{
	return bdev->optimal_io_boundary;
}

bool
spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 1635 { 1636 return bdev->write_cache; 1637 } 1638 1639 const struct spdk_uuid * 1640 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 1641 { 1642 return &bdev->uuid; 1643 } 1644 1645 uint64_t 1646 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 1647 { 1648 return bdev->internal.measured_queue_depth; 1649 } 1650 1651 uint64_t 1652 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 1653 { 1654 return bdev->internal.period; 1655 } 1656 1657 static void 1658 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 1659 { 1660 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 1661 1662 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 1663 } 1664 1665 static void 1666 _calculate_measured_qd(struct spdk_io_channel_iter *i) 1667 { 1668 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 1669 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 1670 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 1671 1672 bdev->internal.temporary_queue_depth += ch->io_outstanding; 1673 spdk_for_each_channel_continue(i, 0); 1674 } 1675 1676 static int 1677 spdk_bdev_calculate_measured_queue_depth(void *ctx) 1678 { 1679 struct spdk_bdev *bdev = ctx; 1680 bdev->internal.temporary_queue_depth = 0; 1681 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 1682 _calculate_measured_qd_cpl); 1683 return 0; 1684 } 1685 1686 void 1687 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 1688 { 1689 bdev->internal.period = period; 1690 1691 if (bdev->internal.qd_poller != NULL) { 1692 spdk_poller_unregister(&bdev->internal.qd_poller); 1693 bdev->internal.measured_queue_depth = UINT64_MAX; 1694 } 1695 1696 if (period != 0) { 1697 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 1698 period); 1699 } 1700 } 1701 1702 int 1703 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 1704 { 1705 int ret; 1706 1707 pthread_mutex_lock(&bdev->internal.mutex); 1708 1709 /* bdev has open descriptors */ 1710 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 1711 bdev->blockcnt > size) { 1712 ret = -EBUSY; 1713 } else { 1714 bdev->blockcnt = size; 1715 ret = 0; 1716 } 1717 1718 pthread_mutex_unlock(&bdev->internal.mutex); 1719 1720 return ret; 1721 } 1722 1723 /* 1724 * Convert I/O offset and length from bytes to blocks. 1725 * 1726 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
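 *
 * For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 8192
 * give offset_blocks = 8 and num_blocks = 16 and the function returns 0; an offset of
 * 4097 bytes would make the OR of the remainders non-zero and the callers below
 * return -EINVAL.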
1727 */ 1728 static uint64_t 1729 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 1730 uint64_t num_bytes, uint64_t *num_blocks) 1731 { 1732 uint32_t block_size = bdev->blocklen; 1733 1734 *offset_blocks = offset_bytes / block_size; 1735 *num_blocks = num_bytes / block_size; 1736 1737 return (offset_bytes % block_size) | (num_bytes % block_size); 1738 } 1739 1740 static bool 1741 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 1742 { 1743 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 1744 * has been an overflow and hence the offset has been wrapped around */ 1745 if (offset_blocks + num_blocks < offset_blocks) { 1746 return false; 1747 } 1748 1749 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 1750 if (offset_blocks + num_blocks > bdev->blockcnt) { 1751 return false; 1752 } 1753 1754 return true; 1755 } 1756 1757 int 1758 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1759 void *buf, uint64_t offset, uint64_t nbytes, 1760 spdk_bdev_io_completion_cb cb, void *cb_arg) 1761 { 1762 uint64_t offset_blocks, num_blocks; 1763 1764 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1765 return -EINVAL; 1766 } 1767 1768 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 1769 } 1770 1771 int 1772 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1773 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 1774 spdk_bdev_io_completion_cb cb, void *cb_arg) 1775 { 1776 struct spdk_bdev *bdev = desc->bdev; 1777 struct spdk_bdev_io *bdev_io; 1778 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1779 1780 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1781 return -EINVAL; 1782 } 1783 1784 bdev_io = spdk_bdev_get_io(channel); 1785 if (!bdev_io) { 1786 return -ENOMEM; 1787 } 1788 1789 bdev_io->internal.ch = channel; 1790 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 1791 bdev_io->u.bdev.iovs = &bdev_io->iov; 1792 bdev_io->u.bdev.iovs[0].iov_base = buf; 1793 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 1794 bdev_io->u.bdev.iovcnt = 1; 1795 bdev_io->u.bdev.num_blocks = num_blocks; 1796 bdev_io->u.bdev.offset_blocks = offset_blocks; 1797 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1798 1799 spdk_bdev_io_submit(bdev_io); 1800 return 0; 1801 } 1802 1803 int 1804 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1805 struct iovec *iov, int iovcnt, 1806 uint64_t offset, uint64_t nbytes, 1807 spdk_bdev_io_completion_cb cb, void *cb_arg) 1808 { 1809 uint64_t offset_blocks, num_blocks; 1810 1811 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1812 return -EINVAL; 1813 } 1814 1815 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 1816 } 1817 1818 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1819 struct iovec *iov, int iovcnt, 1820 uint64_t offset_blocks, uint64_t num_blocks, 1821 spdk_bdev_io_completion_cb cb, void *cb_arg) 1822 { 1823 struct spdk_bdev *bdev = desc->bdev; 1824 struct spdk_bdev_io *bdev_io; 1825 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1826 1827 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1828 return -EINVAL; 1829 } 1830 1831 bdev_io = 
spdk_bdev_get_io(channel); 1832 if (!bdev_io) { 1833 return -ENOMEM; 1834 } 1835 1836 bdev_io->internal.ch = channel; 1837 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 1838 bdev_io->u.bdev.iovs = iov; 1839 bdev_io->u.bdev.iovcnt = iovcnt; 1840 bdev_io->u.bdev.num_blocks = num_blocks; 1841 bdev_io->u.bdev.offset_blocks = offset_blocks; 1842 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1843 1844 spdk_bdev_io_submit(bdev_io); 1845 return 0; 1846 } 1847 1848 int 1849 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1850 void *buf, uint64_t offset, uint64_t nbytes, 1851 spdk_bdev_io_completion_cb cb, void *cb_arg) 1852 { 1853 uint64_t offset_blocks, num_blocks; 1854 1855 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1856 return -EINVAL; 1857 } 1858 1859 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 1860 } 1861 1862 int 1863 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1864 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 1865 spdk_bdev_io_completion_cb cb, void *cb_arg) 1866 { 1867 struct spdk_bdev *bdev = desc->bdev; 1868 struct spdk_bdev_io *bdev_io; 1869 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1870 1871 if (!desc->write) { 1872 return -EBADF; 1873 } 1874 1875 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1876 return -EINVAL; 1877 } 1878 1879 bdev_io = spdk_bdev_get_io(channel); 1880 if (!bdev_io) { 1881 return -ENOMEM; 1882 } 1883 1884 bdev_io->internal.ch = channel; 1885 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 1886 bdev_io->u.bdev.iovs = &bdev_io->iov; 1887 bdev_io->u.bdev.iovs[0].iov_base = buf; 1888 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 1889 bdev_io->u.bdev.iovcnt = 1; 1890 bdev_io->u.bdev.num_blocks = num_blocks; 1891 bdev_io->u.bdev.offset_blocks = offset_blocks; 1892 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1893 1894 spdk_bdev_io_submit(bdev_io); 1895 return 0; 1896 } 1897 1898 int 1899 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1900 struct iovec *iov, int iovcnt, 1901 uint64_t offset, uint64_t len, 1902 spdk_bdev_io_completion_cb cb, void *cb_arg) 1903 { 1904 uint64_t offset_blocks, num_blocks; 1905 1906 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 1907 return -EINVAL; 1908 } 1909 1910 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 1911 } 1912 1913 int 1914 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1915 struct iovec *iov, int iovcnt, 1916 uint64_t offset_blocks, uint64_t num_blocks, 1917 spdk_bdev_io_completion_cb cb, void *cb_arg) 1918 { 1919 struct spdk_bdev *bdev = desc->bdev; 1920 struct spdk_bdev_io *bdev_io; 1921 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1922 1923 if (!desc->write) { 1924 return -EBADF; 1925 } 1926 1927 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1928 return -EINVAL; 1929 } 1930 1931 bdev_io = spdk_bdev_get_io(channel); 1932 if (!bdev_io) { 1933 return -ENOMEM; 1934 } 1935 1936 bdev_io->internal.ch = channel; 1937 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 1938 bdev_io->u.bdev.iovs = iov; 1939 bdev_io->u.bdev.iovcnt = iovcnt; 1940 bdev_io->u.bdev.num_blocks = num_blocks; 1941 bdev_io->u.bdev.offset_blocks = offset_blocks; 1942 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1943 1944 spdk_bdev_io_submit(bdev_io); 1945 return 0; 1946 } 
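/*
 * Minimal caller-side sketch (illustrative only; the bdev name "Malloc0", the data
 * buffer "buf" and the example_done() callback are assumptions, not part of this
 * file): open a descriptor, get a per-thread I/O channel, and submit a one-block
 * write with a completion callback, handling -ENOMEM by retrying later.
 *
 *	static void
 *	example_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *io_ch;
 *	int rc;
 *
 *	rc = spdk_bdev_open(spdk_bdev_get_by_name("Malloc0"), true, NULL, NULL, &desc);
 *	if (rc == 0) {
 *		io_ch = spdk_bdev_get_io_channel(desc);
 *		rc = spdk_bdev_write_blocks(desc, io_ch, buf, 0, 1, example_done, NULL);
 *		if (rc == -ENOMEM) {
 *			(queue an spdk_bdev_io_wait_entry and resubmit later)
 *		}
 *	}
 */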
1947 1948 int 1949 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1950 uint64_t offset, uint64_t len, 1951 spdk_bdev_io_completion_cb cb, void *cb_arg) 1952 { 1953 uint64_t offset_blocks, num_blocks; 1954 1955 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 1956 return -EINVAL; 1957 } 1958 1959 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 1960 } 1961 1962 int 1963 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1964 uint64_t offset_blocks, uint64_t num_blocks, 1965 spdk_bdev_io_completion_cb cb, void *cb_arg) 1966 { 1967 struct spdk_bdev *bdev = desc->bdev; 1968 struct spdk_bdev_io *bdev_io; 1969 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1970 uint64_t len; 1971 bool split_request = false; 1972 1973 if (!desc->write) { 1974 return -EBADF; 1975 } 1976 1977 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1978 return -EINVAL; 1979 } 1980 1981 bdev_io = spdk_bdev_get_io(channel); 1982 1983 if (!bdev_io) { 1984 return -ENOMEM; 1985 } 1986 1987 bdev_io->internal.ch = channel; 1988 bdev_io->u.bdev.offset_blocks = offset_blocks; 1989 1990 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 1991 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 1992 bdev_io->u.bdev.num_blocks = num_blocks; 1993 bdev_io->u.bdev.iovs = NULL; 1994 bdev_io->u.bdev.iovcnt = 0; 1995 1996 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 1997 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 1998 1999 len = spdk_bdev_get_block_size(bdev) * num_blocks; 2000 2001 if (len > ZERO_BUFFER_SIZE) { 2002 split_request = true; 2003 len = ZERO_BUFFER_SIZE; 2004 } 2005 2006 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2007 bdev_io->u.bdev.iovs = &bdev_io->iov; 2008 bdev_io->u.bdev.iovs[0].iov_base = g_bdev_mgr.zero_buffer; 2009 bdev_io->u.bdev.iovs[0].iov_len = len; 2010 bdev_io->u.bdev.iovcnt = 1; 2011 bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev); 2012 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks; 2013 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks; 2014 } else { 2015 spdk_bdev_free_io(bdev_io); 2016 return -ENOTSUP; 2017 } 2018 2019 if (split_request) { 2020 bdev_io->u.bdev.stored_user_cb = cb; 2021 spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split); 2022 } else { 2023 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2024 } 2025 spdk_bdev_io_submit(bdev_io); 2026 return 0; 2027 } 2028 2029 int 2030 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2031 uint64_t offset, uint64_t nbytes, 2032 spdk_bdev_io_completion_cb cb, void *cb_arg) 2033 { 2034 uint64_t offset_blocks, num_blocks; 2035 2036 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2037 return -EINVAL; 2038 } 2039 2040 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2041 } 2042 2043 int 2044 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2045 uint64_t offset_blocks, uint64_t num_blocks, 2046 spdk_bdev_io_completion_cb cb, void *cb_arg) 2047 { 2048 struct spdk_bdev *bdev = desc->bdev; 2049 struct spdk_bdev_io *bdev_io; 2050 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2051 2052 if (!desc->write) { 2053 return -EBADF; 2054 } 2055 2056 if 
(!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2057 return -EINVAL; 2058 } 2059 2060 if (num_blocks == 0) { 2061 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2062 return -EINVAL; 2063 } 2064 2065 bdev_io = spdk_bdev_get_io(channel); 2066 if (!bdev_io) { 2067 return -ENOMEM; 2068 } 2069 2070 bdev_io->internal.ch = channel; 2071 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2072 2073 bdev_io->u.bdev.iovs = &bdev_io->iov; 2074 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2075 bdev_io->u.bdev.iovs[0].iov_len = 0; 2076 bdev_io->u.bdev.iovcnt = 1; 2077 2078 bdev_io->u.bdev.offset_blocks = offset_blocks; 2079 bdev_io->u.bdev.num_blocks = num_blocks; 2080 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2081 2082 spdk_bdev_io_submit(bdev_io); 2083 return 0; 2084 } 2085 2086 int 2087 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2088 uint64_t offset, uint64_t length, 2089 spdk_bdev_io_completion_cb cb, void *cb_arg) 2090 { 2091 uint64_t offset_blocks, num_blocks; 2092 2093 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2094 return -EINVAL; 2095 } 2096 2097 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2098 } 2099 2100 int 2101 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2102 uint64_t offset_blocks, uint64_t num_blocks, 2103 spdk_bdev_io_completion_cb cb, void *cb_arg) 2104 { 2105 struct spdk_bdev *bdev = desc->bdev; 2106 struct spdk_bdev_io *bdev_io; 2107 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2108 2109 if (!desc->write) { 2110 return -EBADF; 2111 } 2112 2113 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2114 return -EINVAL; 2115 } 2116 2117 bdev_io = spdk_bdev_get_io(channel); 2118 if (!bdev_io) { 2119 return -ENOMEM; 2120 } 2121 2122 bdev_io->internal.ch = channel; 2123 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2124 bdev_io->u.bdev.iovs = NULL; 2125 bdev_io->u.bdev.iovcnt = 0; 2126 bdev_io->u.bdev.offset_blocks = offset_blocks; 2127 bdev_io->u.bdev.num_blocks = num_blocks; 2128 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2129 2130 spdk_bdev_io_submit(bdev_io); 2131 return 0; 2132 } 2133 2134 static void 2135 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2136 { 2137 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2138 struct spdk_bdev_io *bdev_io; 2139 2140 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2141 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2142 spdk_bdev_io_submit_reset(bdev_io); 2143 } 2144 2145 static void 2146 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2147 { 2148 struct spdk_io_channel *ch; 2149 struct spdk_bdev_channel *channel; 2150 struct spdk_bdev_mgmt_channel *mgmt_channel; 2151 struct spdk_bdev_shared_resource *shared_resource; 2152 bdev_io_tailq_t tmp_queued; 2153 2154 TAILQ_INIT(&tmp_queued); 2155 2156 ch = spdk_io_channel_iter_get_channel(i); 2157 channel = spdk_io_channel_get_ctx(ch); 2158 shared_resource = channel->shared_resource; 2159 mgmt_channel = shared_resource->mgmt_ch; 2160 2161 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2162 2163 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2164 /* The QoS object is always valid and readable while 2165 * the channel flag is set, so the lock here should not 2166 * be necessary. We're not in the fast path though, so 2167 * just take it anyway. 
*/ 2168 pthread_mutex_lock(&channel->bdev->internal.mutex); 2169 if (channel->bdev->internal.qos->ch == channel) { 2170 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2171 } 2172 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2173 } 2174 2175 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2176 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2177 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2178 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2179 2180 spdk_for_each_channel_continue(i, 0); 2181 } 2182 2183 static void 2184 _spdk_bdev_start_reset(void *ctx) 2185 { 2186 struct spdk_bdev_channel *ch = ctx; 2187 2188 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2189 ch, _spdk_bdev_reset_dev); 2190 } 2191 2192 static void 2193 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2194 { 2195 struct spdk_bdev *bdev = ch->bdev; 2196 2197 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2198 2199 pthread_mutex_lock(&bdev->internal.mutex); 2200 if (bdev->internal.reset_in_progress == NULL) { 2201 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2202 /* 2203 * Take a channel reference for the target bdev for the life of this 2204 * reset. This guards against the channel getting destroyed while 2205 * spdk_for_each_channel() calls related to this reset IO are in 2206 * progress. We will release the reference when this reset is 2207 * completed. 2208 */ 2209 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2210 _spdk_bdev_start_reset(ch); 2211 } 2212 pthread_mutex_unlock(&bdev->internal.mutex); 2213 } 2214 2215 int 2216 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2217 spdk_bdev_io_completion_cb cb, void *cb_arg) 2218 { 2219 struct spdk_bdev *bdev = desc->bdev; 2220 struct spdk_bdev_io *bdev_io; 2221 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2222 2223 bdev_io = spdk_bdev_get_io(channel); 2224 if (!bdev_io) { 2225 return -ENOMEM; 2226 } 2227 2228 bdev_io->internal.ch = channel; 2229 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2230 bdev_io->u.reset.ch_ref = NULL; 2231 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2232 2233 pthread_mutex_lock(&bdev->internal.mutex); 2234 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2235 pthread_mutex_unlock(&bdev->internal.mutex); 2236 2237 _spdk_bdev_channel_start_reset(channel); 2238 2239 return 0; 2240 } 2241 2242 void 2243 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2244 struct spdk_bdev_io_stat *stat) 2245 { 2246 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2247 2248 *stat = channel->stat; 2249 } 2250 2251 static void 2252 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2253 { 2254 void *io_device = spdk_io_channel_iter_get_io_device(i); 2255 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2256 2257 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2258 bdev_iostat_ctx->cb_arg, 0); 2259 free(bdev_iostat_ctx); 2260 } 2261 2262 static void 2263 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2264 { 2265 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2266 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2267 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2268 2269 
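/* Per-channel step of spdk_bdev_get_device_stat(): fold this channel's
 * counters into the caller-provided aggregate.  The final total is handed to
 * the user's callback in _spdk_bdev_get_device_stat_done() once every channel
 * has been visited. */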
bdev_iostat_ctx->stat->bytes_read += channel->stat.bytes_read; 2270 bdev_iostat_ctx->stat->num_read_ops += channel->stat.num_read_ops; 2271 bdev_iostat_ctx->stat->bytes_written += channel->stat.bytes_written; 2272 bdev_iostat_ctx->stat->num_write_ops += channel->stat.num_write_ops; 2273 2274 spdk_for_each_channel_continue(i, 0); 2275 } 2276 2277 void 2278 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2279 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2280 { 2281 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2282 2283 assert(bdev != NULL); 2284 assert(stat != NULL); 2285 assert(cb != NULL); 2286 2287 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2288 if (bdev_iostat_ctx == NULL) { 2289 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2290 cb(bdev, stat, cb_arg, -ENOMEM); 2291 return; 2292 } 2293 2294 bdev_iostat_ctx->stat = stat; 2295 bdev_iostat_ctx->cb = cb; 2296 bdev_iostat_ctx->cb_arg = cb_arg; 2297 2298 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2299 _spdk_bdev_get_each_channel_stat, 2300 bdev_iostat_ctx, 2301 _spdk_bdev_get_device_stat_done); 2302 } 2303 2304 int 2305 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2306 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2307 spdk_bdev_io_completion_cb cb, void *cb_arg) 2308 { 2309 struct spdk_bdev *bdev = desc->bdev; 2310 struct spdk_bdev_io *bdev_io; 2311 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2312 2313 if (!desc->write) { 2314 return -EBADF; 2315 } 2316 2317 bdev_io = spdk_bdev_get_io(channel); 2318 if (!bdev_io) { 2319 return -ENOMEM; 2320 } 2321 2322 bdev_io->internal.ch = channel; 2323 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2324 bdev_io->u.nvme_passthru.cmd = *cmd; 2325 bdev_io->u.nvme_passthru.buf = buf; 2326 bdev_io->u.nvme_passthru.nbytes = nbytes; 2327 bdev_io->u.nvme_passthru.md_buf = NULL; 2328 bdev_io->u.nvme_passthru.md_len = 0; 2329 2330 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2331 2332 spdk_bdev_io_submit(bdev_io); 2333 return 0; 2334 } 2335 2336 int 2337 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2338 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2339 spdk_bdev_io_completion_cb cb, void *cb_arg) 2340 { 2341 struct spdk_bdev *bdev = desc->bdev; 2342 struct spdk_bdev_io *bdev_io; 2343 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2344 2345 if (!desc->write) { 2346 /* 2347 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2348 * to easily determine if the command is a read or write, but for now just 2349 * do not allow io_passthru with a read-only descriptor. 
2350 */ 2351 return -EBADF; 2352 } 2353 2354 bdev_io = spdk_bdev_get_io(channel); 2355 if (!bdev_io) { 2356 return -ENOMEM; 2357 } 2358 2359 bdev_io->internal.ch = channel; 2360 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2361 bdev_io->u.nvme_passthru.cmd = *cmd; 2362 bdev_io->u.nvme_passthru.buf = buf; 2363 bdev_io->u.nvme_passthru.nbytes = nbytes; 2364 bdev_io->u.nvme_passthru.md_buf = NULL; 2365 bdev_io->u.nvme_passthru.md_len = 0; 2366 2367 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2368 2369 spdk_bdev_io_submit(bdev_io); 2370 return 0; 2371 } 2372 2373 int 2374 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2375 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2376 spdk_bdev_io_completion_cb cb, void *cb_arg) 2377 { 2378 struct spdk_bdev *bdev = desc->bdev; 2379 struct spdk_bdev_io *bdev_io; 2380 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2381 2382 if (!desc->write) { 2383 /* 2384 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2385 * to easily determine if the command is a read or write, but for now just 2386 * do not allow io_passthru with a read-only descriptor. 2387 */ 2388 return -EBADF; 2389 } 2390 2391 bdev_io = spdk_bdev_get_io(channel); 2392 if (!bdev_io) { 2393 return -ENOMEM; 2394 } 2395 2396 bdev_io->internal.ch = channel; 2397 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2398 bdev_io->u.nvme_passthru.cmd = *cmd; 2399 bdev_io->u.nvme_passthru.buf = buf; 2400 bdev_io->u.nvme_passthru.nbytes = nbytes; 2401 bdev_io->u.nvme_passthru.md_buf = md_buf; 2402 bdev_io->u.nvme_passthru.md_len = md_len; 2403 2404 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2405 2406 spdk_bdev_io_submit(bdev_io); 2407 return 0; 2408 } 2409 2410 int 2411 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2412 struct spdk_bdev_io_wait_entry *entry) 2413 { 2414 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2415 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2416 2417 if (bdev != entry->bdev) { 2418 SPDK_ERRLOG("bdevs do not match\n"); 2419 return -EINVAL; 2420 } 2421 2422 if (mgmt_ch->per_thread_cache_count > 0) { 2423 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2424 return -EINVAL; 2425 } 2426 2427 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2428 return 0; 2429 } 2430 2431 static void 2432 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2433 { 2434 struct spdk_bdev *bdev = bdev_ch->bdev; 2435 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2436 struct spdk_bdev_io *bdev_io; 2437 2438 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2439 /* 2440 * Allow some more I/O to complete before retrying the nomem_io queue. 2441 * Some drivers (such as nvme) cannot immediately take a new I/O in 2442 * the context of a completion, because the resources for the I/O are 2443 * not released until control returns to the bdev poller. Also, we 2444 * may require several small I/O to complete before a larger I/O 2445 * (that requires splitting) can be submitted. 
2446 */ 2447 return; 2448 } 2449 2450 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2451 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2452 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2453 bdev_io->internal.ch->io_outstanding++; 2454 shared_resource->io_outstanding++; 2455 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2456 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2457 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2458 break; 2459 } 2460 } 2461 } 2462 2463 static inline void 2464 _spdk_bdev_io_complete(void *ctx) 2465 { 2466 struct spdk_bdev_io *bdev_io = ctx; 2467 2468 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2469 /* 2470 * Send the completion to the thread that originally submitted the I/O, 2471 * which may not be the current thread in the case of QoS. 2472 */ 2473 if (bdev_io->internal.io_submit_ch) { 2474 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2475 bdev_io->internal.io_submit_ch = NULL; 2476 } 2477 2478 /* 2479 * Defer completion to avoid potential infinite recursion if the 2480 * user's completion callback issues a new I/O. 2481 */ 2482 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2483 _spdk_bdev_io_complete, bdev_io); 2484 return; 2485 } 2486 2487 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2488 switch (bdev_io->type) { 2489 case SPDK_BDEV_IO_TYPE_READ: 2490 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2491 bdev_io->internal.ch->stat.num_read_ops++; 2492 bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2493 break; 2494 case SPDK_BDEV_IO_TYPE_WRITE: 2495 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2496 bdev_io->internal.ch->stat.num_write_ops++; 2497 bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2498 break; 2499 default: 2500 break; 2501 } 2502 } 2503 2504 #ifdef SPDK_CONFIG_VTUNE 2505 uint64_t now_tsc = spdk_get_ticks(); 2506 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2507 uint64_t data[5]; 2508 2509 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2510 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2511 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2512 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2513 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
2514 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2515 2516 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2517 __itt_metadata_u64, 5, data); 2518 2519 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2520 bdev_io->internal.ch->start_tsc = now_tsc; 2521 } 2522 #endif 2523 2524 assert(bdev_io->internal.cb != NULL); 2525 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2526 2527 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2528 bdev_io->internal.caller_ctx); 2529 } 2530 2531 static void 2532 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2533 { 2534 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2535 2536 if (bdev_io->u.reset.ch_ref != NULL) { 2537 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2538 bdev_io->u.reset.ch_ref = NULL; 2539 } 2540 2541 _spdk_bdev_io_complete(bdev_io); 2542 } 2543 2544 static void 2545 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2546 { 2547 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2548 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2549 2550 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2551 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2552 _spdk_bdev_channel_start_reset(ch); 2553 } 2554 2555 spdk_for_each_channel_continue(i, 0); 2556 } 2557 2558 void 2559 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2560 { 2561 struct spdk_bdev *bdev = bdev_io->bdev; 2562 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2563 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2564 2565 bdev_io->internal.status = status; 2566 2567 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2568 bool unlock_channels = false; 2569 2570 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2571 SPDK_ERRLOG("NOMEM returned for reset\n"); 2572 } 2573 pthread_mutex_lock(&bdev->internal.mutex); 2574 if (bdev_io == bdev->internal.reset_in_progress) { 2575 bdev->internal.reset_in_progress = NULL; 2576 unlock_channels = true; 2577 } 2578 pthread_mutex_unlock(&bdev->internal.mutex); 2579 2580 if (unlock_channels) { 2581 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 2582 bdev_io, _spdk_bdev_reset_complete); 2583 return; 2584 } 2585 } else { 2586 assert(bdev_ch->io_outstanding > 0); 2587 assert(shared_resource->io_outstanding > 0); 2588 bdev_ch->io_outstanding--; 2589 shared_resource->io_outstanding--; 2590 2591 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 2592 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 2593 /* 2594 * Wait for some of the outstanding I/O to complete before we 2595 * retry any of the nomem_io. Normally we will wait for 2596 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 2597 * depth channels we will instead wait for half to complete. 
2598 */ 2599 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 2600 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 2601 return; 2602 } 2603 2604 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 2605 _spdk_bdev_ch_retry_io(bdev_ch); 2606 } 2607 } 2608 2609 _spdk_bdev_io_complete(bdev_io); 2610 } 2611 2612 void 2613 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 2614 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 2615 { 2616 if (sc == SPDK_SCSI_STATUS_GOOD) { 2617 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2618 } else { 2619 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 2620 bdev_io->internal.error.scsi.sc = sc; 2621 bdev_io->internal.error.scsi.sk = sk; 2622 bdev_io->internal.error.scsi.asc = asc; 2623 bdev_io->internal.error.scsi.ascq = ascq; 2624 } 2625 2626 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2627 } 2628 2629 void 2630 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 2631 int *sc, int *sk, int *asc, int *ascq) 2632 { 2633 assert(sc != NULL); 2634 assert(sk != NULL); 2635 assert(asc != NULL); 2636 assert(ascq != NULL); 2637 2638 switch (bdev_io->internal.status) { 2639 case SPDK_BDEV_IO_STATUS_SUCCESS: 2640 *sc = SPDK_SCSI_STATUS_GOOD; 2641 *sk = SPDK_SCSI_SENSE_NO_SENSE; 2642 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2643 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2644 break; 2645 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 2646 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 2647 break; 2648 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 2649 *sc = bdev_io->internal.error.scsi.sc; 2650 *sk = bdev_io->internal.error.scsi.sk; 2651 *asc = bdev_io->internal.error.scsi.asc; 2652 *ascq = bdev_io->internal.error.scsi.ascq; 2653 break; 2654 default: 2655 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 2656 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 2657 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2658 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2659 break; 2660 } 2661 } 2662 2663 void 2664 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 2665 { 2666 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 2667 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2668 } else { 2669 bdev_io->internal.error.nvme.sct = sct; 2670 bdev_io->internal.error.nvme.sc = sc; 2671 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 2672 } 2673 2674 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2675 } 2676 2677 void 2678 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 2679 { 2680 assert(sct != NULL); 2681 assert(sc != NULL); 2682 2683 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 2684 *sct = bdev_io->internal.error.nvme.sct; 2685 *sc = bdev_io->internal.error.nvme.sc; 2686 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2687 *sct = SPDK_NVME_SCT_GENERIC; 2688 *sc = SPDK_NVME_SC_SUCCESS; 2689 } else { 2690 *sct = SPDK_NVME_SCT_GENERIC; 2691 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 2692 } 2693 } 2694 2695 struct spdk_thread * 2696 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 2697 { 2698 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 2699 } 2700 2701 static void 2702 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set, 2703 enum spdk_bdev_qos_type qos_type) 2704 { 2705 uint64_t min_qos_set = 0; 2706 2707 switch (qos_type) { 2708 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 
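/* Select the minimum granularity for this QoS type; the value read from the
 * "QoS" config section is validated against it below and rejected (leaving
 * QoS disabled for this bdev) if it is not a whole multiple. */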
2709 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 2710 break; 2711 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2712 min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC; 2713 break; 2714 default: 2715 SPDK_ERRLOG("Unsupported QoS type.\n"); 2716 return; 2717 } 2718 2719 if (qos_set % min_qos_set) { 2720 SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not multiple of %lu\n", 2721 qos_set, bdev->name, min_qos_set); 2722 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 2723 return; 2724 } 2725 2726 if (!bdev->internal.qos) { 2727 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 2728 if (!bdev->internal.qos) { 2729 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 2730 return; 2731 } 2732 } 2733 2734 switch (qos_type) { 2735 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 2736 bdev->internal.qos->iops_rate_limit = qos_set; 2737 break; 2738 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2739 bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024; 2740 break; 2741 default: 2742 break; 2743 } 2744 2745 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 2746 bdev->name, qos_type, qos_set); 2747 2748 return; 2749 } 2750 2751 static void 2752 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 2753 { 2754 struct spdk_conf_section *sp = NULL; 2755 const char *val = NULL; 2756 uint64_t qos_set = 0; 2757 int i = 0, j = 0; 2758 2759 sp = spdk_conf_find_section(NULL, "QoS"); 2760 if (!sp) { 2761 return; 2762 } 2763 2764 while (j < SPDK_BDEV_QOS_NUM_TYPES) { 2765 i = 0; 2766 while (true) { 2767 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0); 2768 if (!val) { 2769 break; 2770 } 2771 2772 if (strcmp(bdev->name, val) != 0) { 2773 i++; 2774 continue; 2775 } 2776 2777 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1); 2778 if (val) { 2779 qos_set = strtoull(val, NULL, 10); 2780 _spdk_bdev_qos_config_type(bdev, qos_set, j); 2781 } 2782 2783 break; 2784 } 2785 2786 j++; 2787 } 2788 2789 return; 2790 } 2791 2792 static int 2793 spdk_bdev_init(struct spdk_bdev *bdev) 2794 { 2795 assert(bdev->module != NULL); 2796 2797 if (!bdev->name) { 2798 SPDK_ERRLOG("Bdev name is NULL\n"); 2799 return -EINVAL; 2800 } 2801 2802 if (spdk_bdev_get_by_name(bdev->name)) { 2803 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 2804 return -EEXIST; 2805 } 2806 2807 bdev->internal.status = SPDK_BDEV_STATUS_READY; 2808 bdev->internal.measured_queue_depth = UINT64_MAX; 2809 2810 TAILQ_INIT(&bdev->internal.open_descs); 2811 2812 TAILQ_INIT(&bdev->aliases); 2813 2814 bdev->internal.reset_in_progress = NULL; 2815 2816 _spdk_bdev_qos_config(bdev); 2817 2818 spdk_io_device_register(__bdev_to_io_dev(bdev), 2819 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 2820 sizeof(struct spdk_bdev_channel)); 2821 2822 pthread_mutex_init(&bdev->internal.mutex, NULL); 2823 return 0; 2824 } 2825 2826 static void 2827 spdk_bdev_destroy_cb(void *io_device) 2828 { 2829 int rc; 2830 struct spdk_bdev *bdev; 2831 spdk_bdev_unregister_cb cb_fn; 2832 void *cb_arg; 2833 2834 bdev = __bdev_from_io_dev(io_device); 2835 cb_fn = bdev->internal.unregister_cb; 2836 cb_arg = bdev->internal.unregister_ctx; 2837 2838 rc = bdev->fn_table->destruct(bdev->ctxt); 2839 if (rc < 0) { 2840 SPDK_ERRLOG("destruct failed\n"); 2841 } 2842 if (rc <= 0 && cb_fn != NULL) { 2843 cb_fn(cb_arg, rc); 2844 } 2845 } 2846 2847 2848 static void 2849 spdk_bdev_fini(struct spdk_bdev *bdev) 2850 { 2851 pthread_mutex_destroy(&bdev->internal.mutex); 2852 2853 free(bdev->internal.qos); 2854 2855 
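/* The io_device unregistration below is asynchronous: spdk_bdev_destroy_cb()
 * is invoked only after every channel for this bdev has been released, and it
 * is what finally calls the module's destruct() and, if set, the user's
 * unregister callback. */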
spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 2856 } 2857 2858 static void 2859 spdk_bdev_start(struct spdk_bdev *bdev) 2860 { 2861 struct spdk_bdev_module *module; 2862 uint32_t action; 2863 2864 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 2865 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 2866 2867 /* Examine configuration before initializing I/O */ 2868 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 2869 if (module->examine_config) { 2870 action = module->internal.action_in_progress; 2871 module->internal.action_in_progress++; 2872 module->examine_config(bdev); 2873 if (action != module->internal.action_in_progress) { 2874 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 2875 module->name); 2876 } 2877 } 2878 } 2879 2880 if (bdev->internal.claim_module) { 2881 return; 2882 } 2883 2884 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 2885 if (module->examine_disk) { 2886 module->internal.action_in_progress++; 2887 module->examine_disk(bdev); 2888 } 2889 } 2890 } 2891 2892 int 2893 spdk_bdev_register(struct spdk_bdev *bdev) 2894 { 2895 int rc = spdk_bdev_init(bdev); 2896 2897 if (rc == 0) { 2898 spdk_bdev_start(bdev); 2899 } 2900 2901 return rc; 2902 } 2903 2904 static void 2905 spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev) 2906 { 2907 struct spdk_bdev **bdevs; 2908 struct spdk_bdev *base; 2909 size_t i, j, k; 2910 bool found; 2911 2912 /* Iterate over base bdevs to remove vbdev from them. */ 2913 for (i = 0; i < vbdev->internal.base_bdevs_cnt; i++) { 2914 found = false; 2915 base = vbdev->internal.base_bdevs[i]; 2916 2917 for (j = 0; j < base->vbdevs_cnt; j++) { 2918 if (base->vbdevs[j] != vbdev) { 2919 continue; 2920 } 2921 2922 for (k = j; k + 1 < base->vbdevs_cnt; k++) { 2923 base->vbdevs[k] = base->vbdevs[k + 1]; 2924 } 2925 2926 base->vbdevs_cnt--; 2927 if (base->vbdevs_cnt > 0) { 2928 bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0])); 2929 /* It would be odd if shrinking memory block fail. */ 2930 assert(bdevs); 2931 base->vbdevs = bdevs; 2932 } else { 2933 free(base->vbdevs); 2934 base->vbdevs = NULL; 2935 } 2936 2937 found = true; 2938 break; 2939 } 2940 2941 if (!found) { 2942 SPDK_WARNLOG("Bdev '%s' is not base bdev of '%s'.\n", base->name, vbdev->name); 2943 } 2944 } 2945 2946 free(vbdev->internal.base_bdevs); 2947 vbdev->internal.base_bdevs = NULL; 2948 vbdev->internal.base_bdevs_cnt = 0; 2949 } 2950 2951 static int 2952 spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt) 2953 { 2954 struct spdk_bdev **vbdevs; 2955 struct spdk_bdev *base; 2956 size_t i; 2957 2958 /* Adding base bdevs isn't supported (yet?). */ 2959 assert(vbdev->internal.base_bdevs_cnt == 0); 2960 2961 vbdev->internal.base_bdevs = malloc(cnt * sizeof(vbdev->internal.base_bdevs[0])); 2962 if (!vbdev->internal.base_bdevs) { 2963 SPDK_ERRLOG("%s - realloc() failed\n", vbdev->name); 2964 return -ENOMEM; 2965 } 2966 2967 memcpy(vbdev->internal.base_bdevs, base_bdevs, cnt * sizeof(vbdev->internal.base_bdevs[0])); 2968 vbdev->internal.base_bdevs_cnt = cnt; 2969 2970 /* Iterate over base bdevs to add this vbdev to them. 
*/ 2971 for (i = 0; i < cnt; i++) { 2972 base = vbdev->internal.base_bdevs[i]; 2973 2974 assert(base != NULL); 2975 assert(base->internal.claim_module != NULL); 2976 2977 vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0])); 2978 if (!vbdevs) { 2979 SPDK_ERRLOG("%s - realloc() failed\n", base->name); 2980 spdk_vbdev_remove_base_bdevs(vbdev); 2981 return -ENOMEM; 2982 } 2983 2984 vbdevs[base->vbdevs_cnt] = vbdev; 2985 base->vbdevs = vbdevs; 2986 base->vbdevs_cnt++; 2987 } 2988 2989 return 0; 2990 } 2991 2992 int 2993 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 2994 { 2995 int rc; 2996 2997 rc = spdk_bdev_init(vbdev); 2998 if (rc) { 2999 return rc; 3000 } 3001 3002 if (base_bdev_count == 0) { 3003 spdk_bdev_start(vbdev); 3004 return 0; 3005 } 3006 3007 rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count); 3008 if (rc) { 3009 spdk_bdev_fini(vbdev); 3010 return rc; 3011 } 3012 3013 spdk_bdev_start(vbdev); 3014 return 0; 3015 3016 } 3017 3018 void 3019 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3020 { 3021 if (bdev->internal.unregister_cb != NULL) { 3022 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3023 } 3024 } 3025 3026 static void 3027 _remove_notify(void *arg) 3028 { 3029 struct spdk_bdev_desc *desc = arg; 3030 3031 desc->remove_cb(desc->remove_ctx); 3032 } 3033 3034 void 3035 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3036 { 3037 struct spdk_bdev_desc *desc, *tmp; 3038 bool do_destruct = true; 3039 struct spdk_thread *thread; 3040 3041 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3042 3043 thread = spdk_get_thread(); 3044 if (!thread) { 3045 /* The user called this from a non-SPDK thread. */ 3046 if (cb_fn != NULL) { 3047 cb_fn(cb_arg, -ENOTSUP); 3048 } 3049 return; 3050 } 3051 3052 pthread_mutex_lock(&bdev->internal.mutex); 3053 3054 spdk_vbdev_remove_base_bdevs(bdev); 3055 3056 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3057 bdev->internal.unregister_cb = cb_fn; 3058 bdev->internal.unregister_ctx = cb_arg; 3059 3060 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3061 if (desc->remove_cb) { 3062 do_destruct = false; 3063 /* 3064 * Defer invocation of the remove_cb to a separate message that will 3065 * run later on this thread. This ensures this context unwinds and 3066 * we don't recursively unregister this bdev again if the remove_cb 3067 * immediately closes its descriptor. 3068 */ 3069 if (!desc->remove_scheduled) { 3070 /* Avoid scheduling removal of the same descriptor multiple times. 
*/ 3071 desc->remove_scheduled = true; 3072 spdk_thread_send_msg(thread, _remove_notify, desc); 3073 } 3074 } 3075 } 3076 3077 if (!do_destruct) { 3078 pthread_mutex_unlock(&bdev->internal.mutex); 3079 return; 3080 } 3081 3082 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3083 pthread_mutex_unlock(&bdev->internal.mutex); 3084 3085 spdk_bdev_fini(bdev); 3086 } 3087 3088 int 3089 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3090 void *remove_ctx, struct spdk_bdev_desc **_desc) 3091 { 3092 struct spdk_bdev_desc *desc; 3093 3094 desc = calloc(1, sizeof(*desc)); 3095 if (desc == NULL) { 3096 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3097 return -ENOMEM; 3098 } 3099 3100 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3101 spdk_get_thread()); 3102 3103 pthread_mutex_lock(&bdev->internal.mutex); 3104 3105 if (write && bdev->internal.claim_module) { 3106 SPDK_ERRLOG("Could not open %s - already claimed\n", bdev->name); 3107 free(desc); 3108 pthread_mutex_unlock(&bdev->internal.mutex); 3109 return -EPERM; 3110 } 3111 3112 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3113 3114 desc->bdev = bdev; 3115 desc->remove_cb = remove_cb; 3116 desc->remove_ctx = remove_ctx; 3117 desc->write = write; 3118 *_desc = desc; 3119 3120 pthread_mutex_unlock(&bdev->internal.mutex); 3121 3122 return 0; 3123 } 3124 3125 void 3126 spdk_bdev_close(struct spdk_bdev_desc *desc) 3127 { 3128 struct spdk_bdev *bdev = desc->bdev; 3129 bool do_unregister = false; 3130 3131 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3132 spdk_get_thread()); 3133 3134 pthread_mutex_lock(&bdev->internal.mutex); 3135 3136 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3137 free(desc); 3138 3139 /* If no more descriptors, kill QoS channel */ 3140 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3141 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3142 bdev->name, spdk_get_thread()); 3143 3144 if (spdk_bdev_qos_destroy(bdev)) { 3145 /* There isn't anything we can do to recover here. Just let the 3146 * old QoS poller keep running. The QoS handling won't change 3147 * cores when the user allocates a new channel, but it won't break. */ 3148 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 3149 } 3150 } 3151 3152 spdk_bdev_set_qd_sampling_period(bdev, 0); 3153 3154 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3155 do_unregister = true; 3156 } 3157 pthread_mutex_unlock(&bdev->internal.mutex); 3158 3159 if (do_unregister == true) { 3160 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3161 } 3162 } 3163 3164 int 3165 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3166 struct spdk_bdev_module *module) 3167 { 3168 if (bdev->internal.claim_module != NULL) { 3169 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3170 bdev->internal.claim_module->name); 3171 return -EPERM; 3172 } 3173 3174 if (desc && !desc->write) { 3175 desc->write = true; 3176 } 3177 3178 bdev->internal.claim_module = module; 3179 return 0; 3180 } 3181 3182 void 3183 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3184 { 3185 assert(bdev->internal.claim_module != NULL); 3186 bdev->internal.claim_module = NULL; 3187 } 3188 3189 struct spdk_bdev * 3190 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3191 { 3192 return desc->bdev; 3193 } 3194 3195 void 3196 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3197 { 3198 struct iovec *iovs; 3199 int iovcnt; 3200 3201 if (bdev_io == NULL) { 3202 return; 3203 } 3204 3205 switch (bdev_io->type) { 3206 case SPDK_BDEV_IO_TYPE_READ: 3207 iovs = bdev_io->u.bdev.iovs; 3208 iovcnt = bdev_io->u.bdev.iovcnt; 3209 break; 3210 case SPDK_BDEV_IO_TYPE_WRITE: 3211 iovs = bdev_io->u.bdev.iovs; 3212 iovcnt = bdev_io->u.bdev.iovcnt; 3213 break; 3214 default: 3215 iovs = NULL; 3216 iovcnt = 0; 3217 break; 3218 } 3219 3220 if (iovp) { 3221 *iovp = iovs; 3222 } 3223 if (iovcntp) { 3224 *iovcntp = iovcnt; 3225 } 3226 } 3227 3228 void 3229 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3230 { 3231 3232 if (spdk_bdev_module_list_find(bdev_module->name)) { 3233 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3234 assert(false); 3235 } 3236 3237 if (bdev_module->async_init) { 3238 bdev_module->internal.action_in_progress = 1; 3239 } 3240 3241 /* 3242 * Modules with examine callbacks must be initialized first, so they are 3243 * ready to handle examine callbacks from later modules that will 3244 * register physical bdevs. 3245 */ 3246 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3247 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3248 } else { 3249 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3250 } 3251 } 3252 3253 struct spdk_bdev_module * 3254 spdk_bdev_module_list_find(const char *name) 3255 { 3256 struct spdk_bdev_module *bdev_module; 3257 3258 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3259 if (strcmp(name, bdev_module->name) == 0) { 3260 break; 3261 } 3262 } 3263 3264 return bdev_module; 3265 } 3266 3267 static void 3268 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3269 { 3270 uint64_t len; 3271 3272 if (!success) { 3273 bdev_io->internal.cb = bdev_io->u.bdev.stored_user_cb; 3274 _spdk_bdev_io_complete(bdev_io); 3275 return; 3276 } 3277 3278 /* no need to perform the error checking from write_zeroes_blocks because this request already passed those checks. 
*/ 3279 len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks, 3280 ZERO_BUFFER_SIZE); 3281 3282 bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks; 3283 bdev_io->u.bdev.iovs[0].iov_len = len; 3284 bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev); 3285 bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks; 3286 bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks; 3287 3288 /* if this round completes the i/o, change the callback to be the original user callback */ 3289 if (bdev_io->u.bdev.split_remaining_num_blocks == 0) { 3290 spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb); 3291 } else { 3292 spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split); 3293 } 3294 spdk_bdev_io_submit(bdev_io); 3295 } 3296 3297 struct set_qos_limit_ctx { 3298 void (*cb_fn)(void *cb_arg, int status); 3299 void *cb_arg; 3300 struct spdk_bdev *bdev; 3301 }; 3302 3303 static void 3304 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3305 { 3306 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3307 ctx->bdev->internal.qos_mod_in_progress = false; 3308 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3309 3310 ctx->cb_fn(ctx->cb_arg, status); 3311 free(ctx); 3312 } 3313 3314 static void 3315 _spdk_bdev_disable_qos_done(void *cb_arg) 3316 { 3317 struct set_qos_limit_ctx *ctx = cb_arg; 3318 struct spdk_bdev *bdev = ctx->bdev; 3319 struct spdk_bdev_io *bdev_io; 3320 struct spdk_bdev_qos *qos; 3321 3322 pthread_mutex_lock(&bdev->internal.mutex); 3323 qos = bdev->internal.qos; 3324 bdev->internal.qos = NULL; 3325 pthread_mutex_unlock(&bdev->internal.mutex); 3326 3327 while (!TAILQ_EMPTY(&qos->queued)) { 3328 /* Send queued I/O back to their original thread for resubmission. */ 3329 bdev_io = TAILQ_FIRST(&qos->queued); 3330 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3331 3332 if (bdev_io->internal.io_submit_ch) { 3333 /* 3334 * Channel was changed when sending it to the QoS thread - change it back 3335 * before sending it back to the original thread. 
3336 */ 3337 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3338 bdev_io->internal.io_submit_ch = NULL; 3339 } 3340 3341 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3342 _spdk_bdev_io_submit, bdev_io); 3343 } 3344 3345 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3346 spdk_poller_unregister(&qos->poller); 3347 3348 free(qos); 3349 3350 _spdk_bdev_set_qos_limit_done(ctx, 0); 3351 } 3352 3353 static void 3354 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3355 { 3356 void *io_device = spdk_io_channel_iter_get_io_device(i); 3357 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3358 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3359 struct spdk_thread *thread; 3360 3361 pthread_mutex_lock(&bdev->internal.mutex); 3362 thread = bdev->internal.qos->thread; 3363 pthread_mutex_unlock(&bdev->internal.mutex); 3364 3365 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3366 } 3367 3368 static void 3369 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3370 { 3371 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3372 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3373 3374 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3375 3376 spdk_for_each_channel_continue(i, 0); 3377 } 3378 3379 static void 3380 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg) 3381 { 3382 struct set_qos_limit_ctx *ctx = cb_arg; 3383 struct spdk_bdev *bdev = ctx->bdev; 3384 3385 pthread_mutex_lock(&bdev->internal.mutex); 3386 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3387 pthread_mutex_unlock(&bdev->internal.mutex); 3388 3389 _spdk_bdev_set_qos_limit_done(ctx, 0); 3390 } 3391 3392 static void 3393 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3394 { 3395 void *io_device = spdk_io_channel_iter_get_io_device(i); 3396 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3397 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3398 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3399 int rc; 3400 3401 pthread_mutex_lock(&bdev->internal.mutex); 3402 rc = _spdk_bdev_enable_qos(bdev, bdev_ch); 3403 pthread_mutex_unlock(&bdev->internal.mutex); 3404 spdk_for_each_channel_continue(i, rc); 3405 } 3406 3407 static void 3408 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3409 { 3410 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3411 3412 _spdk_bdev_set_qos_limit_done(ctx, status); 3413 } 3414 3415 void 3416 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec, 3417 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3418 { 3419 struct set_qos_limit_ctx *ctx; 3420 3421 if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) { 3422 SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n", 3423 ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC); 3424 cb_fn(cb_arg, -EINVAL); 3425 return; 3426 } 3427 3428 ctx = calloc(1, sizeof(*ctx)); 3429 if (ctx == NULL) { 3430 cb_fn(cb_arg, -ENOMEM); 3431 return; 3432 } 3433 3434 ctx->cb_fn = cb_fn; 3435 ctx->cb_arg = cb_arg; 3436 ctx->bdev = bdev; 3437 3438 pthread_mutex_lock(&bdev->internal.mutex); 3439 if (bdev->internal.qos_mod_in_progress) { 3440 pthread_mutex_unlock(&bdev->internal.mutex); 3441 free(ctx); 3442 cb_fn(cb_arg, -EAGAIN); 3443 return; 3444 } 3445 bdev->internal.qos_mod_in_progress = true; 3446 3447 if (ios_per_sec > 0) { 3448 if (bdev->internal.qos == NULL) { 3449 /* 
Enabling */ 3450 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3451 if (!bdev->internal.qos) { 3452 pthread_mutex_unlock(&bdev->internal.mutex); 3453 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3454 free(ctx); 3455 cb_fn(cb_arg, -ENOMEM); 3456 return; 3457 } 3458 3459 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3460 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3461 _spdk_bdev_enable_qos_msg, ctx, 3462 _spdk_bdev_enable_qos_done); 3463 } else { 3464 /* Updating */ 3465 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3466 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx); 3467 } 3468 } else { 3469 if (bdev->internal.qos != NULL) { 3470 /* Disabling */ 3471 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3472 _spdk_bdev_disable_qos_msg, ctx, 3473 _spdk_bdev_disable_qos_msg_done); 3474 } else { 3475 pthread_mutex_unlock(&bdev->internal.mutex); 3476 _spdk_bdev_set_qos_limit_done(ctx, 0); 3477 return; 3478 } 3479 } 3480 3481 pthread_mutex_unlock(&bdev->internal.mutex); 3482 } 3483 3484 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3485
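/*
 * Illustrative end-to-end sketch (not part of the original source): opening a
 * bdev by name, arming a hot-remove callback, and enabling a rate limit with
 * the APIs defined above.  The struct, the function names and the 20000 IOPS
 * figure are hypothetical/arbitrary; note that the limit must be a multiple
 * of SPDK_BDEV_QOS_MIN_IOS_PER_SEC or spdk_bdev_set_qos_limit_iops() reports
 * -EINVAL through its callback.
 *
 *	struct example_ctx {
 *		struct spdk_bdev_desc *desc;
 *	};
 *
 *	static void
 *	example_bdev_removed(void *cb_ctx)
 *	{
 *		struct example_ctx *ctx = cb_ctx;
 *
 *		// Called when the bdev is being unregistered; closing the last open
 *		// descriptor allows spdk_bdev_unregister() to finish.
 *		spdk_bdev_close(ctx->desc);
 *	}
 *
 *	static void
 *	example_qos_done(void *cb_arg, int status)
 *	{
 *		// status is 0 on success, a negative errno value otherwise.
 *	}
 *
 *	static int
 *	example_open_and_limit(const char *name, struct example_ctx *ctx)
 *	{
 *		struct spdk_bdev *bdev = spdk_bdev_get_by_name(name);
 *		int rc;
 *
 *		if (bdev == NULL) {
 *			return -ENODEV;
 *		}
 *
 *		rc = spdk_bdev_open(bdev, true, example_bdev_removed, ctx, &ctx->desc);
 *		if (rc != 0) {
 *			return rc;
 *		}
 *
 *		spdk_bdev_set_qos_limit_iops(bdev, 20000, example_qos_done, NULL);
 *		return 0;
 *	}
 */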