1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>. 5 * Copyright (c) Intel Corporation. 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * * Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * * Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * * Neither the name of Intel Corporation nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 25 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 26 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 33 */ 34 35 #include "spdk/stdinc.h" 36 37 #include "spdk/bdev.h" 38 #include "spdk/conf.h" 39 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/util.h" 48 49 #include "spdk/bdev_module.h" 50 #include "spdk_internal/log.h" 51 #include "spdk/string.h" 52 53 #ifdef SPDK_CONFIG_VTUNE 54 #include "ittnotify.h" 55 #include "ittnotify_types.h" 56 int __itt_init_ittlib(const char *, __itt_group_id); 57 #endif 58 59 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 60 #define SPDK_BDEV_IO_CACHE_SIZE 256 61 #define BUF_SMALL_POOL_SIZE 8192 62 #define BUF_LARGE_POOL_SIZE 1024 63 #define NOMEM_THRESHOLD_COUNT 8 64 #define ZERO_BUFFER_SIZE 0x100000 65 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 66 #define SPDK_BDEV_SEC_TO_USEC 1000000ULL 67 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 68 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 69 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 70 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC 10 71 72 enum spdk_bdev_qos_type { 73 SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0, 74 SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT, 75 SPDK_BDEV_QOS_NUM_TYPES /* Keep last */ 76 }; 77 78 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"}; 79 80 struct spdk_bdev_mgr { 81 struct spdk_mempool *bdev_io_pool; 82 83 struct spdk_mempool *buf_small_pool; 84 struct spdk_mempool *buf_large_pool; 85 86 void *zero_buffer; 87 88 TAILQ_HEAD(, spdk_bdev_module) bdev_modules; 89 90 TAILQ_HEAD(, spdk_bdev) bdevs; 91 92 bool init_complete; 93 bool module_init_complete; 94 95 #ifdef SPDK_CONFIG_VTUNE 96 __itt_domain *domain; 97 #endif 98 }; 99 100 static struct spdk_bdev_mgr g_bdev_mgr = { 101 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 
102 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 103 .init_complete = false, 104 .module_init_complete = false, 105 }; 106 107 static struct spdk_bdev_opts g_bdev_opts = { 108 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 109 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 110 }; 111 112 static spdk_bdev_init_cb g_init_cb_fn = NULL; 113 static void *g_init_cb_arg = NULL; 114 115 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 116 static void *g_fini_cb_arg = NULL; 117 static struct spdk_thread *g_fini_thread = NULL; 118 119 struct spdk_bdev_qos { 120 /** Rate limit, in I/O per second */ 121 uint64_t iops_rate_limit; 122 123 /** Rate limit, in byte per second */ 124 uint64_t byte_rate_limit; 125 126 /** The channel that all I/O are funneled through */ 127 struct spdk_bdev_channel *ch; 128 129 /** The thread on which the poller is running. */ 130 struct spdk_thread *thread; 131 132 /** Queue of I/O waiting to be issued. */ 133 bdev_io_tailq_t queued; 134 135 /** Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and 136 * only valid for the master channel which manages the outstanding IOs. */ 137 uint64_t max_ios_per_timeslice; 138 139 /** Maximum allowed bytes to be issued in one timeslice (e.g., 1ms) and 140 * only valid for the master channel which manages the outstanding IOs. */ 141 uint64_t max_byte_per_timeslice; 142 143 /** Submitted IO in one timeslice (e.g., 1ms) */ 144 uint64_t io_submitted_this_timeslice; 145 146 /** Submitted byte in one timeslice (e.g., 1ms) */ 147 uint64_t byte_submitted_this_timeslice; 148 149 /** Polller that processes queued I/O commands each time slice. */ 150 struct spdk_poller *poller; 151 }; 152 153 struct spdk_bdev_mgmt_channel { 154 bdev_io_stailq_t need_buf_small; 155 bdev_io_stailq_t need_buf_large; 156 157 /* 158 * Each thread keeps a cache of bdev_io - this allows 159 * bdev threads which are *not* DPDK threads to still 160 * benefit from a per-thread bdev_io cache. Without 161 * this, non-DPDK threads fetching from the mempool 162 * incur a cmpxchg on get and put. 163 */ 164 bdev_io_stailq_t per_thread_cache; 165 uint32_t per_thread_cache_count; 166 uint32_t bdev_io_cache_size; 167 168 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 169 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 170 }; 171 172 /* 173 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 174 * will queue here their IO that awaits retry. It makes it posible to retry sending 175 * IO to one bdev after IO from other bdev completes. 176 */ 177 struct spdk_bdev_shared_resource { 178 /* The bdev management channel */ 179 struct spdk_bdev_mgmt_channel *mgmt_ch; 180 181 /* 182 * Count of I/O submitted to bdev module and waiting for completion. 183 * Incremented before submit_request() is called on an spdk_bdev_io. 184 */ 185 uint64_t io_outstanding; 186 187 /* 188 * Queue of IO awaiting retry because of a previous NOMEM status returned 189 * on this channel. 190 */ 191 bdev_io_tailq_t nomem_io; 192 193 /* 194 * Threshold which io_outstanding must drop to before retrying nomem_io. 
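 *
 * The usual producer of entries on nomem_io is a bdev module completing a
 * request with SPDK_BDEV_IO_STATUS_NOMEM instead of failing it outright; the
 * generic layer then parks the I/O here and resubmits it once enough of the
 * outstanding requests have completed (NOMEM_THRESHOLD_COUNT above sets the
 * retry slack).  A minimal, hypothetical module-side sketch of triggering
 * this path (the example_* helpers are not real APIs):
 *
 *     static void
 *     example_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *     {
 *             if (example_queue_full(ch)) {
 *                     // Back-pressure: the bdev layer queues this I/O on
 *                     // nomem_io and retries it later on this channel.
 *                     spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
 *                     return;
 *             }
 *             example_start_io(ch, bdev_io);
 *     }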
195 */ 196 uint64_t nomem_threshold; 197 198 /* I/O channel allocated by a bdev module */ 199 struct spdk_io_channel *shared_ch; 200 201 /* Refcount of bdev channels using this resource */ 202 uint32_t ref; 203 204 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 205 }; 206 207 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 208 #define BDEV_CH_QOS_ENABLED (1 << 1) 209 210 struct spdk_bdev_channel { 211 struct spdk_bdev *bdev; 212 213 /* The channel for the underlying device */ 214 struct spdk_io_channel *channel; 215 216 /* Per io_device per thread data */ 217 struct spdk_bdev_shared_resource *shared_resource; 218 219 struct spdk_bdev_io_stat stat; 220 221 /* 222 * Count of I/O submitted through this channel and waiting for completion. 223 * Incremented before submit_request() is called on an spdk_bdev_io. 224 */ 225 uint64_t io_outstanding; 226 227 bdev_io_tailq_t queued_resets; 228 229 uint32_t flags; 230 231 #ifdef SPDK_CONFIG_VTUNE 232 uint64_t start_tsc; 233 uint64_t interval_tsc; 234 __itt_string_handle *handle; 235 struct spdk_bdev_io_stat prev_stat; 236 #endif 237 238 }; 239 240 struct spdk_bdev_desc { 241 struct spdk_bdev *bdev; 242 spdk_bdev_remove_cb_t remove_cb; 243 void *remove_ctx; 244 bool remove_scheduled; 245 bool write; 246 TAILQ_ENTRY(spdk_bdev_desc) link; 247 }; 248 249 struct spdk_bdev_iostat_ctx { 250 struct spdk_bdev_io_stat *stat; 251 spdk_bdev_get_device_stat_cb cb; 252 void *cb_arg; 253 }; 254 255 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 256 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 257 258 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 259 260 void 261 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 262 { 263 *opts = g_bdev_opts; 264 } 265 266 int 267 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 268 { 269 uint32_t min_pool_size; 270 271 /* 272 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 273 * initialization. A second mgmt_ch will be created on the same thread when the application starts 274 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
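 *
 * For example, with the default bdev_io_cache_size of 256 and 4 threads the
 * pool must hold at least 256 * (4 + 1) = 1280 bdev_ios.  An illustrative
 * way for an application to adjust these limits before the bdev layer starts,
 * using only spdk_bdev_get_opts()/spdk_bdev_set_opts() defined here (the
 * numbers are arbitrary):
 *
 *     struct spdk_bdev_opts opts;
 *
 *     spdk_bdev_get_opts(&opts);
 *     opts.bdev_io_pool_size = 128 * 1024;
 *     opts.bdev_io_cache_size = 512;   // keep pool >= cache * (threads + 1)
 *     if (spdk_bdev_set_opts(&opts) != 0) {
 *             // rejected: pool too small for this cache size and thread count
 *     }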
275 */ 276 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 277 if (opts->bdev_io_pool_size < min_pool_size) { 278 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 279 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 280 spdk_thread_get_count()); 281 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 282 return -1; 283 } 284 285 g_bdev_opts = *opts; 286 return 0; 287 } 288 289 struct spdk_bdev * 290 spdk_bdev_first(void) 291 { 292 struct spdk_bdev *bdev; 293 294 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 295 if (bdev) { 296 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 297 } 298 299 return bdev; 300 } 301 302 struct spdk_bdev * 303 spdk_bdev_next(struct spdk_bdev *prev) 304 { 305 struct spdk_bdev *bdev; 306 307 bdev = TAILQ_NEXT(prev, internal.link); 308 if (bdev) { 309 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 310 } 311 312 return bdev; 313 } 314 315 static struct spdk_bdev * 316 _bdev_next_leaf(struct spdk_bdev *bdev) 317 { 318 while (bdev != NULL) { 319 if (bdev->internal.claim_module == NULL) { 320 return bdev; 321 } else { 322 bdev = TAILQ_NEXT(bdev, internal.link); 323 } 324 } 325 326 return bdev; 327 } 328 329 struct spdk_bdev * 330 spdk_bdev_first_leaf(void) 331 { 332 struct spdk_bdev *bdev; 333 334 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 335 336 if (bdev) { 337 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 338 } 339 340 return bdev; 341 } 342 343 struct spdk_bdev * 344 spdk_bdev_next_leaf(struct spdk_bdev *prev) 345 { 346 struct spdk_bdev *bdev; 347 348 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 349 350 if (bdev) { 351 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 352 } 353 354 return bdev; 355 } 356 357 struct spdk_bdev * 358 spdk_bdev_get_by_name(const char *bdev_name) 359 { 360 struct spdk_bdev_alias *tmp; 361 struct spdk_bdev *bdev = spdk_bdev_first(); 362 363 while (bdev != NULL) { 364 if (strcmp(bdev_name, bdev->name) == 0) { 365 return bdev; 366 } 367 368 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 369 if (strcmp(bdev_name, tmp->alias) == 0) { 370 return bdev; 371 } 372 } 373 374 bdev = spdk_bdev_next(bdev); 375 } 376 377 return NULL; 378 } 379 380 size_t 381 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 382 { 383 struct iovec **iovs; 384 int *iovcnt; 385 void *aligned_buf; 386 387 iovs = &bdev_io->u.bdev.iovs; 388 iovcnt = &bdev_io->u.bdev.iovcnt; 389 390 if (*iovs == NULL || *iovcnt == 0) { 391 *iovs = &bdev_io->iov; 392 *iovcnt = 1; 393 } 394 395 if (buf != NULL) { 396 aligned_buf = (void *)(((uintptr_t)buf + 512) & ~511UL); 397 len = len - ((uintptr_t)aligned_buf - (uintptr_t)buf); 398 } else { 399 aligned_buf = NULL; 400 assert(len == 0); 401 } 402 403 (*iovs)[0].iov_base = aligned_buf; 404 (*iovs)[0].iov_len = len; 405 406 return len; 407 } 408 409 static void 410 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 411 { 412 struct spdk_mempool *pool; 413 struct spdk_bdev_io *tmp; 414 void *buf; 415 bdev_io_stailq_t *stailq; 416 struct spdk_bdev_mgmt_channel *ch; 417 size_t len; 418 419 assert(bdev_io->u.bdev.iovcnt == 1); 420 421 buf = bdev_io->internal.buf; 422 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 423 424 if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 425 pool = g_bdev_mgr.buf_small_pool; 426 stailq = &ch->need_buf_small; 
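/*
 * Both buffer pools are created with an extra 512 bytes per element (see
 * spdk_bdev_initialize() below) because spdk_bdev_io_set_buf() rounds the
 * raw element pointer up to the next 512-byte boundary:
 *
 *     aligned_buf = ((uintptr_t)buf + 512) & ~511UL;
 *
 * e.g. buf == 0x1010 yields aligned_buf == 0x1200, losing 0x1F0 (496) bytes,
 * so MAX_SIZE + 512 guarantees a full MAX_SIZE usable region either way.
 */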
427 len = SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512; 428 } else { 429 pool = g_bdev_mgr.buf_large_pool; 430 stailq = &ch->need_buf_large; 431 len = SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512; 432 } 433 434 if (STAILQ_EMPTY(stailq)) { 435 spdk_mempool_put(pool, buf); 436 } else { 437 tmp = STAILQ_FIRST(stailq); 438 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 439 len = spdk_bdev_io_set_buf(tmp, buf, len); 440 if (len < tmp->internal.buf_len) { 441 SPDK_ERRLOG("Unable to use buffer due to alignment\n"); 442 spdk_mempool_put(pool, buf); 443 spdk_bdev_io_set_buf(tmp, NULL, 0); 444 return; 445 } 446 tmp->internal.buf = buf; 447 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 448 } 449 } 450 451 void 452 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 453 { 454 struct spdk_mempool *pool; 455 bdev_io_stailq_t *stailq; 456 void *buf = NULL; 457 struct spdk_bdev_mgmt_channel *mgmt_ch; 458 size_t buf_len; 459 460 assert(cb != NULL); 461 assert(bdev_io->u.bdev.iovs != NULL); 462 463 if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) { 464 /* Buffer already present */ 465 cb(bdev_io->internal.ch->channel, bdev_io); 466 return; 467 } 468 469 assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE); 470 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 471 472 bdev_io->internal.buf_len = len; 473 bdev_io->internal.get_buf_cb = cb; 474 if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 475 pool = g_bdev_mgr.buf_small_pool; 476 stailq = &mgmt_ch->need_buf_small; 477 buf_len = SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512; 478 } else { 479 pool = g_bdev_mgr.buf_large_pool; 480 stailq = &mgmt_ch->need_buf_large; 481 buf_len = SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512; 482 } 483 484 buf = spdk_mempool_get(pool); 485 486 if (!buf) { 487 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 488 } else { 489 size_t aligned_len; 490 491 aligned_len = spdk_bdev_io_set_buf(bdev_io, buf, buf_len); 492 if (aligned_len < len) { 493 SPDK_ERRLOG("Unable to use buffer after alignment calculations.\n"); 494 spdk_mempool_put(pool, buf); 495 spdk_bdev_io_set_buf(bdev_io, NULL, 0); 496 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 497 return; 498 } 499 500 bdev_io->internal.buf = buf; 501 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 502 } 503 } 504 505 static int 506 spdk_bdev_module_get_max_ctx_size(void) 507 { 508 struct spdk_bdev_module *bdev_module; 509 int max_bdev_module_size = 0; 510 511 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 512 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 513 max_bdev_module_size = bdev_module->get_ctx_size(); 514 } 515 } 516 517 return max_bdev_module_size; 518 } 519 520 void 521 spdk_bdev_config_text(FILE *fp) 522 { 523 struct spdk_bdev_module *bdev_module; 524 525 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 526 if (bdev_module->config_text) { 527 bdev_module->config_text(fp); 528 } 529 } 530 } 531 532 void 533 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 534 { 535 struct spdk_bdev_module *bdev_module; 536 struct spdk_bdev *bdev; 537 538 assert(w != NULL); 539 540 spdk_json_write_array_begin(w); 541 542 spdk_json_write_object_begin(w); 543 spdk_json_write_named_string(w, "method", "set_bdev_options"); 544 spdk_json_write_name(w, "params"); 545 spdk_json_write_object_begin(w); 546 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 547 spdk_json_write_named_uint32(w, "bdev_io_cache_size", 
g_bdev_opts.bdev_io_cache_size); 548 spdk_json_write_object_end(w); 549 spdk_json_write_object_end(w); 550 551 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 552 if (bdev_module->config_json) { 553 bdev_module->config_json(w); 554 } 555 } 556 557 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 558 spdk_bdev_config_json(bdev, w); 559 } 560 561 spdk_json_write_array_end(w); 562 } 563 564 static int 565 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 566 { 567 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 568 struct spdk_bdev_io *bdev_io; 569 uint32_t i; 570 571 STAILQ_INIT(&ch->need_buf_small); 572 STAILQ_INIT(&ch->need_buf_large); 573 574 STAILQ_INIT(&ch->per_thread_cache); 575 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 576 577 /* Pre-populate bdev_io cache to ensure this thread cannot be starved. */ 578 ch->per_thread_cache_count = 0; 579 for (i = 0; i < ch->bdev_io_cache_size; i++) { 580 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 581 assert(bdev_io != NULL); 582 ch->per_thread_cache_count++; 583 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 584 } 585 586 TAILQ_INIT(&ch->shared_resources); 587 TAILQ_INIT(&ch->io_wait_queue); 588 589 return 0; 590 } 591 592 static void 593 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf) 594 { 595 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 596 struct spdk_bdev_io *bdev_io; 597 598 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) { 599 SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n"); 600 } 601 602 if (!TAILQ_EMPTY(&ch->shared_resources)) { 603 SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n"); 604 } 605 606 while (!STAILQ_EMPTY(&ch->per_thread_cache)) { 607 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 608 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 609 ch->per_thread_cache_count--; 610 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 611 } 612 613 assert(ch->per_thread_cache_count == 0); 614 } 615 616 static void 617 spdk_bdev_init_complete(int rc) 618 { 619 spdk_bdev_init_cb cb_fn = g_init_cb_fn; 620 void *cb_arg = g_init_cb_arg; 621 struct spdk_bdev_module *m; 622 623 g_bdev_mgr.init_complete = true; 624 g_init_cb_fn = NULL; 625 g_init_cb_arg = NULL; 626 627 /* 628 * For modules that need to know when subsystem init is complete, 629 * inform them now. 630 */ 631 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 632 if (m->init_complete) { 633 m->init_complete(); 634 } 635 } 636 637 cb_fn(cb_arg, rc); 638 } 639 640 static void 641 spdk_bdev_module_action_complete(void) 642 { 643 struct spdk_bdev_module *m; 644 645 /* 646 * Don't finish bdev subsystem initialization if 647 * module pre-initialization is still in progress, or 648 * the subsystem been already initialized. 649 */ 650 if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) { 651 return; 652 } 653 654 /* 655 * Check all bdev modules for inits/examinations in progress. If any 656 * exist, return immediately since we cannot finish bdev subsystem 657 * initialization until all are completed. 658 */ 659 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 660 if (m->internal.action_in_progress > 0) { 661 return; 662 } 663 } 664 665 /* 666 * Modules already finished initialization - now that all 667 * the bdev modules have finished their asynchronous I/O 668 * processing, the entire bdev layer can be marked as complete. 
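 *
 * A hypothetical module-side sketch of that handshake (only the *_done()
 * helpers below are real APIs from this file; the example_* names and the
 * assumption that the module registered itself as asynchronous are
 * illustrative):
 *
 *     static int
 *     example_module_init(void)
 *     {
 *             example_start_async_probe();    // finishes later, off this stack
 *             return 0;
 *     }
 *
 *     static void
 *     example_probe_done(void)
 *     {
 *             // Drops this module's action_in_progress count; once every
 *             // module and examine callback has finished, init completes.
 *             spdk_bdev_module_init_done(&example_if);
 *     }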
669 */ 670 spdk_bdev_init_complete(0); 671 } 672 673 static void 674 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 675 { 676 assert(module->internal.action_in_progress > 0); 677 module->internal.action_in_progress--; 678 spdk_bdev_module_action_complete(); 679 } 680 681 void 682 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 683 { 684 spdk_bdev_module_action_done(module); 685 } 686 687 void 688 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 689 { 690 spdk_bdev_module_action_done(module); 691 } 692 693 static int 694 spdk_bdev_modules_init(void) 695 { 696 struct spdk_bdev_module *module; 697 int rc = 0; 698 699 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 700 rc = module->module_init(); 701 if (rc != 0) { 702 break; 703 } 704 } 705 706 g_bdev_mgr.module_init_complete = true; 707 return rc; 708 } 709 710 void 711 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 712 { 713 struct spdk_conf_section *sp; 714 struct spdk_bdev_opts bdev_opts; 715 int32_t bdev_io_pool_size, bdev_io_cache_size; 716 int cache_size; 717 int rc = 0; 718 char mempool_name[32]; 719 720 assert(cb_fn != NULL); 721 722 sp = spdk_conf_find_section(NULL, "Bdev"); 723 if (sp != NULL) { 724 spdk_bdev_get_opts(&bdev_opts); 725 726 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 727 if (bdev_io_pool_size >= 0) { 728 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 729 } 730 731 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 732 if (bdev_io_cache_size >= 0) { 733 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 734 } 735 736 if (spdk_bdev_set_opts(&bdev_opts)) { 737 spdk_bdev_init_complete(-1); 738 return; 739 } 740 741 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 742 } 743 744 g_init_cb_fn = cb_fn; 745 g_init_cb_arg = cb_arg; 746 747 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 748 749 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 750 g_bdev_opts.bdev_io_pool_size, 751 sizeof(struct spdk_bdev_io) + 752 spdk_bdev_module_get_max_ctx_size(), 753 0, 754 SPDK_ENV_SOCKET_ID_ANY); 755 756 if (g_bdev_mgr.bdev_io_pool == NULL) { 757 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 758 spdk_bdev_init_complete(-1); 759 return; 760 } 761 762 /** 763 * Ensure no more than half of the total buffers end up local caches, by 764 * using spdk_thread_get_count() to determine how many local caches we need 765 * to account for. 
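 *
 * Worked example with the constants above: BUF_SMALL_POOL_SIZE is 8192, so
 * with 4 threads each per-thread cache gets 8192 / (2 * 4) = 1024 buffers,
 * i.e. at most 4 * 1024 = 4096 (half the pool) can sit in caches while the
 * other half remains in the shared pool.  The large pool below is sized the
 * same way from BUF_LARGE_POOL_SIZE (1024 / (2 * 4) = 128 per thread).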
766 */ 767 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 768 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 769 770 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 771 BUF_SMALL_POOL_SIZE, 772 SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512, 773 cache_size, 774 SPDK_ENV_SOCKET_ID_ANY); 775 if (!g_bdev_mgr.buf_small_pool) { 776 SPDK_ERRLOG("create rbuf small pool failed\n"); 777 spdk_bdev_init_complete(-1); 778 return; 779 } 780 781 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 782 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 783 784 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 785 BUF_LARGE_POOL_SIZE, 786 SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512, 787 cache_size, 788 SPDK_ENV_SOCKET_ID_ANY); 789 if (!g_bdev_mgr.buf_large_pool) { 790 SPDK_ERRLOG("create rbuf large pool failed\n"); 791 spdk_bdev_init_complete(-1); 792 return; 793 } 794 795 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 796 NULL); 797 if (!g_bdev_mgr.zero_buffer) { 798 SPDK_ERRLOG("create bdev zero buffer failed\n"); 799 spdk_bdev_init_complete(-1); 800 return; 801 } 802 803 #ifdef SPDK_CONFIG_VTUNE 804 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 805 #endif 806 807 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 808 spdk_bdev_mgmt_channel_destroy, 809 sizeof(struct spdk_bdev_mgmt_channel)); 810 811 rc = spdk_bdev_modules_init(); 812 if (rc != 0) { 813 SPDK_ERRLOG("bdev modules init failed\n"); 814 spdk_bdev_init_complete(-1); 815 return; 816 } 817 818 spdk_bdev_module_action_complete(); 819 } 820 821 static void 822 spdk_bdev_mgr_unregister_cb(void *io_device) 823 { 824 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 825 826 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 827 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 828 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 829 g_bdev_opts.bdev_io_pool_size); 830 } 831 832 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 833 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 834 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 835 BUF_SMALL_POOL_SIZE); 836 assert(false); 837 } 838 839 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 840 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 841 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 842 BUF_LARGE_POOL_SIZE); 843 assert(false); 844 } 845 846 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 847 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 848 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 849 spdk_dma_free(g_bdev_mgr.zero_buffer); 850 851 cb_fn(g_fini_cb_arg); 852 g_fini_cb_fn = NULL; 853 g_fini_cb_arg = NULL; 854 } 855 856 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 857 858 static void 859 spdk_bdev_module_finish_iter(void *arg) 860 { 861 struct spdk_bdev_module *bdev_module; 862 863 /* Start iterating from the last touched module */ 864 if (!g_resume_bdev_module) { 865 bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules); 866 } else { 867 bdev_module = TAILQ_NEXT(g_resume_bdev_module, internal.tailq); 868 } 869 870 while (bdev_module) { 871 if (bdev_module->async_fini) { 872 /* Save our place so we can resume later. 
We must 873 * save the variable here, before calling module_fini() 874 * below, because in some cases the module may immediately 875 * call spdk_bdev_module_finish_done() and re-enter 876 * this function to continue iterating. */ 877 g_resume_bdev_module = bdev_module; 878 } 879 880 if (bdev_module->module_fini) { 881 bdev_module->module_fini(); 882 } 883 884 if (bdev_module->async_fini) { 885 return; 886 } 887 888 bdev_module = TAILQ_NEXT(bdev_module, internal.tailq); 889 } 890 891 g_resume_bdev_module = NULL; 892 spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb); 893 } 894 895 void 896 spdk_bdev_module_finish_done(void) 897 { 898 if (spdk_get_thread() != g_fini_thread) { 899 spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL); 900 } else { 901 spdk_bdev_module_finish_iter(NULL); 902 } 903 } 904 905 static void 906 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno) 907 { 908 struct spdk_bdev *bdev = cb_arg; 909 910 if (bdeverrno && bdev) { 911 SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n", 912 bdev->name); 913 914 /* 915 * Since the call to spdk_bdev_unregister() failed, we have no way to free this 916 * bdev; try to continue by manually removing this bdev from the list and continue 917 * with the next bdev in the list. 918 */ 919 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 920 } 921 922 if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) { 923 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n"); 924 /* 925 * Bdev module finish need to be deffered as we might be in the middle of some context 926 * (like bdev part free) that will use this bdev (or private bdev driver ctx data) 927 * after returning. 928 */ 929 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL); 930 return; 931 } 932 933 /* 934 * Unregister the first bdev in the list. 935 * 936 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by 937 * calling the remove_cb of the descriptors first. 938 * 939 * Once this bdev and all of its open descriptors have been cleaned up, this function 940 * will be called again via the unregister completion callback to continue the cleanup 941 * process with the next bdev. 942 */ 943 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 944 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name); 945 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 946 } 947 948 void 949 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 950 { 951 assert(cb_fn != NULL); 952 953 g_fini_thread = spdk_get_thread(); 954 955 g_fini_cb_fn = cb_fn; 956 g_fini_cb_arg = cb_arg; 957 958 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0); 959 } 960 961 static struct spdk_bdev_io * 962 spdk_bdev_get_io(struct spdk_bdev_channel *channel) 963 { 964 struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch; 965 struct spdk_bdev_io *bdev_io; 966 967 if (ch->per_thread_cache_count > 0) { 968 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 969 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link); 970 ch->per_thread_cache_count--; 971 } else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) { 972 /* 973 * Don't try to look for bdev_ios in the global pool if there are 974 * waiters on bdev_ios - we don't want this caller to jump the line. 
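 *
 * Callers observe this as spdk_bdev_read_blocks()/spdk_bdev_write_blocks()
 * and friends returning -ENOMEM.  A hedged sketch of the retry pattern this
 * queue exists for, assuming the public spdk_bdev_queue_io_wait() API and
 * the spdk_bdev_io_wait_entry layout from bdev.h (example_* names and ctx
 * are placeholders):
 *
 *     rc = spdk_bdev_read_blocks(desc, io_ch, buf, offset, nblocks,
 *                                example_read_done, ctx);
 *     if (rc == -ENOMEM) {
 *             ctx->bdev_io_wait.bdev = bdev;
 *             ctx->bdev_io_wait.cb_fn = example_retry_read;  // re-issues the read
 *             ctx->bdev_io_wait.cb_arg = ctx;
 *             // Called back once a bdev_io is returned to this channel's cache.
 *             spdk_bdev_queue_io_wait(bdev, io_ch, &ctx->bdev_io_wait);
 *     }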
975 */ 976 bdev_io = NULL; 977 } else { 978 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 979 } 980 981 return bdev_io; 982 } 983 984 void 985 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 986 { 987 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 988 989 assert(bdev_io != NULL); 990 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 991 992 if (bdev_io->internal.buf != NULL) { 993 spdk_bdev_io_put_buf(bdev_io); 994 } 995 996 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 997 ch->per_thread_cache_count++; 998 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 999 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1000 struct spdk_bdev_io_wait_entry *entry; 1001 1002 entry = TAILQ_FIRST(&ch->io_wait_queue); 1003 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1004 entry->cb_fn(entry->cb_arg); 1005 } 1006 } else { 1007 /* We should never have a full cache with entries on the io wait queue. */ 1008 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1009 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1010 } 1011 } 1012 1013 static uint64_t 1014 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1015 { 1016 struct spdk_bdev *bdev = bdev_io->bdev; 1017 1018 switch (bdev_io->type) { 1019 case SPDK_BDEV_IO_TYPE_NVME_ADMIN: 1020 case SPDK_BDEV_IO_TYPE_NVME_IO: 1021 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1022 return bdev_io->u.nvme_passthru.nbytes; 1023 case SPDK_BDEV_IO_TYPE_READ: 1024 case SPDK_BDEV_IO_TYPE_WRITE: 1025 case SPDK_BDEV_IO_TYPE_UNMAP: 1026 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1027 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1028 default: 1029 return 0; 1030 } 1031 } 1032 1033 static void 1034 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch) 1035 { 1036 struct spdk_bdev_io *bdev_io = NULL; 1037 struct spdk_bdev *bdev = ch->bdev; 1038 struct spdk_bdev_qos *qos = bdev->internal.qos; 1039 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1040 1041 while (!TAILQ_EMPTY(&qos->queued)) { 1042 if (qos->max_ios_per_timeslice > 0 && 1043 qos->io_submitted_this_timeslice >= qos->max_ios_per_timeslice) { 1044 break; 1045 } 1046 1047 if (qos->max_byte_per_timeslice > 0 && 1048 qos->byte_submitted_this_timeslice >= qos->max_byte_per_timeslice) { 1049 break; 1050 } 1051 1052 bdev_io = TAILQ_FIRST(&qos->queued); 1053 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1054 qos->io_submitted_this_timeslice++; 1055 qos->byte_submitted_this_timeslice += _spdk_bdev_get_io_size_in_byte(bdev_io); 1056 ch->io_outstanding++; 1057 shared_resource->io_outstanding++; 1058 bdev->fn_table->submit_request(ch->channel, bdev_io); 1059 } 1060 } 1061 1062 static void 1063 _spdk_bdev_io_submit(void *ctx) 1064 { 1065 struct spdk_bdev_io *bdev_io = ctx; 1066 struct spdk_bdev *bdev = bdev_io->bdev; 1067 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1068 struct spdk_io_channel *ch = bdev_ch->channel; 1069 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1070 1071 bdev_io->internal.submit_tsc = spdk_get_ticks(); 1072 bdev_ch->io_outstanding++; 1073 shared_resource->io_outstanding++; 1074 bdev_io->internal.in_submit_request = true; 1075 if (spdk_likely(bdev_ch->flags == 0)) { 1076 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1077 bdev->fn_table->submit_request(ch, bdev_io); 1078 } else { 1079 bdev_ch->io_outstanding--; 1080 shared_resource->io_outstanding--; 1081 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, 
bdev_io, internal.link); 1082 } 1083 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1084 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1085 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1086 bdev_ch->io_outstanding--; 1087 shared_resource->io_outstanding--; 1088 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1089 _spdk_bdev_qos_io_submit(bdev_ch); 1090 } else { 1091 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1092 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1093 } 1094 bdev_io->internal.in_submit_request = false; 1095 } 1096 1097 static void 1098 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1099 { 1100 struct spdk_bdev *bdev = bdev_io->bdev; 1101 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1102 1103 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1104 1105 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1106 if (thread == bdev->internal.qos->thread) { 1107 _spdk_bdev_io_submit(bdev_io); 1108 } else { 1109 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1110 bdev_io->internal.ch = bdev->internal.qos->ch; 1111 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1112 } 1113 } else { 1114 _spdk_bdev_io_submit(bdev_io); 1115 } 1116 } 1117 1118 static void 1119 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1120 { 1121 struct spdk_bdev *bdev = bdev_io->bdev; 1122 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1123 struct spdk_io_channel *ch = bdev_ch->channel; 1124 1125 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1126 1127 bdev_io->internal.in_submit_request = true; 1128 bdev->fn_table->submit_request(ch, bdev_io); 1129 bdev_io->internal.in_submit_request = false; 1130 } 1131 1132 static void 1133 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1134 struct spdk_bdev *bdev, void *cb_arg, 1135 spdk_bdev_io_completion_cb cb) 1136 { 1137 bdev_io->bdev = bdev; 1138 bdev_io->internal.caller_ctx = cb_arg; 1139 bdev_io->internal.cb = cb; 1140 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1141 bdev_io->internal.in_submit_request = false; 1142 bdev_io->internal.buf = NULL; 1143 bdev_io->internal.io_submit_ch = NULL; 1144 } 1145 1146 static bool 1147 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1148 { 1149 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1150 } 1151 1152 bool 1153 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1154 { 1155 bool supported; 1156 1157 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1158 1159 if (!supported) { 1160 switch (io_type) { 1161 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1162 /* The bdev layer will emulate write zeroes as long as write is supported. 
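 *
 * Callers therefore do not need to branch on this capability themselves; an
 * illustrative call (desc, ch, cb, cb_arg are placeholders) that works in
 * either case:
 *
 *     // Zero the first 1 MiB worth of blocks.  If WRITE_ZEROES is not
 *     // supported natively, spdk_bdev_write_zeroes_blocks() below emulates
 *     // it with plain writes from g_bdev_mgr.zero_buffer, splitting the
 *     // range into ZERO_BUFFER_SIZE chunks as needed.
 *     rc = spdk_bdev_write_zeroes_blocks(desc, ch, 0,
 *                                        ZERO_BUFFER_SIZE / spdk_bdev_get_block_size(bdev),
 *                                        cb, cb_arg);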
*/ 1163 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1164 break; 1165 default: 1166 break; 1167 } 1168 } 1169 1170 return supported; 1171 } 1172 1173 int 1174 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1175 { 1176 if (bdev->fn_table->dump_info_json) { 1177 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1178 } 1179 1180 return 0; 1181 } 1182 1183 void 1184 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1185 { 1186 assert(bdev != NULL); 1187 assert(w != NULL); 1188 1189 if (bdev->fn_table->write_config_json) { 1190 bdev->fn_table->write_config_json(bdev, w); 1191 } else { 1192 spdk_json_write_object_begin(w); 1193 spdk_json_write_named_string(w, "name", bdev->name); 1194 spdk_json_write_object_end(w); 1195 } 1196 } 1197 1198 static void 1199 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1200 { 1201 uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0; 1202 1203 if (qos->iops_rate_limit > 0) { 1204 max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / 1205 SPDK_BDEV_SEC_TO_USEC; 1206 qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice, 1207 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE); 1208 } 1209 1210 if (qos->byte_rate_limit > 0) { 1211 max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC / 1212 SPDK_BDEV_SEC_TO_USEC; 1213 qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice, 1214 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE); 1215 } 1216 } 1217 1218 static int 1219 spdk_bdev_channel_poll_qos(void *arg) 1220 { 1221 struct spdk_bdev_qos *qos = arg; 1222 1223 /* Reset for next round of rate limiting */ 1224 qos->io_submitted_this_timeslice = 0; 1225 qos->byte_submitted_this_timeslice = 0; 1226 1227 _spdk_bdev_qos_io_submit(qos->ch); 1228 1229 return -1; 1230 } 1231 1232 static void 1233 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1234 { 1235 struct spdk_bdev_shared_resource *shared_resource; 1236 1237 if (!ch) { 1238 return; 1239 } 1240 1241 if (ch->channel) { 1242 spdk_put_io_channel(ch->channel); 1243 } 1244 1245 assert(ch->io_outstanding == 0); 1246 1247 shared_resource = ch->shared_resource; 1248 if (shared_resource) { 1249 assert(ch->io_outstanding == 0); 1250 assert(shared_resource->ref > 0); 1251 shared_resource->ref--; 1252 if (shared_resource->ref == 0) { 1253 assert(shared_resource->io_outstanding == 0); 1254 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1255 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1256 free(shared_resource); 1257 } 1258 } 1259 } 1260 1261 /* Caller must hold bdev->internal.mutex. 
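 * (spdk_bdev_channel_create() below takes it around its call to
 * _spdk_bdev_enable_qos().)
 *
 * For reference, the per-timeslice quotas armed by _spdk_bdev_enable_qos()
 * come from spdk_bdev_qos_update_max_quota_per_timeslice() above; e.g. with
 * iops_rate_limit = 10000 and the 1000 usec timeslice:
 *
 *     max_ios_per_timeslice = 10000 * 1000 / 1000000 = 10 I/Os per slice
 *
 * and a byte_rate_limit of 100,000,000 bytes/s works out to 100,000 bytes
 * per slice, both clamped from below by the SPDK_BDEV_QOS_MIN_* values
 * defined at the top of this file.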
*/ 1262 static int 1263 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1264 { 1265 struct spdk_bdev_qos *qos = bdev->internal.qos; 1266 1267 /* Rate limiting on this bdev enabled */ 1268 if (qos) { 1269 if (qos->ch == NULL) { 1270 struct spdk_io_channel *io_ch; 1271 1272 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1273 bdev->name, spdk_get_thread()); 1274 1275 /* No qos channel has been selected, so set one up */ 1276 1277 /* Take another reference to ch */ 1278 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1279 qos->ch = ch; 1280 1281 qos->thread = spdk_io_channel_get_thread(io_ch); 1282 1283 TAILQ_INIT(&qos->queued); 1284 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1285 qos->io_submitted_this_timeslice = 0; 1286 qos->byte_submitted_this_timeslice = 0; 1287 1288 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1289 qos, 1290 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1291 } 1292 1293 ch->flags |= BDEV_CH_QOS_ENABLED; 1294 } 1295 1296 return 0; 1297 } 1298 1299 static int 1300 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1301 { 1302 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1303 struct spdk_bdev_channel *ch = ctx_buf; 1304 struct spdk_io_channel *mgmt_io_ch; 1305 struct spdk_bdev_mgmt_channel *mgmt_ch; 1306 struct spdk_bdev_shared_resource *shared_resource; 1307 1308 ch->bdev = bdev; 1309 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1310 if (!ch->channel) { 1311 return -1; 1312 } 1313 1314 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1315 if (!mgmt_io_ch) { 1316 return -1; 1317 } 1318 1319 mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch); 1320 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1321 if (shared_resource->shared_ch == ch->channel) { 1322 spdk_put_io_channel(mgmt_io_ch); 1323 shared_resource->ref++; 1324 break; 1325 } 1326 } 1327 1328 if (shared_resource == NULL) { 1329 shared_resource = calloc(1, sizeof(*shared_resource)); 1330 if (shared_resource == NULL) { 1331 spdk_put_io_channel(mgmt_io_ch); 1332 return -1; 1333 } 1334 1335 shared_resource->mgmt_ch = mgmt_ch; 1336 shared_resource->io_outstanding = 0; 1337 TAILQ_INIT(&shared_resource->nomem_io); 1338 shared_resource->nomem_threshold = 0; 1339 shared_resource->shared_ch = ch->channel; 1340 shared_resource->ref = 1; 1341 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1342 } 1343 1344 memset(&ch->stat, 0, sizeof(ch->stat)); 1345 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1346 ch->io_outstanding = 0; 1347 TAILQ_INIT(&ch->queued_resets); 1348 ch->flags = 0; 1349 ch->shared_resource = shared_resource; 1350 1351 #ifdef SPDK_CONFIG_VTUNE 1352 { 1353 char *name; 1354 __itt_init_ittlib(NULL, 0); 1355 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1356 if (!name) { 1357 _spdk_bdev_channel_destroy_resource(ch); 1358 return -1; 1359 } 1360 ch->handle = __itt_string_handle_create(name); 1361 free(name); 1362 ch->start_tsc = spdk_get_ticks(); 1363 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1364 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1365 } 1366 #endif 1367 1368 pthread_mutex_lock(&bdev->internal.mutex); 1369 1370 if (_spdk_bdev_enable_qos(bdev, ch)) { 1371 _spdk_bdev_channel_destroy_resource(ch); 1372 pthread_mutex_unlock(&bdev->internal.mutex); 1373 return -1; 1374 } 1375 1376 pthread_mutex_unlock(&bdev->internal.mutex); 1377 1378 return 0; 1379 } 1380 1381 /* 1382 * Abort I/O that are waiting on a data buffer. 
These types of I/O are 1383 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1384 */ 1385 static void 1386 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1387 { 1388 bdev_io_stailq_t tmp; 1389 struct spdk_bdev_io *bdev_io; 1390 1391 STAILQ_INIT(&tmp); 1392 1393 while (!STAILQ_EMPTY(queue)) { 1394 bdev_io = STAILQ_FIRST(queue); 1395 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1396 if (bdev_io->internal.ch == ch) { 1397 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1398 } else { 1399 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1400 } 1401 } 1402 1403 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1404 } 1405 1406 /* 1407 * Abort I/O that are queued waiting for submission. These types of I/O are 1408 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1409 */ 1410 static void 1411 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1412 { 1413 struct spdk_bdev_io *bdev_io, *tmp; 1414 1415 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1416 if (bdev_io->internal.ch == ch) { 1417 TAILQ_REMOVE(queue, bdev_io, internal.link); 1418 /* 1419 * spdk_bdev_io_complete() assumes that the completed I/O had 1420 * been submitted to the bdev module. Since in this case it 1421 * hadn't, bump io_outstanding to account for the decrement 1422 * that spdk_bdev_io_complete() will do. 1423 */ 1424 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1425 ch->io_outstanding++; 1426 ch->shared_resource->io_outstanding++; 1427 } 1428 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1429 } 1430 } 1431 } 1432 1433 static void 1434 spdk_bdev_qos_channel_destroy(void *cb_arg) 1435 { 1436 struct spdk_bdev_qos *qos = cb_arg; 1437 1438 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1439 spdk_poller_unregister(&qos->poller); 1440 1441 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1442 1443 free(qos); 1444 } 1445 1446 static int 1447 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1448 { 1449 /* 1450 * Cleanly shutting down the QoS poller is tricky, because 1451 * during the asynchronous operation the user could open 1452 * a new descriptor and create a new channel, spawning 1453 * a new QoS poller. 1454 * 1455 * The strategy is to create a new QoS structure here and swap it 1456 * in. The shutdown path then continues to refer to the old one 1457 * until it completes and then releases it. 1458 */ 1459 struct spdk_bdev_qos *new_qos, *old_qos; 1460 1461 old_qos = bdev->internal.qos; 1462 1463 new_qos = calloc(1, sizeof(*new_qos)); 1464 if (!new_qos) { 1465 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1466 return -ENOMEM; 1467 } 1468 1469 /* Copy the old QoS data into the newly allocated structure */ 1470 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1471 1472 /* Zero out the key parts of the QoS structure */ 1473 new_qos->ch = NULL; 1474 new_qos->thread = NULL; 1475 new_qos->max_ios_per_timeslice = 0; 1476 new_qos->max_byte_per_timeslice = 0; 1477 new_qos->io_submitted_this_timeslice = 0; 1478 new_qos->byte_submitted_this_timeslice = 0; 1479 new_qos->poller = NULL; 1480 TAILQ_INIT(&new_qos->queued); 1481 1482 bdev->internal.qos = new_qos; 1483 1484 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1485 old_qos); 1486 1487 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 1488 * been destroyed yet. The destruction path will end up waiting for the final 1489 * channel to be put before it releases resources. 
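 *
 * Note the thread-affinity pattern used just above: the replacement struct is
 * swapped into bdev->internal.qos immediately, while the old one is handed
 * back to the thread that owns it,
 *
 *     spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, old_qos);
 *
 * so that the spdk_put_io_channel() and spdk_poller_unregister() calls in
 * spdk_bdev_qos_channel_destroy() run on the same thread that took the QoS
 * channel reference and registered the poller.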
*/ 1490 1491 return 0; 1492 } 1493 1494 static void 1495 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 1496 { 1497 struct spdk_bdev_channel *ch = ctx_buf; 1498 struct spdk_bdev_mgmt_channel *mgmt_ch; 1499 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1500 1501 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 1502 spdk_get_thread()); 1503 1504 mgmt_ch = shared_resource->mgmt_ch; 1505 1506 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 1507 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 1508 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 1509 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 1510 1511 _spdk_bdev_channel_destroy_resource(ch); 1512 } 1513 1514 int 1515 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 1516 { 1517 struct spdk_bdev_alias *tmp; 1518 1519 if (alias == NULL) { 1520 SPDK_ERRLOG("Empty alias passed\n"); 1521 return -EINVAL; 1522 } 1523 1524 if (spdk_bdev_get_by_name(alias)) { 1525 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 1526 return -EEXIST; 1527 } 1528 1529 tmp = calloc(1, sizeof(*tmp)); 1530 if (tmp == NULL) { 1531 SPDK_ERRLOG("Unable to allocate alias\n"); 1532 return -ENOMEM; 1533 } 1534 1535 tmp->alias = strdup(alias); 1536 if (tmp->alias == NULL) { 1537 free(tmp); 1538 SPDK_ERRLOG("Unable to allocate alias\n"); 1539 return -ENOMEM; 1540 } 1541 1542 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 1543 1544 return 0; 1545 } 1546 1547 int 1548 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 1549 { 1550 struct spdk_bdev_alias *tmp; 1551 1552 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 1553 if (strcmp(alias, tmp->alias) == 0) { 1554 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 1555 free(tmp->alias); 1556 free(tmp); 1557 return 0; 1558 } 1559 } 1560 1561 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 1562 1563 return -ENOENT; 1564 } 1565 1566 struct spdk_io_channel * 1567 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 1568 { 1569 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 1570 } 1571 1572 const char * 1573 spdk_bdev_get_name(const struct spdk_bdev *bdev) 1574 { 1575 return bdev->name; 1576 } 1577 1578 const char * 1579 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 1580 { 1581 return bdev->product_name; 1582 } 1583 1584 const struct spdk_bdev_aliases_list * 1585 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 1586 { 1587 return &bdev->aliases; 1588 } 1589 1590 uint32_t 1591 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 1592 { 1593 return bdev->blocklen; 1594 } 1595 1596 uint64_t 1597 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 1598 { 1599 return bdev->blockcnt; 1600 } 1601 1602 uint64_t 1603 spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev) 1604 { 1605 uint64_t iops_rate_limit = 0; 1606 1607 pthread_mutex_lock(&bdev->internal.mutex); 1608 if (bdev->internal.qos) { 1609 iops_rate_limit = bdev->internal.qos->iops_rate_limit; 1610 } 1611 pthread_mutex_unlock(&bdev->internal.mutex); 1612 1613 return iops_rate_limit; 1614 } 1615 1616 size_t 1617 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 1618 { 1619 /* TODO: push this logic down to the bdev modules */ 1620 if (bdev->need_aligned_buffer) { 1621 return bdev->blocklen; 1622 } 1623 1624 return 1; 1625 } 1626 1627 uint32_t 1628 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 1629 { 1630 return bdev->optimal_io_boundary; 1631 } 1632 1633 bool 1634 
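/*
 * Illustrative use of the getters above when sizing an I/O buffer for the
 * *_blocks() helpers later in this file (the 16-block transfer size and the
 * use of spdk_dma_zmalloc() are arbitrary example choices):
 *
 *     uint32_t blocklen = spdk_bdev_get_block_size(bdev);
 *     size_t align = spdk_bdev_get_buf_align(bdev);   // 1 means no constraint
 *     void *buf = spdk_dma_zmalloc(16 * blocklen, align, NULL);
 *
 * with spdk_bdev_get_num_blocks() providing the upper bound for offset/length
 * validation, mirroring spdk_bdev_io_valid_blocks() below.
 */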
spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 1635 { 1636 return bdev->write_cache; 1637 } 1638 1639 const struct spdk_uuid * 1640 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 1641 { 1642 return &bdev->uuid; 1643 } 1644 1645 int 1646 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 1647 { 1648 int ret; 1649 1650 pthread_mutex_lock(&bdev->internal.mutex); 1651 1652 /* bdev has open descriptors */ 1653 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 1654 bdev->blockcnt > size) { 1655 ret = -EBUSY; 1656 } else { 1657 bdev->blockcnt = size; 1658 ret = 0; 1659 } 1660 1661 pthread_mutex_unlock(&bdev->internal.mutex); 1662 1663 return ret; 1664 } 1665 1666 /* 1667 * Convert I/O offset and length from bytes to blocks. 1668 * 1669 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 1670 */ 1671 static uint64_t 1672 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 1673 uint64_t num_bytes, uint64_t *num_blocks) 1674 { 1675 uint32_t block_size = bdev->blocklen; 1676 1677 *offset_blocks = offset_bytes / block_size; 1678 *num_blocks = num_bytes / block_size; 1679 1680 return (offset_bytes % block_size) | (num_bytes % block_size); 1681 } 1682 1683 static bool 1684 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 1685 { 1686 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 1687 * has been an overflow and hence the offset has been wrapped around */ 1688 if (offset_blocks + num_blocks < offset_blocks) { 1689 return false; 1690 } 1691 1692 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 1693 if (offset_blocks + num_blocks > bdev->blockcnt) { 1694 return false; 1695 } 1696 1697 return true; 1698 } 1699 1700 int 1701 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1702 void *buf, uint64_t offset, uint64_t nbytes, 1703 spdk_bdev_io_completion_cb cb, void *cb_arg) 1704 { 1705 uint64_t offset_blocks, num_blocks; 1706 1707 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1708 return -EINVAL; 1709 } 1710 1711 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 1712 } 1713 1714 int 1715 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1716 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 1717 spdk_bdev_io_completion_cb cb, void *cb_arg) 1718 { 1719 struct spdk_bdev *bdev = desc->bdev; 1720 struct spdk_bdev_io *bdev_io; 1721 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1722 1723 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1724 return -EINVAL; 1725 } 1726 1727 bdev_io = spdk_bdev_get_io(channel); 1728 if (!bdev_io) { 1729 return -ENOMEM; 1730 } 1731 1732 bdev_io->internal.ch = channel; 1733 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 1734 bdev_io->u.bdev.iovs = &bdev_io->iov; 1735 bdev_io->u.bdev.iovs[0].iov_base = buf; 1736 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 1737 bdev_io->u.bdev.iovcnt = 1; 1738 bdev_io->u.bdev.num_blocks = num_blocks; 1739 bdev_io->u.bdev.offset_blocks = offset_blocks; 1740 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1741 1742 spdk_bdev_io_submit(bdev_io); 1743 return 0; 1744 } 1745 1746 int 1747 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1748 struct iovec *iov, int iovcnt, 1749 uint64_t offset, uint64_t nbytes, 1750 
spdk_bdev_io_completion_cb cb, void *cb_arg) 1751 { 1752 uint64_t offset_blocks, num_blocks; 1753 1754 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1755 return -EINVAL; 1756 } 1757 1758 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 1759 } 1760 1761 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1762 struct iovec *iov, int iovcnt, 1763 uint64_t offset_blocks, uint64_t num_blocks, 1764 spdk_bdev_io_completion_cb cb, void *cb_arg) 1765 { 1766 struct spdk_bdev *bdev = desc->bdev; 1767 struct spdk_bdev_io *bdev_io; 1768 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1769 1770 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1771 return -EINVAL; 1772 } 1773 1774 bdev_io = spdk_bdev_get_io(channel); 1775 if (!bdev_io) { 1776 return -ENOMEM; 1777 } 1778 1779 bdev_io->internal.ch = channel; 1780 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 1781 bdev_io->u.bdev.iovs = iov; 1782 bdev_io->u.bdev.iovcnt = iovcnt; 1783 bdev_io->u.bdev.num_blocks = num_blocks; 1784 bdev_io->u.bdev.offset_blocks = offset_blocks; 1785 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1786 1787 spdk_bdev_io_submit(bdev_io); 1788 return 0; 1789 } 1790 1791 int 1792 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1793 void *buf, uint64_t offset, uint64_t nbytes, 1794 spdk_bdev_io_completion_cb cb, void *cb_arg) 1795 { 1796 uint64_t offset_blocks, num_blocks; 1797 1798 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1799 return -EINVAL; 1800 } 1801 1802 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 1803 } 1804 1805 int 1806 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1807 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 1808 spdk_bdev_io_completion_cb cb, void *cb_arg) 1809 { 1810 struct spdk_bdev *bdev = desc->bdev; 1811 struct spdk_bdev_io *bdev_io; 1812 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1813 1814 if (!desc->write) { 1815 return -EBADF; 1816 } 1817 1818 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1819 return -EINVAL; 1820 } 1821 1822 bdev_io = spdk_bdev_get_io(channel); 1823 if (!bdev_io) { 1824 return -ENOMEM; 1825 } 1826 1827 bdev_io->internal.ch = channel; 1828 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 1829 bdev_io->u.bdev.iovs = &bdev_io->iov; 1830 bdev_io->u.bdev.iovs[0].iov_base = buf; 1831 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 1832 bdev_io->u.bdev.iovcnt = 1; 1833 bdev_io->u.bdev.num_blocks = num_blocks; 1834 bdev_io->u.bdev.offset_blocks = offset_blocks; 1835 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1836 1837 spdk_bdev_io_submit(bdev_io); 1838 return 0; 1839 } 1840 1841 int 1842 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1843 struct iovec *iov, int iovcnt, 1844 uint64_t offset, uint64_t len, 1845 spdk_bdev_io_completion_cb cb, void *cb_arg) 1846 { 1847 uint64_t offset_blocks, num_blocks; 1848 1849 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 1850 return -EINVAL; 1851 } 1852 1853 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 1854 } 1855 1856 int 1857 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1858 struct iovec *iov, int iovcnt, 1859 uint64_t offset_blocks, uint64_t 
num_blocks, 1860 spdk_bdev_io_completion_cb cb, void *cb_arg) 1861 { 1862 struct spdk_bdev *bdev = desc->bdev; 1863 struct spdk_bdev_io *bdev_io; 1864 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1865 1866 if (!desc->write) { 1867 return -EBADF; 1868 } 1869 1870 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1871 return -EINVAL; 1872 } 1873 1874 bdev_io = spdk_bdev_get_io(channel); 1875 if (!bdev_io) { 1876 return -ENOMEM; 1877 } 1878 1879 bdev_io->internal.ch = channel; 1880 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 1881 bdev_io->u.bdev.iovs = iov; 1882 bdev_io->u.bdev.iovcnt = iovcnt; 1883 bdev_io->u.bdev.num_blocks = num_blocks; 1884 bdev_io->u.bdev.offset_blocks = offset_blocks; 1885 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1886 1887 spdk_bdev_io_submit(bdev_io); 1888 return 0; 1889 } 1890 1891 int 1892 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1893 uint64_t offset, uint64_t len, 1894 spdk_bdev_io_completion_cb cb, void *cb_arg) 1895 { 1896 uint64_t offset_blocks, num_blocks; 1897 1898 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 1899 return -EINVAL; 1900 } 1901 1902 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 1903 } 1904 1905 int 1906 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1907 uint64_t offset_blocks, uint64_t num_blocks, 1908 spdk_bdev_io_completion_cb cb, void *cb_arg) 1909 { 1910 struct spdk_bdev *bdev = desc->bdev; 1911 struct spdk_bdev_io *bdev_io; 1912 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1913 uint64_t len; 1914 bool split_request = false; 1915 1916 if (!desc->write) { 1917 return -EBADF; 1918 } 1919 1920 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1921 return -EINVAL; 1922 } 1923 1924 bdev_io = spdk_bdev_get_io(channel); 1925 1926 if (!bdev_io) { 1927 return -ENOMEM; 1928 } 1929 1930 bdev_io->internal.ch = channel; 1931 bdev_io->u.bdev.offset_blocks = offset_blocks; 1932 1933 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 1934 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 1935 bdev_io->u.bdev.num_blocks = num_blocks; 1936 bdev_io->u.bdev.iovs = NULL; 1937 bdev_io->u.bdev.iovcnt = 0; 1938 1939 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 1940 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 1941 1942 len = spdk_bdev_get_block_size(bdev) * num_blocks; 1943 1944 if (len > ZERO_BUFFER_SIZE) { 1945 split_request = true; 1946 len = ZERO_BUFFER_SIZE; 1947 } 1948 1949 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 1950 bdev_io->u.bdev.iovs = &bdev_io->iov; 1951 bdev_io->u.bdev.iovs[0].iov_base = g_bdev_mgr.zero_buffer; 1952 bdev_io->u.bdev.iovs[0].iov_len = len; 1953 bdev_io->u.bdev.iovcnt = 1; 1954 bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev); 1955 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks; 1956 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks; 1957 } else { 1958 spdk_bdev_free_io(bdev_io); 1959 return -ENOTSUP; 1960 } 1961 1962 if (split_request) { 1963 bdev_io->u.bdev.stored_user_cb = cb; 1964 spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split); 1965 } else { 1966 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1967 } 1968 spdk_bdev_io_submit(bdev_io); 1969 return 0; 1970 } 1971 1972 int 1973 spdk_bdev_unmap(struct spdk_bdev_desc *desc, 
struct spdk_io_channel *ch, 1974 uint64_t offset, uint64_t nbytes, 1975 spdk_bdev_io_completion_cb cb, void *cb_arg) 1976 { 1977 uint64_t offset_blocks, num_blocks; 1978 1979 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1980 return -EINVAL; 1981 } 1982 1983 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 1984 } 1985 1986 int 1987 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1988 uint64_t offset_blocks, uint64_t num_blocks, 1989 spdk_bdev_io_completion_cb cb, void *cb_arg) 1990 { 1991 struct spdk_bdev *bdev = desc->bdev; 1992 struct spdk_bdev_io *bdev_io; 1993 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1994 1995 if (!desc->write) { 1996 return -EBADF; 1997 } 1998 1999 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2000 return -EINVAL; 2001 } 2002 2003 if (num_blocks == 0) { 2004 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2005 return -EINVAL; 2006 } 2007 2008 bdev_io = spdk_bdev_get_io(channel); 2009 if (!bdev_io) { 2010 return -ENOMEM; 2011 } 2012 2013 bdev_io->internal.ch = channel; 2014 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2015 2016 bdev_io->u.bdev.iovs = &bdev_io->iov; 2017 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2018 bdev_io->u.bdev.iovs[0].iov_len = 0; 2019 bdev_io->u.bdev.iovcnt = 1; 2020 2021 bdev_io->u.bdev.offset_blocks = offset_blocks; 2022 bdev_io->u.bdev.num_blocks = num_blocks; 2023 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2024 2025 spdk_bdev_io_submit(bdev_io); 2026 return 0; 2027 } 2028 2029 int 2030 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2031 uint64_t offset, uint64_t length, 2032 spdk_bdev_io_completion_cb cb, void *cb_arg) 2033 { 2034 uint64_t offset_blocks, num_blocks; 2035 2036 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2037 return -EINVAL; 2038 } 2039 2040 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2041 } 2042 2043 int 2044 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2045 uint64_t offset_blocks, uint64_t num_blocks, 2046 spdk_bdev_io_completion_cb cb, void *cb_arg) 2047 { 2048 struct spdk_bdev *bdev = desc->bdev; 2049 struct spdk_bdev_io *bdev_io; 2050 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2051 2052 if (!desc->write) { 2053 return -EBADF; 2054 } 2055 2056 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2057 return -EINVAL; 2058 } 2059 2060 bdev_io = spdk_bdev_get_io(channel); 2061 if (!bdev_io) { 2062 return -ENOMEM; 2063 } 2064 2065 bdev_io->internal.ch = channel; 2066 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2067 bdev_io->u.bdev.iovs = NULL; 2068 bdev_io->u.bdev.iovcnt = 0; 2069 bdev_io->u.bdev.offset_blocks = offset_blocks; 2070 bdev_io->u.bdev.num_blocks = num_blocks; 2071 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2072 2073 spdk_bdev_io_submit(bdev_io); 2074 return 0; 2075 } 2076 2077 static void 2078 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2079 { 2080 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2081 struct spdk_bdev_io *bdev_io; 2082 2083 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2084 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2085 spdk_bdev_io_submit_reset(bdev_io); 2086 } 2087 2088 static void 2089 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2090 { 2091 struct spdk_io_channel *ch; 2092 struct spdk_bdev_channel 
*channel; 2093 struct spdk_bdev_mgmt_channel *mgmt_channel; 2094 struct spdk_bdev_shared_resource *shared_resource; 2095 bdev_io_tailq_t tmp_queued; 2096 2097 TAILQ_INIT(&tmp_queued); 2098 2099 ch = spdk_io_channel_iter_get_channel(i); 2100 channel = spdk_io_channel_get_ctx(ch); 2101 shared_resource = channel->shared_resource; 2102 mgmt_channel = shared_resource->mgmt_ch; 2103 2104 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2105 2106 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2107 /* The QoS object is always valid and readable while 2108 * the channel flag is set, so the lock here should not 2109 * be necessary. We're not in the fast path though, so 2110 * just take it anyway. */ 2111 pthread_mutex_lock(&channel->bdev->internal.mutex); 2112 if (channel->bdev->internal.qos->ch == channel) { 2113 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2114 } 2115 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2116 } 2117 2118 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2119 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2120 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2121 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2122 2123 spdk_for_each_channel_continue(i, 0); 2124 } 2125 2126 static void 2127 _spdk_bdev_start_reset(void *ctx) 2128 { 2129 struct spdk_bdev_channel *ch = ctx; 2130 2131 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2132 ch, _spdk_bdev_reset_dev); 2133 } 2134 2135 static void 2136 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2137 { 2138 struct spdk_bdev *bdev = ch->bdev; 2139 2140 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2141 2142 pthread_mutex_lock(&bdev->internal.mutex); 2143 if (bdev->internal.reset_in_progress == NULL) { 2144 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2145 /* 2146 * Take a channel reference for the target bdev for the life of this 2147 * reset. This guards against the channel getting destroyed while 2148 * spdk_for_each_channel() calls related to this reset IO are in 2149 * progress. We will release the reference when this reset is 2150 * completed. 
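		 * The reference is dropped again in _spdk_bdev_reset_complete(), via
		 * spdk_put_io_channel(), once every channel has been unfrozen.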
2151 */ 2152 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2153 _spdk_bdev_start_reset(ch); 2154 } 2155 pthread_mutex_unlock(&bdev->internal.mutex); 2156 } 2157 2158 int 2159 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2160 spdk_bdev_io_completion_cb cb, void *cb_arg) 2161 { 2162 struct spdk_bdev *bdev = desc->bdev; 2163 struct spdk_bdev_io *bdev_io; 2164 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2165 2166 bdev_io = spdk_bdev_get_io(channel); 2167 if (!bdev_io) { 2168 return -ENOMEM; 2169 } 2170 2171 bdev_io->internal.ch = channel; 2172 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2173 bdev_io->u.reset.ch_ref = NULL; 2174 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2175 2176 pthread_mutex_lock(&bdev->internal.mutex); 2177 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2178 pthread_mutex_unlock(&bdev->internal.mutex); 2179 2180 _spdk_bdev_channel_start_reset(channel); 2181 2182 return 0; 2183 } 2184 2185 void 2186 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2187 struct spdk_bdev_io_stat *stat) 2188 { 2189 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2190 2191 *stat = channel->stat; 2192 } 2193 2194 static void 2195 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2196 { 2197 void *io_device = spdk_io_channel_iter_get_io_device(i); 2198 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2199 2200 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2201 bdev_iostat_ctx->cb_arg, 0); 2202 free(bdev_iostat_ctx); 2203 } 2204 2205 static void 2206 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2207 { 2208 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2209 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2210 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2211 2212 bdev_iostat_ctx->stat->bytes_read += channel->stat.bytes_read; 2213 bdev_iostat_ctx->stat->num_read_ops += channel->stat.num_read_ops; 2214 bdev_iostat_ctx->stat->bytes_written += channel->stat.bytes_written; 2215 bdev_iostat_ctx->stat->num_write_ops += channel->stat.num_write_ops; 2216 2217 spdk_for_each_channel_continue(i, 0); 2218 } 2219 2220 void 2221 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2222 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2223 { 2224 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2225 2226 assert(bdev != NULL); 2227 assert(stat != NULL); 2228 assert(cb != NULL); 2229 2230 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2231 if (bdev_iostat_ctx == NULL) { 2232 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2233 cb(bdev, stat, cb_arg, -ENOMEM); 2234 return; 2235 } 2236 2237 bdev_iostat_ctx->stat = stat; 2238 bdev_iostat_ctx->cb = cb; 2239 bdev_iostat_ctx->cb_arg = cb_arg; 2240 2241 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2242 _spdk_bdev_get_each_channel_stat, 2243 bdev_iostat_ctx, 2244 _spdk_bdev_get_device_stat_done); 2245 } 2246 2247 int 2248 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2249 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2250 spdk_bdev_io_completion_cb cb, void *cb_arg) 2251 { 2252 struct spdk_bdev *bdev = desc->bdev; 2253 struct spdk_bdev_io *bdev_io; 2254 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2255 
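	/*
	 * As with the NVMe I/O passthru variants below, the command is not parsed
	 * to determine whether it actually modifies anything; admin passthru is
	 * simply rejected on descriptors that were not opened for write.
	 */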
2256 if (!desc->write) { 2257 return -EBADF; 2258 } 2259 2260 bdev_io = spdk_bdev_get_io(channel); 2261 if (!bdev_io) { 2262 return -ENOMEM; 2263 } 2264 2265 bdev_io->internal.ch = channel; 2266 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2267 bdev_io->u.nvme_passthru.cmd = *cmd; 2268 bdev_io->u.nvme_passthru.buf = buf; 2269 bdev_io->u.nvme_passthru.nbytes = nbytes; 2270 bdev_io->u.nvme_passthru.md_buf = NULL; 2271 bdev_io->u.nvme_passthru.md_len = 0; 2272 2273 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2274 2275 spdk_bdev_io_submit(bdev_io); 2276 return 0; 2277 } 2278 2279 int 2280 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2281 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2282 spdk_bdev_io_completion_cb cb, void *cb_arg) 2283 { 2284 struct spdk_bdev *bdev = desc->bdev; 2285 struct spdk_bdev_io *bdev_io; 2286 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2287 2288 if (!desc->write) { 2289 /* 2290 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2291 * to easily determine if the command is a read or write, but for now just 2292 * do not allow io_passthru with a read-only descriptor. 2293 */ 2294 return -EBADF; 2295 } 2296 2297 bdev_io = spdk_bdev_get_io(channel); 2298 if (!bdev_io) { 2299 return -ENOMEM; 2300 } 2301 2302 bdev_io->internal.ch = channel; 2303 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2304 bdev_io->u.nvme_passthru.cmd = *cmd; 2305 bdev_io->u.nvme_passthru.buf = buf; 2306 bdev_io->u.nvme_passthru.nbytes = nbytes; 2307 bdev_io->u.nvme_passthru.md_buf = NULL; 2308 bdev_io->u.nvme_passthru.md_len = 0; 2309 2310 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2311 2312 spdk_bdev_io_submit(bdev_io); 2313 return 0; 2314 } 2315 2316 int 2317 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2318 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2319 spdk_bdev_io_completion_cb cb, void *cb_arg) 2320 { 2321 struct spdk_bdev *bdev = desc->bdev; 2322 struct spdk_bdev_io *bdev_io; 2323 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2324 2325 if (!desc->write) { 2326 /* 2327 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2328 * to easily determine if the command is a read or write, but for now just 2329 * do not allow io_passthru with a read-only descriptor. 
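	 * Callers that need passthru must therefore open the bdev for writing,
	 * e.g. spdk_bdev_open(bdev, true, remove_cb, remove_ctx, &desc), where
	 * remove_cb/remove_ctx are the caller's usual hot-remove arguments.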
2330 */ 2331 return -EBADF; 2332 } 2333 2334 bdev_io = spdk_bdev_get_io(channel); 2335 if (!bdev_io) { 2336 return -ENOMEM; 2337 } 2338 2339 bdev_io->internal.ch = channel; 2340 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2341 bdev_io->u.nvme_passthru.cmd = *cmd; 2342 bdev_io->u.nvme_passthru.buf = buf; 2343 bdev_io->u.nvme_passthru.nbytes = nbytes; 2344 bdev_io->u.nvme_passthru.md_buf = md_buf; 2345 bdev_io->u.nvme_passthru.md_len = md_len; 2346 2347 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2348 2349 spdk_bdev_io_submit(bdev_io); 2350 return 0; 2351 } 2352 2353 int 2354 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2355 struct spdk_bdev_io_wait_entry *entry) 2356 { 2357 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2358 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2359 2360 if (bdev != entry->bdev) { 2361 SPDK_ERRLOG("bdevs do not match\n"); 2362 return -EINVAL; 2363 } 2364 2365 if (mgmt_ch->per_thread_cache_count > 0) { 2366 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2367 return -EINVAL; 2368 } 2369 2370 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2371 return 0; 2372 } 2373 2374 static void 2375 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2376 { 2377 struct spdk_bdev *bdev = bdev_ch->bdev; 2378 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2379 struct spdk_bdev_io *bdev_io; 2380 2381 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2382 /* 2383 * Allow some more I/O to complete before retrying the nomem_io queue. 2384 * Some drivers (such as nvme) cannot immediately take a new I/O in 2385 * the context of a completion, because the resources for the I/O are 2386 * not released until control returns to the bdev poller. Also, we 2387 * may require several small I/O to complete before a larger I/O 2388 * (that requires splitting) can be submitted. 2389 */ 2390 return; 2391 } 2392 2393 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2394 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2395 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2396 bdev_io->internal.ch->io_outstanding++; 2397 shared_resource->io_outstanding++; 2398 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2399 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2400 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2401 break; 2402 } 2403 } 2404 } 2405 2406 static inline void 2407 _spdk_bdev_io_complete(void *ctx) 2408 { 2409 struct spdk_bdev_io *bdev_io = ctx; 2410 2411 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2412 /* 2413 * Send the completion to the thread that originally submitted the I/O, 2414 * which may not be the current thread in the case of QoS. 2415 */ 2416 if (bdev_io->internal.io_submit_ch) { 2417 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2418 bdev_io->internal.io_submit_ch = NULL; 2419 } 2420 2421 /* 2422 * Defer completion to avoid potential infinite recursion if the 2423 * user's completion callback issues a new I/O. 
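		 * spdk_thread_send_msg() queues this function to run again from the
		 * target thread's message loop, so the current stack fully unwinds
		 * before the user callback is invoked.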
2424 */ 2425 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2426 _spdk_bdev_io_complete, bdev_io); 2427 return; 2428 } 2429 2430 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2431 switch (bdev_io->type) { 2432 case SPDK_BDEV_IO_TYPE_READ: 2433 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2434 bdev_io->internal.ch->stat.num_read_ops++; 2435 bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2436 break; 2437 case SPDK_BDEV_IO_TYPE_WRITE: 2438 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2439 bdev_io->internal.ch->stat.num_write_ops++; 2440 bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc); 2441 break; 2442 default: 2443 break; 2444 } 2445 } 2446 2447 #ifdef SPDK_CONFIG_VTUNE 2448 uint64_t now_tsc = spdk_get_ticks(); 2449 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2450 uint64_t data[5]; 2451 2452 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2453 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2454 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2455 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2456 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 2457 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2458 2459 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2460 __itt_metadata_u64, 5, data); 2461 2462 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2463 bdev_io->internal.ch->start_tsc = now_tsc; 2464 } 2465 #endif 2466 2467 assert(bdev_io->internal.cb != NULL); 2468 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2469 2470 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2471 bdev_io->internal.caller_ctx); 2472 } 2473 2474 static void 2475 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2476 { 2477 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2478 2479 if (bdev_io->u.reset.ch_ref != NULL) { 2480 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2481 bdev_io->u.reset.ch_ref = NULL; 2482 } 2483 2484 _spdk_bdev_io_complete(bdev_io); 2485 } 2486 2487 static void 2488 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2489 { 2490 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2491 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2492 2493 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2494 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2495 _spdk_bdev_channel_start_reset(ch); 2496 } 2497 2498 spdk_for_each_channel_continue(i, 0); 2499 } 2500 2501 void 2502 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2503 { 2504 struct spdk_bdev *bdev = bdev_io->bdev; 2505 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2506 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2507 2508 bdev_io->internal.status = status; 2509 2510 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2511 bool unlock_channels = false; 2512 2513 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2514 SPDK_ERRLOG("NOMEM returned for reset\n"); 2515 } 
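		/* Only the reset recorded in bdev->internal.reset_in_progress triggers
		 * the channel unfreeze sequence below; any other reset I/O completes
		 * directly at the bottom of this function. */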
2516 pthread_mutex_lock(&bdev->internal.mutex); 2517 if (bdev_io == bdev->internal.reset_in_progress) { 2518 bdev->internal.reset_in_progress = NULL; 2519 unlock_channels = true; 2520 } 2521 pthread_mutex_unlock(&bdev->internal.mutex); 2522 2523 if (unlock_channels) { 2524 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 2525 bdev_io, _spdk_bdev_reset_complete); 2526 return; 2527 } 2528 } else { 2529 assert(bdev_ch->io_outstanding > 0); 2530 assert(shared_resource->io_outstanding > 0); 2531 bdev_ch->io_outstanding--; 2532 shared_resource->io_outstanding--; 2533 2534 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 2535 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 2536 /* 2537 * Wait for some of the outstanding I/O to complete before we 2538 * retry any of the nomem_io. Normally we will wait for 2539 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 2540 * depth channels we will instead wait for half to complete. 2541 */ 2542 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 2543 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 2544 return; 2545 } 2546 2547 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 2548 _spdk_bdev_ch_retry_io(bdev_ch); 2549 } 2550 } 2551 2552 _spdk_bdev_io_complete(bdev_io); 2553 } 2554 2555 void 2556 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 2557 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 2558 { 2559 if (sc == SPDK_SCSI_STATUS_GOOD) { 2560 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2561 } else { 2562 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 2563 bdev_io->internal.error.scsi.sc = sc; 2564 bdev_io->internal.error.scsi.sk = sk; 2565 bdev_io->internal.error.scsi.asc = asc; 2566 bdev_io->internal.error.scsi.ascq = ascq; 2567 } 2568 2569 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2570 } 2571 2572 void 2573 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 2574 int *sc, int *sk, int *asc, int *ascq) 2575 { 2576 assert(sc != NULL); 2577 assert(sk != NULL); 2578 assert(asc != NULL); 2579 assert(ascq != NULL); 2580 2581 switch (bdev_io->internal.status) { 2582 case SPDK_BDEV_IO_STATUS_SUCCESS: 2583 *sc = SPDK_SCSI_STATUS_GOOD; 2584 *sk = SPDK_SCSI_SENSE_NO_SENSE; 2585 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2586 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2587 break; 2588 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 2589 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 2590 break; 2591 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 2592 *sc = bdev_io->internal.error.scsi.sc; 2593 *sk = bdev_io->internal.error.scsi.sk; 2594 *asc = bdev_io->internal.error.scsi.asc; 2595 *ascq = bdev_io->internal.error.scsi.ascq; 2596 break; 2597 default: 2598 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 2599 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 2600 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 2601 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 2602 break; 2603 } 2604 } 2605 2606 void 2607 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 2608 { 2609 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 2610 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2611 } else { 2612 bdev_io->internal.error.nvme.sct = sct; 2613 bdev_io->internal.error.nvme.sc = sc; 2614 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 2615 } 2616 2617 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2618 } 2619 2620 
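/*
 * The getter below is the counterpart of spdk_bdev_io_complete_nvme_status()
 * above. An illustrative (hypothetical) completion callback in an NVMe-style
 * target could translate a bdev completion back into NVMe status like this:
 *
 *	static void
 *	example_complete_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		int sct, sc;
 *
 *		spdk_bdev_io_get_nvme_status(bdev_io, &sct, &sc);
 *		// ... copy sct/sc into the response owned by cb_arg ...
 *		spdk_bdev_free_io(bdev_io);
 *	}
 */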
void 2621 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 2622 { 2623 assert(sct != NULL); 2624 assert(sc != NULL); 2625 2626 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 2627 *sct = bdev_io->internal.error.nvme.sct; 2628 *sc = bdev_io->internal.error.nvme.sc; 2629 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2630 *sct = SPDK_NVME_SCT_GENERIC; 2631 *sc = SPDK_NVME_SC_SUCCESS; 2632 } else { 2633 *sct = SPDK_NVME_SCT_GENERIC; 2634 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 2635 } 2636 } 2637 2638 struct spdk_thread * 2639 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 2640 { 2641 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 2642 } 2643 2644 static void 2645 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set, 2646 enum spdk_bdev_qos_type qos_type) 2647 { 2648 uint64_t min_qos_set = 0; 2649 2650 switch (qos_type) { 2651 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 2652 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 2653 break; 2654 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2655 min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC; 2656 break; 2657 default: 2658 SPDK_ERRLOG("Unsupported QoS type.\n"); 2659 return; 2660 } 2661 2662 if (qos_set % min_qos_set) { 2663 SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not multiple of %lu\n", 2664 qos_set, bdev->name, min_qos_set); 2665 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 2666 return; 2667 } 2668 2669 if (!bdev->internal.qos) { 2670 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 2671 if (!bdev->internal.qos) { 2672 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 2673 return; 2674 } 2675 } 2676 2677 switch (qos_type) { 2678 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 2679 bdev->internal.qos->iops_rate_limit = qos_set; 2680 break; 2681 case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT: 2682 bdev->internal.qos->byte_rate_limit = qos_set * 1024 * 1024; 2683 break; 2684 default: 2685 break; 2686 } 2687 2688 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 2689 bdev->name, qos_type, qos_set); 2690 2691 return; 2692 } 2693 2694 static void 2695 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 2696 { 2697 struct spdk_conf_section *sp = NULL; 2698 const char *val = NULL; 2699 uint64_t qos_set = 0; 2700 int i = 0, j = 0; 2701 2702 sp = spdk_conf_find_section(NULL, "QoS"); 2703 if (!sp) { 2704 return; 2705 } 2706 2707 while (j < SPDK_BDEV_QOS_NUM_TYPES) { 2708 i = 0; 2709 while (true) { 2710 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0); 2711 if (!val) { 2712 break; 2713 } 2714 2715 if (strcmp(bdev->name, val) != 0) { 2716 i++; 2717 continue; 2718 } 2719 2720 val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1); 2721 if (val) { 2722 qos_set = strtoull(val, NULL, 10); 2723 _spdk_bdev_qos_config_type(bdev, qos_set, j); 2724 } 2725 2726 break; 2727 } 2728 2729 j++; 2730 } 2731 2732 return; 2733 } 2734 2735 static int 2736 spdk_bdev_init(struct spdk_bdev *bdev) 2737 { 2738 assert(bdev->module != NULL); 2739 2740 if (!bdev->name) { 2741 SPDK_ERRLOG("Bdev name is NULL\n"); 2742 return -EINVAL; 2743 } 2744 2745 if (spdk_bdev_get_by_name(bdev->name)) { 2746 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 2747 return -EEXIST; 2748 } 2749 2750 bdev->internal.status = SPDK_BDEV_STATUS_READY; 2751 2752 TAILQ_INIT(&bdev->internal.open_descs); 2753 2754 TAILQ_INIT(&bdev->aliases); 2755 2756 bdev->internal.reset_in_progress = NULL; 2757 2758 _spdk_bdev_qos_config(bdev); 2759 
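	/*
	 * _spdk_bdev_qos_config() above reads optional per-bdev rate limits from a
	 * "QoS" section of the configuration file: each entry is keyed by one of
	 * the qos_type_str[] names and lists the bdev name followed by the limit
	 * value. IOPS limits must be a multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC
	 * and bandwidth limits (given in MB/s) a multiple of
	 * SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC; values that are not are rejected
	 * with an error log.
	 */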
2760 spdk_io_device_register(__bdev_to_io_dev(bdev), 2761 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 2762 sizeof(struct spdk_bdev_channel)); 2763 2764 pthread_mutex_init(&bdev->internal.mutex, NULL); 2765 return 0; 2766 } 2767 2768 static void 2769 spdk_bdev_destroy_cb(void *io_device) 2770 { 2771 int rc; 2772 struct spdk_bdev *bdev; 2773 spdk_bdev_unregister_cb cb_fn; 2774 void *cb_arg; 2775 2776 bdev = __bdev_from_io_dev(io_device); 2777 cb_fn = bdev->internal.unregister_cb; 2778 cb_arg = bdev->internal.unregister_ctx; 2779 2780 rc = bdev->fn_table->destruct(bdev->ctxt); 2781 if (rc < 0) { 2782 SPDK_ERRLOG("destruct failed\n"); 2783 } 2784 if (rc <= 0 && cb_fn != NULL) { 2785 cb_fn(cb_arg, rc); 2786 } 2787 } 2788 2789 2790 static void 2791 spdk_bdev_fini(struct spdk_bdev *bdev) 2792 { 2793 pthread_mutex_destroy(&bdev->internal.mutex); 2794 2795 free(bdev->internal.qos); 2796 2797 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 2798 } 2799 2800 static void 2801 spdk_bdev_start(struct spdk_bdev *bdev) 2802 { 2803 struct spdk_bdev_module *module; 2804 uint32_t action; 2805 2806 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 2807 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 2808 2809 /* Examine configuration before initializing I/O */ 2810 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 2811 if (module->examine_config) { 2812 action = module->internal.action_in_progress; 2813 module->internal.action_in_progress++; 2814 module->examine_config(bdev); 2815 if (action != module->internal.action_in_progress) { 2816 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 2817 module->name); 2818 } 2819 } 2820 } 2821 2822 if (bdev->internal.claim_module) { 2823 return; 2824 } 2825 2826 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 2827 if (module->examine_disk) { 2828 module->internal.action_in_progress++; 2829 module->examine_disk(bdev); 2830 } 2831 } 2832 } 2833 2834 int 2835 spdk_bdev_register(struct spdk_bdev *bdev) 2836 { 2837 int rc = spdk_bdev_init(bdev); 2838 2839 if (rc == 0) { 2840 spdk_bdev_start(bdev); 2841 } 2842 2843 return rc; 2844 } 2845 2846 static void 2847 spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev) 2848 { 2849 struct spdk_bdev **bdevs; 2850 struct spdk_bdev *base; 2851 size_t i, j, k; 2852 bool found; 2853 2854 /* Iterate over base bdevs to remove vbdev from them. */ 2855 for (i = 0; i < vbdev->internal.base_bdevs_cnt; i++) { 2856 found = false; 2857 base = vbdev->internal.base_bdevs[i]; 2858 2859 for (j = 0; j < base->vbdevs_cnt; j++) { 2860 if (base->vbdevs[j] != vbdev) { 2861 continue; 2862 } 2863 2864 for (k = j; k + 1 < base->vbdevs_cnt; k++) { 2865 base->vbdevs[k] = base->vbdevs[k + 1]; 2866 } 2867 2868 base->vbdevs_cnt--; 2869 if (base->vbdevs_cnt > 0) { 2870 bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0])); 2871 /* It would be odd if shrinking memory block fail. 
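	 * realloc() to a smaller size is not expected to fail in practice; the
	 * assert() that follows documents that assumption rather than handling it.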
*/ 2872 assert(bdevs); 2873 base->vbdevs = bdevs; 2874 } else { 2875 free(base->vbdevs); 2876 base->vbdevs = NULL; 2877 } 2878 2879 found = true; 2880 break; 2881 } 2882 2883 if (!found) { 2884 SPDK_WARNLOG("Bdev '%s' is not base bdev of '%s'.\n", base->name, vbdev->name); 2885 } 2886 } 2887 2888 free(vbdev->internal.base_bdevs); 2889 vbdev->internal.base_bdevs = NULL; 2890 vbdev->internal.base_bdevs_cnt = 0; 2891 } 2892 2893 static int 2894 spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt) 2895 { 2896 struct spdk_bdev **vbdevs; 2897 struct spdk_bdev *base; 2898 size_t i; 2899 2900 /* Adding base bdevs isn't supported (yet?). */ 2901 assert(vbdev->internal.base_bdevs_cnt == 0); 2902 2903 vbdev->internal.base_bdevs = malloc(cnt * sizeof(vbdev->internal.base_bdevs[0])); 2904 if (!vbdev->internal.base_bdevs) { 2905 SPDK_ERRLOG("%s - realloc() failed\n", vbdev->name); 2906 return -ENOMEM; 2907 } 2908 2909 memcpy(vbdev->internal.base_bdevs, base_bdevs, cnt * sizeof(vbdev->internal.base_bdevs[0])); 2910 vbdev->internal.base_bdevs_cnt = cnt; 2911 2912 /* Iterate over base bdevs to add this vbdev to them. */ 2913 for (i = 0; i < cnt; i++) { 2914 base = vbdev->internal.base_bdevs[i]; 2915 2916 assert(base != NULL); 2917 assert(base->internal.claim_module != NULL); 2918 2919 vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0])); 2920 if (!vbdevs) { 2921 SPDK_ERRLOG("%s - realloc() failed\n", base->name); 2922 spdk_vbdev_remove_base_bdevs(vbdev); 2923 return -ENOMEM; 2924 } 2925 2926 vbdevs[base->vbdevs_cnt] = vbdev; 2927 base->vbdevs = vbdevs; 2928 base->vbdevs_cnt++; 2929 } 2930 2931 return 0; 2932 } 2933 2934 int 2935 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 2936 { 2937 int rc; 2938 2939 rc = spdk_bdev_init(vbdev); 2940 if (rc) { 2941 return rc; 2942 } 2943 2944 if (base_bdev_count == 0) { 2945 spdk_bdev_start(vbdev); 2946 return 0; 2947 } 2948 2949 rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count); 2950 if (rc) { 2951 spdk_bdev_fini(vbdev); 2952 return rc; 2953 } 2954 2955 spdk_bdev_start(vbdev); 2956 return 0; 2957 2958 } 2959 2960 void 2961 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 2962 { 2963 if (bdev->internal.unregister_cb != NULL) { 2964 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 2965 } 2966 } 2967 2968 static void 2969 _remove_notify(void *arg) 2970 { 2971 struct spdk_bdev_desc *desc = arg; 2972 2973 desc->remove_cb(desc->remove_ctx); 2974 } 2975 2976 void 2977 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 2978 { 2979 struct spdk_bdev_desc *desc, *tmp; 2980 bool do_destruct = true; 2981 struct spdk_thread *thread; 2982 2983 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 2984 2985 thread = spdk_get_thread(); 2986 if (!thread) { 2987 /* The user called this from a non-SPDK thread. 
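	 * Unregistration has to run on an SPDK thread because the remove callbacks
	 * below are delivered as thread messages, so just report -ENOTSUP here.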
*/ 2988 if (cb_fn != NULL) { 2989 cb_fn(cb_arg, -ENOTSUP); 2990 } 2991 return; 2992 } 2993 2994 pthread_mutex_lock(&bdev->internal.mutex); 2995 2996 spdk_vbdev_remove_base_bdevs(bdev); 2997 2998 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 2999 bdev->internal.unregister_cb = cb_fn; 3000 bdev->internal.unregister_ctx = cb_arg; 3001 3002 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3003 if (desc->remove_cb) { 3004 do_destruct = false; 3005 /* 3006 * Defer invocation of the remove_cb to a separate message that will 3007 * run later on this thread. This ensures this context unwinds and 3008 * we don't recursively unregister this bdev again if the remove_cb 3009 * immediately closes its descriptor. 3010 */ 3011 if (!desc->remove_scheduled) { 3012 /* Avoid scheduling removal of the same descriptor multiple times. */ 3013 desc->remove_scheduled = true; 3014 spdk_thread_send_msg(thread, _remove_notify, desc); 3015 } 3016 } 3017 } 3018 3019 if (!do_destruct) { 3020 pthread_mutex_unlock(&bdev->internal.mutex); 3021 return; 3022 } 3023 3024 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3025 pthread_mutex_unlock(&bdev->internal.mutex); 3026 3027 spdk_bdev_fini(bdev); 3028 } 3029 3030 int 3031 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3032 void *remove_ctx, struct spdk_bdev_desc **_desc) 3033 { 3034 struct spdk_bdev_desc *desc; 3035 3036 desc = calloc(1, sizeof(*desc)); 3037 if (desc == NULL) { 3038 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3039 return -ENOMEM; 3040 } 3041 3042 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3043 spdk_get_thread()); 3044 3045 pthread_mutex_lock(&bdev->internal.mutex); 3046 3047 if (write && bdev->internal.claim_module) { 3048 SPDK_ERRLOG("Could not open %s - already claimed\n", bdev->name); 3049 free(desc); 3050 pthread_mutex_unlock(&bdev->internal.mutex); 3051 return -EPERM; 3052 } 3053 3054 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3055 3056 desc->bdev = bdev; 3057 desc->remove_cb = remove_cb; 3058 desc->remove_ctx = remove_ctx; 3059 desc->write = write; 3060 *_desc = desc; 3061 3062 pthread_mutex_unlock(&bdev->internal.mutex); 3063 3064 return 0; 3065 } 3066 3067 void 3068 spdk_bdev_close(struct spdk_bdev_desc *desc) 3069 { 3070 struct spdk_bdev *bdev = desc->bdev; 3071 bool do_unregister = false; 3072 3073 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3074 spdk_get_thread()); 3075 3076 pthread_mutex_lock(&bdev->internal.mutex); 3077 3078 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3079 free(desc); 3080 3081 /* If no more descriptors, kill QoS channel */ 3082 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3083 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3084 bdev->name, spdk_get_thread()); 3085 3086 if (spdk_bdev_qos_destroy(bdev)) { 3087 /* There isn't anything we can do to recover here. Just let the 3088 * old QoS poller keep running. The QoS handling won't change 3089 * cores when the user allocates a new channel, but it won't break. */ 3090 SPDK_ERRLOG("Unable to shut down QoS poller. 
It will continue running on the current thread.\n"); 3091 } 3092 } 3093 3094 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3095 do_unregister = true; 3096 } 3097 pthread_mutex_unlock(&bdev->internal.mutex); 3098 3099 if (do_unregister == true) { 3100 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3101 } 3102 } 3103 3104 int 3105 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3106 struct spdk_bdev_module *module) 3107 { 3108 if (bdev->internal.claim_module != NULL) { 3109 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3110 bdev->internal.claim_module->name); 3111 return -EPERM; 3112 } 3113 3114 if (desc && !desc->write) { 3115 desc->write = true; 3116 } 3117 3118 bdev->internal.claim_module = module; 3119 return 0; 3120 } 3121 3122 void 3123 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3124 { 3125 assert(bdev->internal.claim_module != NULL); 3126 bdev->internal.claim_module = NULL; 3127 } 3128 3129 struct spdk_bdev * 3130 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3131 { 3132 return desc->bdev; 3133 } 3134 3135 void 3136 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3137 { 3138 struct iovec *iovs; 3139 int iovcnt; 3140 3141 if (bdev_io == NULL) { 3142 return; 3143 } 3144 3145 switch (bdev_io->type) { 3146 case SPDK_BDEV_IO_TYPE_READ: 3147 iovs = bdev_io->u.bdev.iovs; 3148 iovcnt = bdev_io->u.bdev.iovcnt; 3149 break; 3150 case SPDK_BDEV_IO_TYPE_WRITE: 3151 iovs = bdev_io->u.bdev.iovs; 3152 iovcnt = bdev_io->u.bdev.iovcnt; 3153 break; 3154 default: 3155 iovs = NULL; 3156 iovcnt = 0; 3157 break; 3158 } 3159 3160 if (iovp) { 3161 *iovp = iovs; 3162 } 3163 if (iovcntp) { 3164 *iovcntp = iovcnt; 3165 } 3166 } 3167 3168 void 3169 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3170 { 3171 3172 if (spdk_bdev_module_list_find(bdev_module->name)) { 3173 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3174 assert(false); 3175 } 3176 3177 if (bdev_module->async_init) { 3178 bdev_module->internal.action_in_progress = 1; 3179 } 3180 3181 /* 3182 * Modules with examine callbacks must be initialized first, so they are 3183 * ready to handle examine callbacks from later modules that will 3184 * register physical bdevs. 3185 */ 3186 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3187 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3188 } else { 3189 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3190 } 3191 } 3192 3193 struct spdk_bdev_module * 3194 spdk_bdev_module_list_find(const char *name) 3195 { 3196 struct spdk_bdev_module *bdev_module; 3197 3198 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3199 if (strcmp(name, bdev_module->name) == 0) { 3200 break; 3201 } 3202 } 3203 3204 return bdev_module; 3205 } 3206 3207 static void 3208 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3209 { 3210 uint64_t len; 3211 3212 if (!success) { 3213 bdev_io->internal.cb = bdev_io->u.bdev.stored_user_cb; 3214 _spdk_bdev_io_complete(bdev_io); 3215 return; 3216 } 3217 3218 /* no need to perform the error checking from write_zeroes_blocks because this request already passed those checks. 
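	 * Each round below simply re-points the same bdev_io at the next chunk of
	 * at most ZERO_BUFFER_SIZE bytes of the shared zero buffer and resubmits it.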
*/ 3219 len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks, 3220 ZERO_BUFFER_SIZE); 3221 3222 bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks; 3223 bdev_io->u.bdev.iovs[0].iov_len = len; 3224 bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev); 3225 bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks; 3226 bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks; 3227 3228 /* if this round completes the i/o, change the callback to be the original user callback */ 3229 if (bdev_io->u.bdev.split_remaining_num_blocks == 0) { 3230 spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb); 3231 } else { 3232 spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split); 3233 } 3234 spdk_bdev_io_submit(bdev_io); 3235 } 3236 3237 struct set_qos_limit_ctx { 3238 void (*cb_fn)(void *cb_arg, int status); 3239 void *cb_arg; 3240 struct spdk_bdev *bdev; 3241 }; 3242 3243 static void 3244 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3245 { 3246 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3247 ctx->bdev->internal.qos_mod_in_progress = false; 3248 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3249 3250 ctx->cb_fn(ctx->cb_arg, status); 3251 free(ctx); 3252 } 3253 3254 static void 3255 _spdk_bdev_disable_qos_done(void *cb_arg) 3256 { 3257 struct set_qos_limit_ctx *ctx = cb_arg; 3258 struct spdk_bdev *bdev = ctx->bdev; 3259 struct spdk_bdev_io *bdev_io; 3260 struct spdk_bdev_qos *qos; 3261 3262 pthread_mutex_lock(&bdev->internal.mutex); 3263 qos = bdev->internal.qos; 3264 bdev->internal.qos = NULL; 3265 pthread_mutex_unlock(&bdev->internal.mutex); 3266 3267 while (!TAILQ_EMPTY(&qos->queued)) { 3268 /* Send queued I/O back to their original thread for resubmission. */ 3269 bdev_io = TAILQ_FIRST(&qos->queued); 3270 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3271 3272 if (bdev_io->internal.io_submit_ch) { 3273 /* 3274 * Channel was changed when sending it to the QoS thread - change it back 3275 * before sending it back to the original thread. 
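				 * _spdk_bdev_io_complete() performs the same restoration when a
				 * QoS-routed I/O completes.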
3276 */ 3277 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3278 bdev_io->internal.io_submit_ch = NULL; 3279 } 3280 3281 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3282 _spdk_bdev_io_submit, bdev_io); 3283 } 3284 3285 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3286 spdk_poller_unregister(&qos->poller); 3287 3288 free(qos); 3289 3290 _spdk_bdev_set_qos_limit_done(ctx, 0); 3291 } 3292 3293 static void 3294 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3295 { 3296 void *io_device = spdk_io_channel_iter_get_io_device(i); 3297 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3298 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3299 struct spdk_thread *thread; 3300 3301 pthread_mutex_lock(&bdev->internal.mutex); 3302 thread = bdev->internal.qos->thread; 3303 pthread_mutex_unlock(&bdev->internal.mutex); 3304 3305 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3306 } 3307 3308 static void 3309 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3310 { 3311 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3312 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3313 3314 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3315 3316 spdk_for_each_channel_continue(i, 0); 3317 } 3318 3319 static void 3320 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg) 3321 { 3322 struct set_qos_limit_ctx *ctx = cb_arg; 3323 struct spdk_bdev *bdev = ctx->bdev; 3324 3325 pthread_mutex_lock(&bdev->internal.mutex); 3326 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3327 pthread_mutex_unlock(&bdev->internal.mutex); 3328 3329 _spdk_bdev_set_qos_limit_done(ctx, 0); 3330 } 3331 3332 static void 3333 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3334 { 3335 void *io_device = spdk_io_channel_iter_get_io_device(i); 3336 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3337 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3338 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3339 int rc; 3340 3341 pthread_mutex_lock(&bdev->internal.mutex); 3342 rc = _spdk_bdev_enable_qos(bdev, bdev_ch); 3343 pthread_mutex_unlock(&bdev->internal.mutex); 3344 spdk_for_each_channel_continue(i, rc); 3345 } 3346 3347 static void 3348 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3349 { 3350 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3351 3352 _spdk_bdev_set_qos_limit_done(ctx, status); 3353 } 3354 3355 void 3356 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec, 3357 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3358 { 3359 struct set_qos_limit_ctx *ctx; 3360 3361 if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) { 3362 SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n", 3363 ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC); 3364 cb_fn(cb_arg, -EINVAL); 3365 return; 3366 } 3367 3368 ctx = calloc(1, sizeof(*ctx)); 3369 if (ctx == NULL) { 3370 cb_fn(cb_arg, -ENOMEM); 3371 return; 3372 } 3373 3374 ctx->cb_fn = cb_fn; 3375 ctx->cb_arg = cb_arg; 3376 ctx->bdev = bdev; 3377 3378 pthread_mutex_lock(&bdev->internal.mutex); 3379 if (bdev->internal.qos_mod_in_progress) { 3380 pthread_mutex_unlock(&bdev->internal.mutex); 3381 free(ctx); 3382 cb_fn(cb_arg, -EAGAIN); 3383 return; 3384 } 3385 bdev->internal.qos_mod_in_progress = true; 3386 3387 if (ios_per_sec > 0) { 3388 if (bdev->internal.qos == NULL) { 3389 /* 
Enabling */ 3390 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3391 if (!bdev->internal.qos) { 3392 pthread_mutex_unlock(&bdev->internal.mutex); 3393 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3394 free(ctx); 3395 cb_fn(cb_arg, -ENOMEM); 3396 return; 3397 } 3398 3399 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3400 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3401 _spdk_bdev_enable_qos_msg, ctx, 3402 _spdk_bdev_enable_qos_done); 3403 } else { 3404 /* Updating */ 3405 bdev->internal.qos->iops_rate_limit = ios_per_sec; 3406 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx); 3407 } 3408 } else { 3409 if (bdev->internal.qos != NULL) { 3410 /* Disabling */ 3411 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3412 _spdk_bdev_disable_qos_msg, ctx, 3413 _spdk_bdev_disable_qos_msg_done); 3414 } else { 3415 pthread_mutex_unlock(&bdev->internal.mutex); 3416 _spdk_bdev_set_qos_limit_done(ctx, 0); 3417 return; 3418 } 3419 } 3420 3421 pthread_mutex_unlock(&bdev->internal.mutex); 3422 } 3423 3424 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3425
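/*
 * Illustrative usage sketch (assumptions: "bdev" is an already-registered bdev
 * and rate_limit_done() is a caller-provided callback): enabling a 20000 IO/s
 * limit, which must be a multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC, would look
 * roughly like
 *
 *	spdk_bdev_set_qos_limit_iops(bdev, 20000, rate_limit_done, NULL);
 *
 * and passing 0 instead of 20000 disables an existing limit.
 */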