1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/env.h" 40 #include "spdk/event.h" 41 #include "spdk/thread.h" 42 #include "spdk/likely.h" 43 #include "spdk/queue.h" 44 #include "spdk/nvme_spec.h" 45 #include "spdk/scsi_spec.h" 46 #include "spdk/util.h" 47 #include "spdk/trace.h" 48 49 #include "spdk/bdev_module.h" 50 #include "spdk_internal/log.h" 51 #include "spdk/string.h" 52 53 #ifdef SPDK_CONFIG_VTUNE 54 #include "ittnotify.h" 55 #include "ittnotify_types.h" 56 int __itt_init_ittlib(const char *, __itt_group_id); 57 #endif 58 59 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 60 #define SPDK_BDEV_IO_CACHE_SIZE 256 61 #define BUF_SMALL_POOL_SIZE 8192 62 #define BUF_LARGE_POOL_SIZE 1024 63 #define NOMEM_THRESHOLD_COUNT 8 64 #define ZERO_BUFFER_SIZE 0x100000 65 66 #define OWNER_BDEV 0x2 67 68 #define OBJECT_BDEV_IO 0x2 69 70 #define TRACE_GROUP_BDEV 0x3 71 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 72 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 73 74 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 75 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 76 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 77 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 78 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 * 1024 * 1024) 79 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 80 81 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"}; 82 static const char *qos_rpc_type[] = {"qos_ios_per_sec"}; 83 84 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 85 86 struct spdk_bdev_mgr { 87 struct spdk_mempool *bdev_io_pool; 88 89 struct spdk_mempool *buf_small_pool; 90 struct spdk_mempool *buf_large_pool; 91 92 void *zero_buffer; 93 94 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 95 96 struct spdk_bdev_list bdevs; 97 98 bool init_complete; 99 bool module_init_complete; 100 101 #ifdef SPDK_CONFIG_VTUNE 
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb g_init_cb_fn = NULL;
static void *g_init_cb_arg = NULL;

static spdk_bdev_fini_cb g_fini_cb_fn = NULL;
static void *g_fini_cb_arg = NULL;
static struct spdk_thread *g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 * For remaining bytes, allowed to run negative if an I/O is submitted when
	 * some bytes are remaining, but the I/O is bigger than that amount. The
	 * excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;
};

struct spdk_bdev_qos {
	/** Rate limit state, one entry per rate limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache. Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t per_thread_cache_count;
	uint32_t bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * queue their I/O awaiting retry here, which makes it possible to retry sending
 * I/O to one bdev after I/O from another bdev on the same io_device completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
208 */ 209 uint64_t nomem_threshold; 210 211 /* I/O channel allocated by a bdev module */ 212 struct spdk_io_channel *shared_ch; 213 214 /* Refcount of bdev channels using this resource */ 215 uint32_t ref; 216 217 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 218 }; 219 220 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 221 #define BDEV_CH_QOS_ENABLED (1 << 1) 222 223 struct spdk_bdev_channel { 224 struct spdk_bdev *bdev; 225 226 /* The channel for the underlying device */ 227 struct spdk_io_channel *channel; 228 229 /* Per io_device per thread data */ 230 struct spdk_bdev_shared_resource *shared_resource; 231 232 struct spdk_bdev_io_stat stat; 233 234 /* 235 * Count of I/O submitted through this channel and waiting for completion. 236 * Incremented before submit_request() is called on an spdk_bdev_io. 237 */ 238 uint64_t io_outstanding; 239 240 bdev_io_tailq_t queued_resets; 241 242 uint32_t flags; 243 244 #ifdef SPDK_CONFIG_VTUNE 245 uint64_t start_tsc; 246 uint64_t interval_tsc; 247 __itt_string_handle *handle; 248 struct spdk_bdev_io_stat prev_stat; 249 #endif 250 251 }; 252 253 struct spdk_bdev_desc { 254 struct spdk_bdev *bdev; 255 struct spdk_thread *thread; 256 spdk_bdev_remove_cb_t remove_cb; 257 void *remove_ctx; 258 bool remove_scheduled; 259 bool closed; 260 bool write; 261 TAILQ_ENTRY(spdk_bdev_desc) link; 262 }; 263 264 struct spdk_bdev_iostat_ctx { 265 struct spdk_bdev_io_stat *stat; 266 spdk_bdev_get_device_stat_cb cb; 267 void *cb_arg; 268 }; 269 270 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 271 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 272 273 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 274 void *cb_arg); 275 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 276 277 void 278 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 279 { 280 *opts = g_bdev_opts; 281 } 282 283 int 284 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 285 { 286 uint32_t min_pool_size; 287 288 /* 289 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 290 * initialization. A second mgmt_ch will be created on the same thread when the application starts 291 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
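	 *
	 * Worked example (illustrative, not part of the original source): with the
	 * default bdev_io_cache_size of 256 and an application running 4 threads,
	 * the minimum acceptable pool size is 256 * (4 + 1) = 1280, so the default
	 * bdev_io_pool_size of 64 * 1024 passes easily. A caller shrinking the pool
	 * would typically go through this sequence:
	 *
	 *   struct spdk_bdev_opts opts;
	 *
	 *   spdk_bdev_get_opts(&opts);
	 *   opts.bdev_io_pool_size = 2048;  // must stay >= cache_size * (threads + 1)
	 *   if (spdk_bdev_set_opts(&opts) != 0) {
	 *           // rejected: pool too small for the configured cache and thread count
	 *   }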
292 */ 293 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 294 if (opts->bdev_io_pool_size < min_pool_size) { 295 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 296 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 297 spdk_thread_get_count()); 298 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 299 return -1; 300 } 301 302 g_bdev_opts = *opts; 303 return 0; 304 } 305 306 struct spdk_bdev * 307 spdk_bdev_first(void) 308 { 309 struct spdk_bdev *bdev; 310 311 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 312 if (bdev) { 313 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 314 } 315 316 return bdev; 317 } 318 319 struct spdk_bdev * 320 spdk_bdev_next(struct spdk_bdev *prev) 321 { 322 struct spdk_bdev *bdev; 323 324 bdev = TAILQ_NEXT(prev, internal.link); 325 if (bdev) { 326 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 327 } 328 329 return bdev; 330 } 331 332 static struct spdk_bdev * 333 _bdev_next_leaf(struct spdk_bdev *bdev) 334 { 335 while (bdev != NULL) { 336 if (bdev->internal.claim_module == NULL) { 337 return bdev; 338 } else { 339 bdev = TAILQ_NEXT(bdev, internal.link); 340 } 341 } 342 343 return bdev; 344 } 345 346 struct spdk_bdev * 347 spdk_bdev_first_leaf(void) 348 { 349 struct spdk_bdev *bdev; 350 351 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 352 353 if (bdev) { 354 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 355 } 356 357 return bdev; 358 } 359 360 struct spdk_bdev * 361 spdk_bdev_next_leaf(struct spdk_bdev *prev) 362 { 363 struct spdk_bdev *bdev; 364 365 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 366 367 if (bdev) { 368 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 369 } 370 371 return bdev; 372 } 373 374 struct spdk_bdev * 375 spdk_bdev_get_by_name(const char *bdev_name) 376 { 377 struct spdk_bdev_alias *tmp; 378 struct spdk_bdev *bdev = spdk_bdev_first(); 379 380 while (bdev != NULL) { 381 if (strcmp(bdev_name, bdev->name) == 0) { 382 return bdev; 383 } 384 385 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 386 if (strcmp(bdev_name, tmp->alias) == 0) { 387 return bdev; 388 } 389 } 390 391 bdev = spdk_bdev_next(bdev); 392 } 393 394 return NULL; 395 } 396 397 void 398 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 399 { 400 struct iovec *iovs; 401 402 iovs = bdev_io->u.bdev.iovs; 403 404 assert(iovs != NULL); 405 assert(bdev_io->u.bdev.iovcnt >= 1); 406 407 iovs[0].iov_base = buf; 408 iovs[0].iov_len = len; 409 } 410 411 static void 412 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 413 { 414 struct spdk_mempool *pool; 415 struct spdk_bdev_io *tmp; 416 void *buf, *aligned_buf; 417 bdev_io_stailq_t *stailq; 418 struct spdk_bdev_mgmt_channel *ch; 419 420 assert(bdev_io->u.bdev.iovcnt == 1); 421 422 buf = bdev_io->internal.buf; 423 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 424 425 bdev_io->internal.buf = NULL; 426 427 if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 428 pool = g_bdev_mgr.buf_small_pool; 429 stailq = &ch->need_buf_small; 430 } else { 431 pool = g_bdev_mgr.buf_large_pool; 432 stailq = &ch->need_buf_large; 433 } 434 435 if (STAILQ_EMPTY(stailq)) { 436 spdk_mempool_put(pool, buf); 437 } else { 438 tmp = STAILQ_FIRST(stailq); 439 440 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 441 spdk_bdev_io_set_buf(tmp, aligned_buf, 
tmp->internal.buf_len); 442 443 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 444 tmp->internal.buf = buf; 445 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 446 } 447 } 448 449 void 450 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 451 { 452 struct spdk_mempool *pool; 453 bdev_io_stailq_t *stailq; 454 void *buf, *aligned_buf; 455 struct spdk_bdev_mgmt_channel *mgmt_ch; 456 457 assert(cb != NULL); 458 assert(bdev_io->u.bdev.iovs != NULL); 459 460 if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) { 461 /* Buffer already present */ 462 cb(bdev_io->internal.ch->channel, bdev_io); 463 return; 464 } 465 466 assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE); 467 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 468 469 bdev_io->internal.buf_len = len; 470 bdev_io->internal.get_buf_cb = cb; 471 if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 472 pool = g_bdev_mgr.buf_small_pool; 473 stailq = &mgmt_ch->need_buf_small; 474 } else { 475 pool = g_bdev_mgr.buf_large_pool; 476 stailq = &mgmt_ch->need_buf_large; 477 } 478 479 buf = spdk_mempool_get(pool); 480 481 if (!buf) { 482 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 483 } else { 484 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 485 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 486 487 bdev_io->internal.buf = buf; 488 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 489 } 490 } 491 492 static int 493 spdk_bdev_module_get_max_ctx_size(void) 494 { 495 struct spdk_bdev_module *bdev_module; 496 int max_bdev_module_size = 0; 497 498 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 499 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 500 max_bdev_module_size = bdev_module->get_ctx_size(); 501 } 502 } 503 504 return max_bdev_module_size; 505 } 506 507 void 508 spdk_bdev_config_text(FILE *fp) 509 { 510 struct spdk_bdev_module *bdev_module; 511 512 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 513 if (bdev_module->config_text) { 514 bdev_module->config_text(fp); 515 } 516 } 517 } 518 519 void 520 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 521 { 522 struct spdk_bdev_module *bdev_module; 523 struct spdk_bdev *bdev; 524 525 assert(w != NULL); 526 527 spdk_json_write_array_begin(w); 528 529 spdk_json_write_object_begin(w); 530 spdk_json_write_named_string(w, "method", "set_bdev_options"); 531 spdk_json_write_name(w, "params"); 532 spdk_json_write_object_begin(w); 533 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 534 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 535 spdk_json_write_object_end(w); 536 spdk_json_write_object_end(w); 537 538 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 539 if (bdev_module->config_json) { 540 bdev_module->config_json(w); 541 } 542 } 543 544 TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) { 545 if (bdev->fn_table->write_config_json) { 546 bdev->fn_table->write_config_json(bdev, w); 547 } 548 } 549 550 spdk_json_write_array_end(w); 551 } 552 553 static int 554 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 555 { 556 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 557 struct spdk_bdev_io *bdev_io; 558 uint32_t i; 559 560 STAILQ_INIT(&ch->need_buf_small); 561 STAILQ_INIT(&ch->need_buf_large); 562 563 STAILQ_INIT(&ch->per_thread_cache); 564 ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size; 565 
	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
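	 *
	 * Sketch of how a module participates in this handshake (illustrative only;
	 * the async_init flag and module registration details live in bdev_module.h,
	 * and start_async_probe() is a hypothetical helper):
	 *
	 *   static struct spdk_bdev_module my_module;
	 *
	 *   static void
	 *   my_probe_done_cb(void)
	 *   {
	 *           spdk_bdev_module_init_done(&my_module);  // drops action_in_progress
	 *   }
	 *
	 *   static int
	 *   my_module_init(void)
	 *   {
	 *           start_async_probe(my_probe_done_cb);  // hypothetical async work
	 *           return 0;                             // init finishes later
	 *   }
	 *
	 *   static struct spdk_bdev_module my_module = {
	 *           .name = "my_module",
	 *           .module_init = my_module_init,
	 *           .async_init = true,  // keeps the subsystem waiting for init_done
	 *   };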
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		rc = module->module_init();
		if (rc != 0) {
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}


static void
spdk_bdev_init_failed_complete(void *cb_arg)
{
	spdk_bdev_init_complete(-1);
}

static void
spdk_bdev_init_failed(void *cb_arg)
{
	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
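	 *
	 * Worked example (illustrative): with 4 threads, the small-buffer cache below
	 * becomes BUF_SMALL_POOL_SIZE / (2 * 4) = 8192 / 8 = 1024 buffers per thread,
	 * so even if every per-thread cache is full only 4 * 1024 = 4096 buffers -
	 * half the pool - are parked locally. The large pool gets 1024 / 8 = 128
	 * buffers per thread by the same formula.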
774 */ 775 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 776 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 777 778 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 779 BUF_SMALL_POOL_SIZE, 780 SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512, 781 cache_size, 782 SPDK_ENV_SOCKET_ID_ANY); 783 if (!g_bdev_mgr.buf_small_pool) { 784 SPDK_ERRLOG("create rbuf small pool failed\n"); 785 spdk_bdev_init_complete(-1); 786 return; 787 } 788 789 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 790 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 791 792 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 793 BUF_LARGE_POOL_SIZE, 794 SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512, 795 cache_size, 796 SPDK_ENV_SOCKET_ID_ANY); 797 if (!g_bdev_mgr.buf_large_pool) { 798 SPDK_ERRLOG("create rbuf large pool failed\n"); 799 spdk_bdev_init_complete(-1); 800 return; 801 } 802 803 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 804 NULL); 805 if (!g_bdev_mgr.zero_buffer) { 806 SPDK_ERRLOG("create bdev zero buffer failed\n"); 807 spdk_bdev_init_complete(-1); 808 return; 809 } 810 811 #ifdef SPDK_CONFIG_VTUNE 812 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 813 #endif 814 815 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 816 spdk_bdev_mgmt_channel_destroy, 817 sizeof(struct spdk_bdev_mgmt_channel), 818 "bdev_mgr"); 819 820 rc = spdk_bdev_modules_init(); 821 g_bdev_mgr.module_init_complete = true; 822 if (rc != 0) { 823 SPDK_ERRLOG("bdev modules init failed\n"); 824 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 825 return; 826 } 827 828 spdk_bdev_module_action_complete(); 829 } 830 831 static void 832 spdk_bdev_mgr_unregister_cb(void *io_device) 833 { 834 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 835 836 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 837 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 838 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 839 g_bdev_opts.bdev_io_pool_size); 840 } 841 842 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 843 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 844 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 845 BUF_SMALL_POOL_SIZE); 846 assert(false); 847 } 848 849 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 850 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 851 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 852 BUF_LARGE_POOL_SIZE); 853 assert(false); 854 } 855 856 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 857 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 858 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 859 spdk_dma_free(g_bdev_mgr.zero_buffer); 860 861 cb_fn(g_fini_cb_arg); 862 g_fini_cb_fn = NULL; 863 g_fini_cb_arg = NULL; 864 } 865 866 static void 867 spdk_bdev_module_finish_iter(void *arg) 868 { 869 struct spdk_bdev_module *bdev_module; 870 871 /* Start iterating from the last touched module */ 872 if (!g_resume_bdev_module) { 873 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 874 } else { 875 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 876 internal.tailq); 877 } 878 879 while (bdev_module) { 880 if (bdev_module->async_fini) { 881 /* Save our place so we can resume later. 
			 * We must save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and moving
		 * on to the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some
		 * context (like bdev part free) that will use this bdev (or private bdev driver
		 * ctx data) after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last bdev in the list. The last bdev in the list should be a bdev
	 * that has no bdevs that depend on it.
	 */
	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
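		 *
		 * Callers observe this case as -ENOMEM from the submission routines. A
		 * typical retry pattern, sketched here for illustration (my_ctx,
		 * wait_entry and my_submit_read are assumed names, not part of this file):
		 *
		 *   static void
		 *   my_retry(void *arg)
		 *   {
		 *           struct my_ctx *ctx = arg;
		 *
		 *           my_submit_read(ctx);  // resubmit the I/O that got -ENOMEM
		 *   }
		 *
		 *   rc = spdk_bdev_read_blocks(desc, io_ch, buf, offset, num_blocks, cb, ctx);
		 *   if (rc == -ENOMEM) {
		 *           ctx->wait_entry.bdev = bdev;
		 *           ctx->wait_entry.cb_fn = my_retry;
		 *           ctx->wait_entry.cb_arg = ctx;
		 *           spdk_bdev_queue_io_wait(bdev, io_ch, &ctx->wait_entry);
		 *   }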
987 */ 988 bdev_io = NULL; 989 } else { 990 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 991 } 992 993 return bdev_io; 994 } 995 996 void 997 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 998 { 999 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1000 1001 assert(bdev_io != NULL); 1002 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1003 1004 if (bdev_io->internal.buf != NULL) { 1005 spdk_bdev_io_put_buf(bdev_io); 1006 } 1007 1008 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1009 ch->per_thread_cache_count++; 1010 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1011 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1012 struct spdk_bdev_io_wait_entry *entry; 1013 1014 entry = TAILQ_FIRST(&ch->io_wait_queue); 1015 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1016 entry->cb_fn(entry->cb_arg); 1017 } 1018 } else { 1019 /* We should never have a full cache with entries on the io wait queue. */ 1020 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1021 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1022 } 1023 } 1024 1025 static bool 1026 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1027 { 1028 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1029 1030 switch (limit) { 1031 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1032 return true; 1033 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1034 return false; 1035 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1036 default: 1037 return false; 1038 } 1039 } 1040 1041 static bool 1042 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1043 { 1044 switch (bdev_io->type) { 1045 case SPDK_BDEV_IO_TYPE_NVME_IO: 1046 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1047 case SPDK_BDEV_IO_TYPE_READ: 1048 case SPDK_BDEV_IO_TYPE_WRITE: 1049 case SPDK_BDEV_IO_TYPE_UNMAP: 1050 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1051 return true; 1052 default: 1053 return false; 1054 } 1055 } 1056 1057 static uint64_t 1058 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1059 { 1060 struct spdk_bdev *bdev = bdev_io->bdev; 1061 1062 switch (bdev_io->type) { 1063 case SPDK_BDEV_IO_TYPE_NVME_IO: 1064 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1065 return bdev_io->u.nvme_passthru.nbytes; 1066 case SPDK_BDEV_IO_TYPE_READ: 1067 case SPDK_BDEV_IO_TYPE_WRITE: 1068 case SPDK_BDEV_IO_TYPE_UNMAP: 1069 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1070 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1071 default: 1072 return 0; 1073 } 1074 } 1075 1076 static void 1077 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte) 1078 { 1079 int i; 1080 1081 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1082 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1083 continue; 1084 } 1085 1086 switch (i) { 1087 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1088 qos->rate_limits[i].remaining_this_timeslice--; 1089 break; 1090 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1091 qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte; 1092 break; 1093 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1094 default: 1095 break; 1096 } 1097 } 1098 } 1099 1100 static void 1101 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch) 1102 { 1103 struct spdk_bdev_io *bdev_io = NULL; 1104 struct spdk_bdev *bdev = ch->bdev; 1105 struct spdk_bdev_qos *qos = bdev->internal.qos; 1106 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1107 int i; 1108 bool to_limit_io; 1109 uint64_t io_size_in_byte; 1110 1111 while 
(!TAILQ_EMPTY(&qos->queued)) { 1112 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1113 if (qos->rate_limits[i].max_per_timeslice > 0 && 1114 (qos->rate_limits[i].remaining_this_timeslice <= 0)) { 1115 return; 1116 } 1117 } 1118 1119 bdev_io = TAILQ_FIRST(&qos->queued); 1120 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1121 ch->io_outstanding++; 1122 shared_resource->io_outstanding++; 1123 to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io); 1124 if (to_limit_io == true) { 1125 io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io); 1126 _spdk_bdev_qos_update_per_io(qos, io_size_in_byte); 1127 } 1128 bdev->fn_table->submit_request(ch->channel, bdev_io); 1129 } 1130 } 1131 1132 static void 1133 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1134 { 1135 int rc; 1136 1137 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1138 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1139 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1140 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1141 &bdev_io->internal.waitq_entry); 1142 if (rc != 0) { 1143 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1144 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1145 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1146 } 1147 } 1148 1149 static bool 1150 _spdk_bdev_io_type_can_split(uint8_t type) 1151 { 1152 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1153 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1154 1155 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1156 * UNMAP could be split, but these types of I/O are typically much larger 1157 * in size (sometimes the size of the entire block device), and the bdev 1158 * module can more efficiently split these types of I/O. Plus those types 1159 * of I/O do not have a payload, which makes the splitting process simpler. 1160 */ 1161 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1162 return true; 1163 } else { 1164 return false; 1165 } 1166 } 1167 1168 static bool 1169 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1170 { 1171 uint64_t start_stripe, end_stripe; 1172 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1173 1174 if (io_boundary == 0) { 1175 return false; 1176 } 1177 1178 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1179 return false; 1180 } 1181 1182 start_stripe = bdev_io->u.bdev.offset_blocks; 1183 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1184 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
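	 *
	 * Worked example (illustrative): with optimal_io_boundary = 128 the shift is
	 * spdk_u32log2(128) = 7, so an I/O at offset_blocks = 100 spanning 60 blocks
	 * yields start_stripe = 100 >> 7 = 0 and end_stripe = 159 >> 7 = 1; the
	 * stripes differ, so the I/O is split at block 128.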
*/ 1185 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1186 start_stripe >>= spdk_u32log2(io_boundary); 1187 end_stripe >>= spdk_u32log2(io_boundary); 1188 } else { 1189 start_stripe /= io_boundary; 1190 end_stripe /= io_boundary; 1191 } 1192 return (start_stripe != end_stripe); 1193 } 1194 1195 static uint32_t 1196 _to_next_boundary(uint64_t offset, uint32_t boundary) 1197 { 1198 return (boundary - (offset % boundary)); 1199 } 1200 1201 static void 1202 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1203 1204 static void 1205 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1206 { 1207 struct spdk_bdev_io *bdev_io = _bdev_io; 1208 uint64_t current_offset, remaining, bytes_handled; 1209 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1210 struct iovec *parent_iov; 1211 uint64_t parent_iov_offset, child_iov_len; 1212 uint32_t child_iovcnt; 1213 int rc; 1214 1215 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1216 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1217 blocklen = bdev_io->bdev->blocklen; 1218 bytes_handled = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1219 parent_iov = &bdev_io->u.bdev.iovs[0]; 1220 parent_iov_offset = 0; 1221 1222 while (bytes_handled > 0) { 1223 if (bytes_handled >= parent_iov->iov_len) { 1224 bytes_handled -= parent_iov->iov_len; 1225 parent_iov++; 1226 continue; 1227 } 1228 parent_iov_offset += bytes_handled; 1229 break; 1230 } 1231 1232 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1233 to_next_boundary = spdk_min(remaining, to_next_boundary); 1234 to_next_boundary_bytes = to_next_boundary * blocklen; 1235 child_iovcnt = 0; 1236 while (to_next_boundary_bytes > 0 && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1237 child_iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1238 to_next_boundary_bytes -= child_iov_len; 1239 1240 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1241 bdev_io->child_iov[child_iovcnt].iov_len = child_iov_len; 1242 1243 parent_iov++; 1244 parent_iov_offset = 0; 1245 child_iovcnt++; 1246 } 1247 1248 if (to_next_boundary_bytes > 0) { 1249 /* We had to stop this child I/O early because we ran out of 1250 * child_iov space. Make sure the iovs collected are valid and 1251 * then adjust to_next_boundary before starting the child I/O. 
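		 *
		 * Worked example (illustrative): if the BDEV_IO_NUM_CHILD_IOV entries left
		 * 1536 bytes of the boundary uncovered and blocklen is 512, the check below
		 * passes (1536 % 512 == 0) and to_next_boundary shrinks by 1536 / 512 = 3
		 * blocks; the uncovered tail is then handled by a later child I/O.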
1252 */ 1253 if ((to_next_boundary_bytes % blocklen) != 0) { 1254 SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n", 1255 to_next_boundary_bytes, blocklen); 1256 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1257 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1258 return; 1259 } 1260 to_next_boundary -= to_next_boundary_bytes / blocklen; 1261 } 1262 1263 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1264 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1265 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1266 bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary, 1267 _spdk_bdev_io_split_done, bdev_io); 1268 } else { 1269 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1270 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1271 bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary, 1272 _spdk_bdev_io_split_done, bdev_io); 1273 } 1274 1275 if (rc == 0) { 1276 bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary; 1277 bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary; 1278 } else if (rc == -ENOMEM) { 1279 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_io_split_with_payload); 1280 } else { 1281 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1282 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1283 } 1284 } 1285 1286 static void 1287 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1288 { 1289 struct spdk_bdev_io *parent_io = cb_arg; 1290 1291 spdk_bdev_free_io(bdev_io); 1292 1293 if (!success) { 1294 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1295 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 1296 return; 1297 } 1298 1299 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 1300 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1301 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 1302 return; 1303 } 1304 1305 /* 1306 * Continue with the splitting process. This function will complete the parent I/O if the 1307 * splitting is done. 
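	 *
	 * Worked example (illustrative): with optimal_io_boundary = 128, a parent I/O
	 * at offset_blocks = 64 with num_blocks = 256 is issued as three children of
	 * 64, 128 and 64 blocks covering [64,128), [128,256) and [256,320). Each
	 * child completion lands here and re-enters the splitting function until
	 * split_remaining_num_blocks reaches zero, at which point the parent I/O is
	 * completed.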
1308 */ 1309 _spdk_bdev_io_split_with_payload(parent_io); 1310 } 1311 1312 static void 1313 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1314 { 1315 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1316 1317 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1318 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1319 1320 _spdk_bdev_io_split_with_payload(bdev_io); 1321 } 1322 1323 static void 1324 _spdk_bdev_io_submit(void *ctx) 1325 { 1326 struct spdk_bdev_io *bdev_io = ctx; 1327 struct spdk_bdev *bdev = bdev_io->bdev; 1328 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1329 struct spdk_io_channel *ch = bdev_ch->channel; 1330 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1331 uint64_t tsc; 1332 1333 tsc = spdk_get_ticks(); 1334 bdev_io->internal.submit_tsc = tsc; 1335 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1336 bdev_ch->io_outstanding++; 1337 shared_resource->io_outstanding++; 1338 bdev_io->internal.in_submit_request = true; 1339 if (spdk_likely(bdev_ch->flags == 0)) { 1340 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1341 bdev->fn_table->submit_request(ch, bdev_io); 1342 } else { 1343 bdev_ch->io_outstanding--; 1344 shared_resource->io_outstanding--; 1345 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1346 } 1347 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1348 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1349 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1350 bdev_ch->io_outstanding--; 1351 shared_resource->io_outstanding--; 1352 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1353 _spdk_bdev_qos_io_submit(bdev_ch); 1354 } else { 1355 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1356 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1357 } 1358 bdev_io->internal.in_submit_request = false; 1359 } 1360 1361 static void 1362 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1363 { 1364 struct spdk_bdev *bdev = bdev_io->bdev; 1365 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1366 1367 assert(thread != NULL); 1368 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1369 1370 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1371 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1372 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split, 1373 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1374 } else { 1375 _spdk_bdev_io_split(NULL, bdev_io); 1376 } 1377 return; 1378 } 1379 1380 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1381 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1382 _spdk_bdev_io_submit(bdev_io); 1383 } else { 1384 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1385 bdev_io->internal.ch = bdev->internal.qos->ch; 1386 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1387 } 1388 } else { 1389 _spdk_bdev_io_submit(bdev_io); 1390 } 1391 } 1392 1393 static void 1394 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1395 { 1396 struct spdk_bdev *bdev = bdev_io->bdev; 1397 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1398 struct spdk_io_channel *ch = bdev_ch->channel; 1399 1400 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1401 1402 bdev_io->internal.in_submit_request = true; 1403 
bdev->fn_table->submit_request(ch, bdev_io); 1404 bdev_io->internal.in_submit_request = false; 1405 } 1406 1407 static void 1408 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1409 struct spdk_bdev *bdev, void *cb_arg, 1410 spdk_bdev_io_completion_cb cb) 1411 { 1412 bdev_io->bdev = bdev; 1413 bdev_io->internal.caller_ctx = cb_arg; 1414 bdev_io->internal.cb = cb; 1415 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1416 bdev_io->internal.in_submit_request = false; 1417 bdev_io->internal.buf = NULL; 1418 bdev_io->internal.io_submit_ch = NULL; 1419 } 1420 1421 static bool 1422 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1423 { 1424 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1425 } 1426 1427 bool 1428 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1429 { 1430 bool supported; 1431 1432 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1433 1434 if (!supported) { 1435 switch (io_type) { 1436 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1437 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1438 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1439 break; 1440 default: 1441 break; 1442 } 1443 } 1444 1445 return supported; 1446 } 1447 1448 int 1449 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1450 { 1451 if (bdev->fn_table->dump_info_json) { 1452 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1453 } 1454 1455 return 0; 1456 } 1457 1458 static void 1459 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1460 { 1461 uint32_t max_per_timeslice = 0; 1462 int i; 1463 1464 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1465 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1466 qos->rate_limits[i].max_per_timeslice = 0; 1467 continue; 1468 } 1469 1470 max_per_timeslice = qos->rate_limits[i].limit * 1471 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1472 1473 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1474 qos->rate_limits[i].min_per_timeslice); 1475 1476 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1477 } 1478 } 1479 1480 static int 1481 spdk_bdev_channel_poll_qos(void *arg) 1482 { 1483 struct spdk_bdev_qos *qos = arg; 1484 uint64_t now = spdk_get_ticks(); 1485 int i; 1486 1487 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1488 /* We received our callback earlier than expected - return 1489 * immediately and wait to do accounting until at least one 1490 * timeslice has actually expired. This should never happen 1491 * with a well-behaved timer implementation. 1492 */ 1493 return 0; 1494 } 1495 1496 /* Reset for next round of rate limiting */ 1497 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1498 /* We may have allowed the IOs or bytes to slightly overrun in the last 1499 * timeslice. remaining_this_timeslice is signed, so if it's negative 1500 * here, we'll account for the overrun so that the next timeslice will 1501 * be appropriately reduced. 
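		 *
		 * Worked example (illustrative): a 10 MB/s byte limit works out to
		 * 10485760 * 1000 / 1000000 = 10485 bytes per 1000 usec timeslice. If a
		 * 65536-byte write is admitted while 10485 bytes remain, the counter drops
		 * to -55051; it stays negative here and only climbs back by
		 * max_per_timeslice each subsequent timeslice, so that one large I/O
		 * effectively consumes the allowance of the next several timeslices.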
1502 */ 1503 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1504 qos->rate_limits[i].remaining_this_timeslice = 0; 1505 } 1506 } 1507 1508 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1509 qos->last_timeslice += qos->timeslice_size; 1510 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1511 qos->rate_limits[i].remaining_this_timeslice += 1512 qos->rate_limits[i].max_per_timeslice; 1513 } 1514 } 1515 1516 _spdk_bdev_qos_io_submit(qos->ch); 1517 1518 return -1; 1519 } 1520 1521 static void 1522 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1523 { 1524 struct spdk_bdev_shared_resource *shared_resource; 1525 1526 if (!ch) { 1527 return; 1528 } 1529 1530 if (ch->channel) { 1531 spdk_put_io_channel(ch->channel); 1532 } 1533 1534 assert(ch->io_outstanding == 0); 1535 1536 shared_resource = ch->shared_resource; 1537 if (shared_resource) { 1538 assert(ch->io_outstanding == 0); 1539 assert(shared_resource->ref > 0); 1540 shared_resource->ref--; 1541 if (shared_resource->ref == 0) { 1542 assert(shared_resource->io_outstanding == 0); 1543 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1544 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1545 free(shared_resource); 1546 } 1547 } 1548 } 1549 1550 /* Caller must hold bdev->internal.mutex. */ 1551 static void 1552 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1553 { 1554 struct spdk_bdev_qos *qos = bdev->internal.qos; 1555 int i; 1556 1557 /* Rate limiting on this bdev enabled */ 1558 if (qos) { 1559 if (qos->ch == NULL) { 1560 struct spdk_io_channel *io_ch; 1561 1562 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1563 bdev->name, spdk_get_thread()); 1564 1565 /* No qos channel has been selected, so set one up */ 1566 1567 /* Take another reference to ch */ 1568 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1569 qos->ch = ch; 1570 1571 qos->thread = spdk_io_channel_get_thread(io_ch); 1572 1573 TAILQ_INIT(&qos->queued); 1574 1575 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1576 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1577 qos->rate_limits[i].min_per_timeslice = 1578 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1579 } else { 1580 qos->rate_limits[i].min_per_timeslice = 1581 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1582 } 1583 1584 if (qos->rate_limits[i].limit == 0) { 1585 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1586 } 1587 } 1588 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1589 qos->timeslice_size = 1590 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1591 qos->last_timeslice = spdk_get_ticks(); 1592 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1593 qos, 1594 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1595 } 1596 1597 ch->flags |= BDEV_CH_QOS_ENABLED; 1598 } 1599 } 1600 1601 static int 1602 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1603 { 1604 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1605 struct spdk_bdev_channel *ch = ctx_buf; 1606 struct spdk_io_channel *mgmt_io_ch; 1607 struct spdk_bdev_mgmt_channel *mgmt_ch; 1608 struct spdk_bdev_shared_resource *shared_resource; 1609 1610 ch->bdev = bdev; 1611 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1612 if (!ch->channel) { 1613 return -1; 1614 } 1615 1616 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1617 if (!mgmt_io_ch) { 1618 return -1; 1619 } 1620 1621 mgmt_ch = 
spdk_io_channel_get_ctx(mgmt_io_ch); 1622 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1623 if (shared_resource->shared_ch == ch->channel) { 1624 spdk_put_io_channel(mgmt_io_ch); 1625 shared_resource->ref++; 1626 break; 1627 } 1628 } 1629 1630 if (shared_resource == NULL) { 1631 shared_resource = calloc(1, sizeof(*shared_resource)); 1632 if (shared_resource == NULL) { 1633 spdk_put_io_channel(mgmt_io_ch); 1634 return -1; 1635 } 1636 1637 shared_resource->mgmt_ch = mgmt_ch; 1638 shared_resource->io_outstanding = 0; 1639 TAILQ_INIT(&shared_resource->nomem_io); 1640 shared_resource->nomem_threshold = 0; 1641 shared_resource->shared_ch = ch->channel; 1642 shared_resource->ref = 1; 1643 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1644 } 1645 1646 memset(&ch->stat, 0, sizeof(ch->stat)); 1647 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1648 ch->io_outstanding = 0; 1649 TAILQ_INIT(&ch->queued_resets); 1650 ch->flags = 0; 1651 ch->shared_resource = shared_resource; 1652 1653 #ifdef SPDK_CONFIG_VTUNE 1654 { 1655 char *name; 1656 __itt_init_ittlib(NULL, 0); 1657 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1658 if (!name) { 1659 _spdk_bdev_channel_destroy_resource(ch); 1660 return -1; 1661 } 1662 ch->handle = __itt_string_handle_create(name); 1663 free(name); 1664 ch->start_tsc = spdk_get_ticks(); 1665 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1666 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1667 } 1668 #endif 1669 1670 pthread_mutex_lock(&bdev->internal.mutex); 1671 _spdk_bdev_enable_qos(bdev, ch); 1672 pthread_mutex_unlock(&bdev->internal.mutex); 1673 1674 return 0; 1675 } 1676 1677 /* 1678 * Abort I/O that are waiting on a data buffer. These types of I/O are 1679 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1680 */ 1681 static void 1682 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1683 { 1684 bdev_io_stailq_t tmp; 1685 struct spdk_bdev_io *bdev_io; 1686 1687 STAILQ_INIT(&tmp); 1688 1689 while (!STAILQ_EMPTY(queue)) { 1690 bdev_io = STAILQ_FIRST(queue); 1691 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1692 if (bdev_io->internal.ch == ch) { 1693 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1694 } else { 1695 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1696 } 1697 } 1698 1699 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1700 } 1701 1702 /* 1703 * Abort I/O that are queued waiting for submission. These types of I/O are 1704 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1705 */ 1706 static void 1707 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1708 { 1709 struct spdk_bdev_io *bdev_io, *tmp; 1710 1711 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1712 if (bdev_io->internal.ch == ch) { 1713 TAILQ_REMOVE(queue, bdev_io, internal.link); 1714 /* 1715 * spdk_bdev_io_complete() assumes that the completed I/O had 1716 * been submitted to the bdev module. Since in this case it 1717 * hadn't, bump io_outstanding to account for the decrement 1718 * that spdk_bdev_io_complete() will do. 
1719 */ 1720 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1721 ch->io_outstanding++; 1722 ch->shared_resource->io_outstanding++; 1723 } 1724 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1725 } 1726 } 1727 } 1728 1729 static void 1730 spdk_bdev_qos_channel_destroy(void *cb_arg) 1731 { 1732 struct spdk_bdev_qos *qos = cb_arg; 1733 1734 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1735 spdk_poller_unregister(&qos->poller); 1736 1737 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1738 1739 free(qos); 1740 } 1741 1742 static int 1743 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1744 { 1745 int i; 1746 1747 /* 1748 * Cleanly shutting down the QoS poller is tricky, because 1749 * during the asynchronous operation the user could open 1750 * a new descriptor and create a new channel, spawning 1751 * a new QoS poller. 1752 * 1753 * The strategy is to create a new QoS structure here and swap it 1754 * in. The shutdown path then continues to refer to the old one 1755 * until it completes and then releases it. 1756 */ 1757 struct spdk_bdev_qos *new_qos, *old_qos; 1758 1759 old_qos = bdev->internal.qos; 1760 1761 new_qos = calloc(1, sizeof(*new_qos)); 1762 if (!new_qos) { 1763 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1764 return -ENOMEM; 1765 } 1766 1767 /* Copy the old QoS data into the newly allocated structure */ 1768 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1769 1770 /* Zero out the key parts of the QoS structure */ 1771 new_qos->ch = NULL; 1772 new_qos->thread = NULL; 1773 new_qos->poller = NULL; 1774 TAILQ_INIT(&new_qos->queued); 1775 /* 1776 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 1777 * It will be used later for the new QoS structure. 1778 */ 1779 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1780 new_qos->rate_limits[i].remaining_this_timeslice = 0; 1781 new_qos->rate_limits[i].min_per_timeslice = 0; 1782 new_qos->rate_limits[i].max_per_timeslice = 0; 1783 } 1784 1785 bdev->internal.qos = new_qos; 1786 1787 if (old_qos->thread == NULL) { 1788 free(old_qos); 1789 } else { 1790 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1791 old_qos); 1792 } 1793 1794 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 1795 * been destroyed yet. The destruction path will end up waiting for the final 1796 * channel to be put before it releases resources. */ 1797 1798 return 0; 1799 } 1800 1801 static void 1802 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 1803 { 1804 total->bytes_read += add->bytes_read; 1805 total->num_read_ops += add->num_read_ops; 1806 total->bytes_written += add->bytes_written; 1807 total->num_write_ops += add->num_write_ops; 1808 total->read_latency_ticks += add->read_latency_ticks; 1809 total->write_latency_ticks += add->write_latency_ticks; 1810 } 1811 1812 static void 1813 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 1814 { 1815 struct spdk_bdev_channel *ch = ctx_buf; 1816 struct spdk_bdev_mgmt_channel *mgmt_ch; 1817 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1818 1819 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 1820 spdk_get_thread()); 1821 1822 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
	 */
	pthread_mutex_lock(&ch->bdev->internal.mutex);
	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
	pthread_mutex_unlock(&ch->bdev->internal.mutex);

	mgmt_ch = shared_resource->mgmt_ch;

	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);

	_spdk_bdev_channel_destroy_resource(ch);
}

int
spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	if (alias == NULL) {
		SPDK_ERRLOG("Empty alias passed\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(alias)) {
		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
		return -EEXIST;
	}

	tmp = calloc(1, sizeof(*tmp));
	if (tmp == NULL) {
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	tmp->alias = strdup(alias);
	if (tmp->alias == NULL) {
		free(tmp);
		SPDK_ERRLOG("Unable to allocate alias\n");
		return -ENOMEM;
	}

	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);

	return 0;
}

int
spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
{
	struct spdk_bdev_alias *tmp;

	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
		if (strcmp(alias, tmp->alias) == 0) {
			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
			free(tmp->alias);
			free(tmp);
			return 0;
		}
	}

	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);

	return -ENOENT;
}

void
spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
{
	struct spdk_bdev_alias *p, *tmp;

	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
		TAILQ_REMOVE(&bdev->aliases, p, tailq);
		free(p->alias);
		free(p);
	}
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
{
	return bdev->product_name;
}

const struct spdk_bdev_aliases_list *
spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
{
	return &bdev->aliases;
}

uint32_t
spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
{
	return bdev->blocklen;
}

uint64_t
spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
{
	return bdev->blockcnt;
}

const char *
spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type)
{
	return qos_rpc_type[type];
}

void
spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
{
	int i;

	memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	pthread_mutex_lock(&bdev->internal.mutex);
	if (bdev->internal.qos) {
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (bdev->internal.qos->rate_limits[i].limit !=
			    SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
				limits[i] = bdev->internal.qos->rate_limits[i].limit;
			}
		}
	}
	pthread_mutex_unlock(&bdev->internal.mutex);
}

size_t
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 1964 { 1965 /* TODO: push this logic down to the bdev modules */ 1966 if (bdev->need_aligned_buffer) { 1967 return bdev->blocklen; 1968 } 1969 1970 return 1; 1971 } 1972 1973 uint32_t 1974 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 1975 { 1976 return bdev->optimal_io_boundary; 1977 } 1978 1979 bool 1980 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 1981 { 1982 return bdev->write_cache; 1983 } 1984 1985 const struct spdk_uuid * 1986 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 1987 { 1988 return &bdev->uuid; 1989 } 1990 1991 uint64_t 1992 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 1993 { 1994 return bdev->internal.measured_queue_depth; 1995 } 1996 1997 uint64_t 1998 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 1999 { 2000 return bdev->internal.period; 2001 } 2002 2003 uint64_t 2004 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2005 { 2006 return bdev->internal.weighted_io_time; 2007 } 2008 2009 uint64_t 2010 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2011 { 2012 return bdev->internal.io_time; 2013 } 2014 2015 static void 2016 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2017 { 2018 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2019 2020 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2021 2022 if (bdev->internal.measured_queue_depth) { 2023 bdev->internal.io_time += bdev->internal.period; 2024 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2025 } 2026 } 2027 2028 static void 2029 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2030 { 2031 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2032 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2033 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2034 2035 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2036 spdk_for_each_channel_continue(i, 0); 2037 } 2038 2039 static int 2040 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2041 { 2042 struct spdk_bdev *bdev = ctx; 2043 bdev->internal.temporary_queue_depth = 0; 2044 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2045 _calculate_measured_qd_cpl); 2046 return 0; 2047 } 2048 2049 void 2050 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2051 { 2052 bdev->internal.period = period; 2053 2054 if (bdev->internal.qd_poller != NULL) { 2055 spdk_poller_unregister(&bdev->internal.qd_poller); 2056 bdev->internal.measured_queue_depth = UINT64_MAX; 2057 } 2058 2059 if (period != 0) { 2060 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2061 period); 2062 } 2063 } 2064 2065 int 2066 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2067 { 2068 int ret; 2069 2070 pthread_mutex_lock(&bdev->internal.mutex); 2071 2072 /* bdev has open descriptors */ 2073 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2074 bdev->blockcnt > size) { 2075 ret = -EBUSY; 2076 } else { 2077 bdev->blockcnt = size; 2078 ret = 0; 2079 } 2080 2081 pthread_mutex_unlock(&bdev->internal.mutex); 2082 2083 return ret; 2084 } 2085 2086 /* 2087 * Convert I/O offset and length from bytes to blocks. 2088 * 2089 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
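* For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 8192 yield offset_blocks = 8, num_blocks = 16 and a return value of 0, while num_bytes = 4100 leaves a remainder of 4, so the byte-based wrappers below reject the request with -EINVAL.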
2090 */ 2091 static uint64_t 2092 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2093 uint64_t num_bytes, uint64_t *num_blocks) 2094 { 2095 uint32_t block_size = bdev->blocklen; 2096 2097 *offset_blocks = offset_bytes / block_size; 2098 *num_blocks = num_bytes / block_size; 2099 2100 return (offset_bytes % block_size) | (num_bytes % block_size); 2101 } 2102 2103 static bool 2104 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2105 { 2106 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2107 * has been an overflow and hence the offset has been wrapped around */ 2108 if (offset_blocks + num_blocks < offset_blocks) { 2109 return false; 2110 } 2111 2112 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2113 if (offset_blocks + num_blocks > bdev->blockcnt) { 2114 return false; 2115 } 2116 2117 return true; 2118 } 2119 2120 int 2121 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2122 void *buf, uint64_t offset, uint64_t nbytes, 2123 spdk_bdev_io_completion_cb cb, void *cb_arg) 2124 { 2125 uint64_t offset_blocks, num_blocks; 2126 2127 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2128 return -EINVAL; 2129 } 2130 2131 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2132 } 2133 2134 int 2135 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2136 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2137 spdk_bdev_io_completion_cb cb, void *cb_arg) 2138 { 2139 struct spdk_bdev *bdev = desc->bdev; 2140 struct spdk_bdev_io *bdev_io; 2141 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2142 2143 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2144 return -EINVAL; 2145 } 2146 2147 bdev_io = spdk_bdev_get_io(channel); 2148 if (!bdev_io) { 2149 return -ENOMEM; 2150 } 2151 2152 bdev_io->internal.ch = channel; 2153 bdev_io->internal.desc = desc; 2154 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2155 bdev_io->u.bdev.iovs = &bdev_io->iov; 2156 bdev_io->u.bdev.iovs[0].iov_base = buf; 2157 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2158 bdev_io->u.bdev.iovcnt = 1; 2159 bdev_io->u.bdev.num_blocks = num_blocks; 2160 bdev_io->u.bdev.offset_blocks = offset_blocks; 2161 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2162 2163 spdk_bdev_io_submit(bdev_io); 2164 return 0; 2165 } 2166 2167 int 2168 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2169 struct iovec *iov, int iovcnt, 2170 uint64_t offset, uint64_t nbytes, 2171 spdk_bdev_io_completion_cb cb, void *cb_arg) 2172 { 2173 uint64_t offset_blocks, num_blocks; 2174 2175 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2176 return -EINVAL; 2177 } 2178 2179 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2180 } 2181 2182 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2183 struct iovec *iov, int iovcnt, 2184 uint64_t offset_blocks, uint64_t num_blocks, 2185 spdk_bdev_io_completion_cb cb, void *cb_arg) 2186 { 2187 struct spdk_bdev *bdev = desc->bdev; 2188 struct spdk_bdev_io *bdev_io; 2189 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2190 2191 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2192 return -EINVAL; 2193 } 2194 
2195 bdev_io = spdk_bdev_get_io(channel); 2196 if (!bdev_io) { 2197 return -ENOMEM; 2198 } 2199 2200 bdev_io->internal.ch = channel; 2201 bdev_io->internal.desc = desc; 2202 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2203 bdev_io->u.bdev.iovs = iov; 2204 bdev_io->u.bdev.iovcnt = iovcnt; 2205 bdev_io->u.bdev.num_blocks = num_blocks; 2206 bdev_io->u.bdev.offset_blocks = offset_blocks; 2207 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2208 2209 spdk_bdev_io_submit(bdev_io); 2210 return 0; 2211 } 2212 2213 int 2214 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2215 void *buf, uint64_t offset, uint64_t nbytes, 2216 spdk_bdev_io_completion_cb cb, void *cb_arg) 2217 { 2218 uint64_t offset_blocks, num_blocks; 2219 2220 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2221 return -EINVAL; 2222 } 2223 2224 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2225 } 2226 2227 int 2228 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2229 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2230 spdk_bdev_io_completion_cb cb, void *cb_arg) 2231 { 2232 struct spdk_bdev *bdev = desc->bdev; 2233 struct spdk_bdev_io *bdev_io; 2234 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2235 2236 if (!desc->write) { 2237 return -EBADF; 2238 } 2239 2240 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2241 return -EINVAL; 2242 } 2243 2244 bdev_io = spdk_bdev_get_io(channel); 2245 if (!bdev_io) { 2246 return -ENOMEM; 2247 } 2248 2249 bdev_io->internal.ch = channel; 2250 bdev_io->internal.desc = desc; 2251 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2252 bdev_io->u.bdev.iovs = &bdev_io->iov; 2253 bdev_io->u.bdev.iovs[0].iov_base = buf; 2254 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2255 bdev_io->u.bdev.iovcnt = 1; 2256 bdev_io->u.bdev.num_blocks = num_blocks; 2257 bdev_io->u.bdev.offset_blocks = offset_blocks; 2258 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2259 2260 spdk_bdev_io_submit(bdev_io); 2261 return 0; 2262 } 2263 2264 int 2265 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2266 struct iovec *iov, int iovcnt, 2267 uint64_t offset, uint64_t len, 2268 spdk_bdev_io_completion_cb cb, void *cb_arg) 2269 { 2270 uint64_t offset_blocks, num_blocks; 2271 2272 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2273 return -EINVAL; 2274 } 2275 2276 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2277 } 2278 2279 int 2280 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2281 struct iovec *iov, int iovcnt, 2282 uint64_t offset_blocks, uint64_t num_blocks, 2283 spdk_bdev_io_completion_cb cb, void *cb_arg) 2284 { 2285 struct spdk_bdev *bdev = desc->bdev; 2286 struct spdk_bdev_io *bdev_io; 2287 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2288 2289 if (!desc->write) { 2290 return -EBADF; 2291 } 2292 2293 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2294 return -EINVAL; 2295 } 2296 2297 bdev_io = spdk_bdev_get_io(channel); 2298 if (!bdev_io) { 2299 return -ENOMEM; 2300 } 2301 2302 bdev_io->internal.ch = channel; 2303 bdev_io->internal.desc = desc; 2304 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2305 bdev_io->u.bdev.iovs = iov; 2306 bdev_io->u.bdev.iovcnt = iovcnt; 2307 bdev_io->u.bdev.num_blocks = num_blocks; 2308 bdev_io->u.bdev.offset_blocks = 
offset_blocks; 2309 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2310 2311 spdk_bdev_io_submit(bdev_io); 2312 return 0; 2313 } 2314 2315 int 2316 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2317 uint64_t offset, uint64_t len, 2318 spdk_bdev_io_completion_cb cb, void *cb_arg) 2319 { 2320 uint64_t offset_blocks, num_blocks; 2321 2322 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2323 return -EINVAL; 2324 } 2325 2326 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2327 } 2328 2329 int 2330 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2331 uint64_t offset_blocks, uint64_t num_blocks, 2332 spdk_bdev_io_completion_cb cb, void *cb_arg) 2333 { 2334 struct spdk_bdev *bdev = desc->bdev; 2335 struct spdk_bdev_io *bdev_io; 2336 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2337 2338 if (!desc->write) { 2339 return -EBADF; 2340 } 2341 2342 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2343 return -EINVAL; 2344 } 2345 2346 bdev_io = spdk_bdev_get_io(channel); 2347 2348 if (!bdev_io) { 2349 return -ENOMEM; 2350 } 2351 2352 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2353 bdev_io->internal.ch = channel; 2354 bdev_io->internal.desc = desc; 2355 bdev_io->u.bdev.offset_blocks = offset_blocks; 2356 bdev_io->u.bdev.num_blocks = num_blocks; 2357 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2358 2359 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2360 spdk_bdev_io_submit(bdev_io); 2361 return 0; 2362 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2363 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2364 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2365 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2366 _spdk_bdev_write_zero_buffer_next(bdev_io); 2367 return 0; 2368 } else { 2369 spdk_bdev_free_io(bdev_io); 2370 return -ENOTSUP; 2371 } 2372 } 2373 2374 int 2375 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2376 uint64_t offset, uint64_t nbytes, 2377 spdk_bdev_io_completion_cb cb, void *cb_arg) 2378 { 2379 uint64_t offset_blocks, num_blocks; 2380 2381 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2382 return -EINVAL; 2383 } 2384 2385 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2386 } 2387 2388 int 2389 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2390 uint64_t offset_blocks, uint64_t num_blocks, 2391 spdk_bdev_io_completion_cb cb, void *cb_arg) 2392 { 2393 struct spdk_bdev *bdev = desc->bdev; 2394 struct spdk_bdev_io *bdev_io; 2395 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2396 2397 if (!desc->write) { 2398 return -EBADF; 2399 } 2400 2401 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2402 return -EINVAL; 2403 } 2404 2405 if (num_blocks == 0) { 2406 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2407 return -EINVAL; 2408 } 2409 2410 bdev_io = spdk_bdev_get_io(channel); 2411 if (!bdev_io) { 2412 return -ENOMEM; 2413 } 2414 2415 bdev_io->internal.ch = channel; 2416 bdev_io->internal.desc = desc; 2417 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2418 2419 bdev_io->u.bdev.iovs = &bdev_io->iov; 2420 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2421 bdev_io->u.bdev.iovs[0].iov_len = 0; 2422 bdev_io->u.bdev.iovcnt = 1; 2423 2424 
bdev_io->u.bdev.offset_blocks = offset_blocks; 2425 bdev_io->u.bdev.num_blocks = num_blocks; 2426 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2427 2428 spdk_bdev_io_submit(bdev_io); 2429 return 0; 2430 } 2431 2432 int 2433 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2434 uint64_t offset, uint64_t length, 2435 spdk_bdev_io_completion_cb cb, void *cb_arg) 2436 { 2437 uint64_t offset_blocks, num_blocks; 2438 2439 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2440 return -EINVAL; 2441 } 2442 2443 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2444 } 2445 2446 int 2447 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2448 uint64_t offset_blocks, uint64_t num_blocks, 2449 spdk_bdev_io_completion_cb cb, void *cb_arg) 2450 { 2451 struct spdk_bdev *bdev = desc->bdev; 2452 struct spdk_bdev_io *bdev_io; 2453 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2454 2455 if (!desc->write) { 2456 return -EBADF; 2457 } 2458 2459 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2460 return -EINVAL; 2461 } 2462 2463 bdev_io = spdk_bdev_get_io(channel); 2464 if (!bdev_io) { 2465 return -ENOMEM; 2466 } 2467 2468 bdev_io->internal.ch = channel; 2469 bdev_io->internal.desc = desc; 2470 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2471 bdev_io->u.bdev.iovs = NULL; 2472 bdev_io->u.bdev.iovcnt = 0; 2473 bdev_io->u.bdev.offset_blocks = offset_blocks; 2474 bdev_io->u.bdev.num_blocks = num_blocks; 2475 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2476 2477 spdk_bdev_io_submit(bdev_io); 2478 return 0; 2479 } 2480 2481 static void 2482 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2483 { 2484 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2485 struct spdk_bdev_io *bdev_io; 2486 2487 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2488 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2489 spdk_bdev_io_submit_reset(bdev_io); 2490 } 2491 2492 static void 2493 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2494 { 2495 struct spdk_io_channel *ch; 2496 struct spdk_bdev_channel *channel; 2497 struct spdk_bdev_mgmt_channel *mgmt_channel; 2498 struct spdk_bdev_shared_resource *shared_resource; 2499 bdev_io_tailq_t tmp_queued; 2500 2501 TAILQ_INIT(&tmp_queued); 2502 2503 ch = spdk_io_channel_iter_get_channel(i); 2504 channel = spdk_io_channel_get_ctx(ch); 2505 shared_resource = channel->shared_resource; 2506 mgmt_channel = shared_resource->mgmt_ch; 2507 2508 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2509 2510 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2511 /* The QoS object is always valid and readable while 2512 * the channel flag is set, so the lock here should not 2513 * be necessary. We're not in the fast path though, so 2514 * just take it anyway. 
*/ 2515 pthread_mutex_lock(&channel->bdev->internal.mutex); 2516 if (channel->bdev->internal.qos->ch == channel) { 2517 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2518 } 2519 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2520 } 2521 2522 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2523 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2524 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2525 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2526 2527 spdk_for_each_channel_continue(i, 0); 2528 } 2529 2530 static void 2531 _spdk_bdev_start_reset(void *ctx) 2532 { 2533 struct spdk_bdev_channel *ch = ctx; 2534 2535 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2536 ch, _spdk_bdev_reset_dev); 2537 } 2538 2539 static void 2540 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2541 { 2542 struct spdk_bdev *bdev = ch->bdev; 2543 2544 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2545 2546 pthread_mutex_lock(&bdev->internal.mutex); 2547 if (bdev->internal.reset_in_progress == NULL) { 2548 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2549 /* 2550 * Take a channel reference for the target bdev for the life of this 2551 * reset. This guards against the channel getting destroyed while 2552 * spdk_for_each_channel() calls related to this reset IO are in 2553 * progress. We will release the reference when this reset is 2554 * completed. 2555 */ 2556 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2557 _spdk_bdev_start_reset(ch); 2558 } 2559 pthread_mutex_unlock(&bdev->internal.mutex); 2560 } 2561 2562 int 2563 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2564 spdk_bdev_io_completion_cb cb, void *cb_arg) 2565 { 2566 struct spdk_bdev *bdev = desc->bdev; 2567 struct spdk_bdev_io *bdev_io; 2568 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2569 2570 bdev_io = spdk_bdev_get_io(channel); 2571 if (!bdev_io) { 2572 return -ENOMEM; 2573 } 2574 2575 bdev_io->internal.ch = channel; 2576 bdev_io->internal.desc = desc; 2577 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2578 bdev_io->u.reset.ch_ref = NULL; 2579 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2580 2581 pthread_mutex_lock(&bdev->internal.mutex); 2582 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2583 pthread_mutex_unlock(&bdev->internal.mutex); 2584 2585 _spdk_bdev_channel_start_reset(channel); 2586 2587 return 0; 2588 } 2589 2590 void 2591 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2592 struct spdk_bdev_io_stat *stat) 2593 { 2594 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2595 2596 *stat = channel->stat; 2597 } 2598 2599 static void 2600 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2601 { 2602 void *io_device = spdk_io_channel_iter_get_io_device(i); 2603 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2604 2605 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2606 bdev_iostat_ctx->cb_arg, 0); 2607 free(bdev_iostat_ctx); 2608 } 2609 2610 static void 2611 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2612 { 2613 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2614 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2615 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2616 2617 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2618 spdk_for_each_channel_continue(i, 0); 2619 } 2620 2621 void 2622 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2623 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2624 { 2625 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2626 2627 assert(bdev != NULL); 2628 assert(stat != NULL); 2629 assert(cb != NULL); 2630 2631 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2632 if (bdev_iostat_ctx == NULL) { 2633 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2634 cb(bdev, stat, cb_arg, -ENOMEM); 2635 return; 2636 } 2637 2638 bdev_iostat_ctx->stat = stat; 2639 bdev_iostat_ctx->cb = cb; 2640 bdev_iostat_ctx->cb_arg = cb_arg; 2641 2642 /* Start with the statistics from previously deleted channels. */ 2643 pthread_mutex_lock(&bdev->internal.mutex); 2644 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2645 pthread_mutex_unlock(&bdev->internal.mutex); 2646 2647 /* Then iterate and add the statistics from each existing channel. */ 2648 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2649 _spdk_bdev_get_each_channel_stat, 2650 bdev_iostat_ctx, 2651 _spdk_bdev_get_device_stat_done); 2652 } 2653 2654 int 2655 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2656 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2657 spdk_bdev_io_completion_cb cb, void *cb_arg) 2658 { 2659 struct spdk_bdev *bdev = desc->bdev; 2660 struct spdk_bdev_io *bdev_io; 2661 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2662 2663 if (!desc->write) { 2664 return -EBADF; 2665 } 2666 2667 bdev_io = spdk_bdev_get_io(channel); 2668 if (!bdev_io) { 2669 return -ENOMEM; 2670 } 2671 2672 bdev_io->internal.ch = channel; 2673 bdev_io->internal.desc = desc; 2674 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2675 bdev_io->u.nvme_passthru.cmd = *cmd; 2676 bdev_io->u.nvme_passthru.buf = buf; 2677 bdev_io->u.nvme_passthru.nbytes = nbytes; 2678 bdev_io->u.nvme_passthru.md_buf = NULL; 2679 bdev_io->u.nvme_passthru.md_len = 0; 2680 2681 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2682 2683 spdk_bdev_io_submit(bdev_io); 2684 return 0; 2685 } 2686 2687 int 2688 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2689 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2690 spdk_bdev_io_completion_cb cb, void *cb_arg) 2691 { 2692 struct spdk_bdev *bdev = desc->bdev; 2693 struct spdk_bdev_io *bdev_io; 2694 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2695 2696 if (!desc->write) { 2697 /* 2698 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2699 * to easily determine if the command is a read or write, but for now just 2700 * do not allow io_passthru with a read-only descriptor. 
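* (A purely illustrative sketch of such a check - not something this function does - might treat cmd->opc values such as SPDK_NVME_OPC_READ or SPDK_NVME_OPC_COMPARE as reads and everything else as writes, but vendor-specific opcodes make that guess unreliable, hence the conservative -EBADF below.)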
2701 */ 2702 return -EBADF; 2703 } 2704 2705 bdev_io = spdk_bdev_get_io(channel); 2706 if (!bdev_io) { 2707 return -ENOMEM; 2708 } 2709 2710 bdev_io->internal.ch = channel; 2711 bdev_io->internal.desc = desc; 2712 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2713 bdev_io->u.nvme_passthru.cmd = *cmd; 2714 bdev_io->u.nvme_passthru.buf = buf; 2715 bdev_io->u.nvme_passthru.nbytes = nbytes; 2716 bdev_io->u.nvme_passthru.md_buf = NULL; 2717 bdev_io->u.nvme_passthru.md_len = 0; 2718 2719 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2720 2721 spdk_bdev_io_submit(bdev_io); 2722 return 0; 2723 } 2724 2725 int 2726 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2727 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2728 spdk_bdev_io_completion_cb cb, void *cb_arg) 2729 { 2730 struct spdk_bdev *bdev = desc->bdev; 2731 struct spdk_bdev_io *bdev_io; 2732 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2733 2734 if (!desc->write) { 2735 /* 2736 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2737 * to easily determine if the command is a read or write, but for now just 2738 * do not allow io_passthru with a read-only descriptor. 2739 */ 2740 return -EBADF; 2741 } 2742 2743 bdev_io = spdk_bdev_get_io(channel); 2744 if (!bdev_io) { 2745 return -ENOMEM; 2746 } 2747 2748 bdev_io->internal.ch = channel; 2749 bdev_io->internal.desc = desc; 2750 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2751 bdev_io->u.nvme_passthru.cmd = *cmd; 2752 bdev_io->u.nvme_passthru.buf = buf; 2753 bdev_io->u.nvme_passthru.nbytes = nbytes; 2754 bdev_io->u.nvme_passthru.md_buf = md_buf; 2755 bdev_io->u.nvme_passthru.md_len = md_len; 2756 2757 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2758 2759 spdk_bdev_io_submit(bdev_io); 2760 return 0; 2761 } 2762 2763 int 2764 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2765 struct spdk_bdev_io_wait_entry *entry) 2766 { 2767 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2768 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2769 2770 if (bdev != entry->bdev) { 2771 SPDK_ERRLOG("bdevs do not match\n"); 2772 return -EINVAL; 2773 } 2774 2775 if (mgmt_ch->per_thread_cache_count > 0) { 2776 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2777 return -EINVAL; 2778 } 2779 2780 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2781 return 0; 2782 } 2783 2784 static void 2785 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2786 { 2787 struct spdk_bdev *bdev = bdev_ch->bdev; 2788 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2789 struct spdk_bdev_io *bdev_io; 2790 2791 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2792 /* 2793 * Allow some more I/O to complete before retrying the nomem_io queue. 2794 * Some drivers (such as nvme) cannot immediately take a new I/O in 2795 * the context of a completion, because the resources for the I/O are 2796 * not released until control returns to the bdev poller. Also, we 2797 * may require several small I/O to complete before a larger I/O 2798 * (that requires splitting) can be submitted. 
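* As a concrete illustration: if 99 I/O are still outstanding after a NOMEM'd request is pulled back, spdk_bdev_io_complete() records a nomem_threshold of spdk_max(49, 91) = 91, so this early return keeps the queue parked until at least 8 of those I/O have completed.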
2799 */ 2800 return; 2801 } 2802 2803 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2804 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2805 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2806 bdev_io->internal.ch->io_outstanding++; 2807 shared_resource->io_outstanding++; 2808 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2809 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2810 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2811 break; 2812 } 2813 } 2814 } 2815 2816 static inline void 2817 _spdk_bdev_io_complete(void *ctx) 2818 { 2819 struct spdk_bdev_io *bdev_io = ctx; 2820 uint64_t tsc; 2821 2822 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2823 /* 2824 * Send the completion to the thread that originally submitted the I/O, 2825 * which may not be the current thread in the case of QoS. 2826 */ 2827 if (bdev_io->internal.io_submit_ch) { 2828 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2829 bdev_io->internal.io_submit_ch = NULL; 2830 } 2831 2832 /* 2833 * Defer completion to avoid potential infinite recursion if the 2834 * user's completion callback issues a new I/O. 2835 */ 2836 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2837 _spdk_bdev_io_complete, bdev_io); 2838 return; 2839 } 2840 2841 tsc = spdk_get_ticks(); 2842 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 2843 2844 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2845 switch (bdev_io->type) { 2846 case SPDK_BDEV_IO_TYPE_READ: 2847 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2848 bdev_io->internal.ch->stat.num_read_ops++; 2849 bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2850 break; 2851 case SPDK_BDEV_IO_TYPE_WRITE: 2852 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2853 bdev_io->internal.ch->stat.num_write_ops++; 2854 bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2855 break; 2856 default: 2857 break; 2858 } 2859 } 2860 2861 #ifdef SPDK_CONFIG_VTUNE 2862 uint64_t now_tsc = spdk_get_ticks(); 2863 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2864 uint64_t data[5]; 2865 2866 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2867 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2868 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2869 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2870 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
2871 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2872 2873 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2874 __itt_metadata_u64, 5, data); 2875 2876 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2877 bdev_io->internal.ch->start_tsc = now_tsc; 2878 } 2879 #endif 2880 2881 assert(bdev_io->internal.cb != NULL); 2882 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2883 2884 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2885 bdev_io->internal.caller_ctx); 2886 } 2887 2888 static void 2889 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2890 { 2891 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2892 2893 if (bdev_io->u.reset.ch_ref != NULL) { 2894 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2895 bdev_io->u.reset.ch_ref = NULL; 2896 } 2897 2898 _spdk_bdev_io_complete(bdev_io); 2899 } 2900 2901 static void 2902 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2903 { 2904 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2905 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2906 2907 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2908 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2909 _spdk_bdev_channel_start_reset(ch); 2910 } 2911 2912 spdk_for_each_channel_continue(i, 0); 2913 } 2914 2915 void 2916 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2917 { 2918 struct spdk_bdev *bdev = bdev_io->bdev; 2919 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2920 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2921 2922 bdev_io->internal.status = status; 2923 2924 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2925 bool unlock_channels = false; 2926 2927 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2928 SPDK_ERRLOG("NOMEM returned for reset\n"); 2929 } 2930 pthread_mutex_lock(&bdev->internal.mutex); 2931 if (bdev_io == bdev->internal.reset_in_progress) { 2932 bdev->internal.reset_in_progress = NULL; 2933 unlock_channels = true; 2934 } 2935 pthread_mutex_unlock(&bdev->internal.mutex); 2936 2937 if (unlock_channels) { 2938 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 2939 bdev_io, _spdk_bdev_reset_complete); 2940 return; 2941 } 2942 } else { 2943 assert(bdev_ch->io_outstanding > 0); 2944 assert(shared_resource->io_outstanding > 0); 2945 bdev_ch->io_outstanding--; 2946 shared_resource->io_outstanding--; 2947 2948 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 2949 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 2950 /* 2951 * Wait for some of the outstanding I/O to complete before we 2952 * retry any of the nomem_io. Normally we will wait for 2953 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 2954 * depth channels we will instead wait for half to complete. 
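* For example, if only 6 I/O remain outstanding, the computation below yields spdk_max(3, -2) = 3, i.e. roughly half of them must complete first; with deeper queues the second term dominates and the wait is about NOMEM_THRESHOLD_COUNT completions.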
2955 */ 2956 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 2957 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 2958 return; 2959 } 2960 2961 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 2962 _spdk_bdev_ch_retry_io(bdev_ch); 2963 } 2964 } 2965 2966 _spdk_bdev_io_complete(bdev_io); 2967 } 2968 2969 void 2970 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 2971 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 2972 { 2973 if (sc == SPDK_SCSI_STATUS_GOOD) { 2974 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 2975 } else { 2976 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 2977 bdev_io->internal.error.scsi.sc = sc; 2978 bdev_io->internal.error.scsi.sk = sk; 2979 bdev_io->internal.error.scsi.asc = asc; 2980 bdev_io->internal.error.scsi.ascq = ascq; 2981 } 2982 2983 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 2984 } 2985 2986 void 2987 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 2988 int *sc, int *sk, int *asc, int *ascq) 2989 { 2990 assert(sc != NULL); 2991 assert(sk != NULL); 2992 assert(asc != NULL); 2993 assert(ascq != NULL); 2994 2995 switch (bdev_io->internal.status) { 2996 case SPDK_BDEV_IO_STATUS_SUCCESS: 2997 *sc = SPDK_SCSI_STATUS_GOOD; 2998 *sk = SPDK_SCSI_SENSE_NO_SENSE; 2999 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3000 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3001 break; 3002 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3003 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3004 break; 3005 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3006 *sc = bdev_io->internal.error.scsi.sc; 3007 *sk = bdev_io->internal.error.scsi.sk; 3008 *asc = bdev_io->internal.error.scsi.asc; 3009 *ascq = bdev_io->internal.error.scsi.ascq; 3010 break; 3011 default: 3012 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3013 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3014 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3015 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3016 break; 3017 } 3018 } 3019 3020 void 3021 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3022 { 3023 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3024 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3025 } else { 3026 bdev_io->internal.error.nvme.sct = sct; 3027 bdev_io->internal.error.nvme.sc = sc; 3028 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3029 } 3030 3031 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3032 } 3033 3034 void 3035 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3036 { 3037 assert(sct != NULL); 3038 assert(sc != NULL); 3039 3040 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3041 *sct = bdev_io->internal.error.nvme.sct; 3042 *sc = bdev_io->internal.error.nvme.sc; 3043 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3044 *sct = SPDK_NVME_SCT_GENERIC; 3045 *sc = SPDK_NVME_SC_SUCCESS; 3046 } else { 3047 *sct = SPDK_NVME_SCT_GENERIC; 3048 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3049 } 3050 } 3051 3052 struct spdk_thread * 3053 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3054 { 3055 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3056 } 3057 3058 static void 3059 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3060 { 3061 uint64_t min_qos_set; 3062 int i; 3063 3064 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3065 if (limits[i] != 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3066 break; 3067 } 3068 } 3069 3070 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3071 SPDK_ERRLOG("Invalid rate limits set.\n"); 3072 return; 3073 } 3074 3075 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3076 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3077 continue; 3078 } 3079 3080 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3081 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3082 } else { 3083 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3084 } 3085 3086 if (limits[i] == 0 || limits[i] % min_qos_set) { 3087 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3088 limits[i], bdev->name, min_qos_set); 3089 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3090 return; 3091 } 3092 } 3093 3094 if (!bdev->internal.qos) { 3095 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3096 if (!bdev->internal.qos) { 3097 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3098 return; 3099 } 3100 } 3101 3102 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3103 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3104 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3105 bdev->name, i, limits[i]); 3106 } 3107 3108 return; 3109 } 3110 3111 static void 3112 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3113 { 3114 struct spdk_conf_section *sp = NULL; 3115 const char *val = NULL; 3116 int i = 0, j = 0; 3117 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3118 bool config_qos = false; 3119 3120 sp = spdk_conf_find_section(NULL, "QoS"); 3121 if (!sp) { 3122 return; 3123 } 3124 3125 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3126 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3127 3128 i = 0; 3129 while (true) { 3130 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3131 if (!val) { 3132 break; 3133 } 3134 3135 if (strcmp(bdev->name, val) != 0) { 3136 i++; 3137 continue; 3138 } 3139 3140 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3141 if (val) { 3142 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3143 limits[j] = strtoull(val, NULL, 10); 3144 } else { 3145 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3146 } 3147 config_qos = true; 3148 } 3149 3150 break; 3151 } 3152 3153 j++; 3154 } 3155 3156 if (config_qos == true) { 3157 _spdk_bdev_qos_config_limit(bdev, limits); 3158 } 3159 3160 return; 3161 } 3162 3163 static int 3164 spdk_bdev_init(struct spdk_bdev *bdev) 3165 { 3166 char *bdev_name; 3167 3168 assert(bdev->module != NULL); 3169 3170 if (!bdev->name) { 3171 SPDK_ERRLOG("Bdev name is NULL\n"); 3172 return -EINVAL; 3173 } 3174 3175 if (spdk_bdev_get_by_name(bdev->name)) { 3176 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3177 return -EEXIST; 3178 } 3179 3180 /* Users often register their own I/O devices using the bdev name. In 3181 * order to avoid conflicts, prepend bdev_. 
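* For example, a bdev registered under the (hypothetical) name "Malloc0" passes "bdev_Malloc0" to spdk_io_device_register() below, so it cannot collide with an io_device name the backing module may already have registered as "Malloc0".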
*/ 3182 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3183 if (!bdev_name) { 3184 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3185 return -ENOMEM; 3186 } 3187 3188 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3189 bdev->internal.measured_queue_depth = UINT64_MAX; 3190 3191 TAILQ_INIT(&bdev->internal.open_descs); 3192 3193 TAILQ_INIT(&bdev->aliases); 3194 3195 bdev->internal.reset_in_progress = NULL; 3196 3197 _spdk_bdev_qos_config(bdev); 3198 3199 spdk_io_device_register(__bdev_to_io_dev(bdev), 3200 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3201 sizeof(struct spdk_bdev_channel), 3202 bdev_name); 3203 3204 free(bdev_name); 3205 3206 pthread_mutex_init(&bdev->internal.mutex, NULL); 3207 return 0; 3208 } 3209 3210 static void 3211 spdk_bdev_destroy_cb(void *io_device) 3212 { 3213 int rc; 3214 struct spdk_bdev *bdev; 3215 spdk_bdev_unregister_cb cb_fn; 3216 void *cb_arg; 3217 3218 bdev = __bdev_from_io_dev(io_device); 3219 cb_fn = bdev->internal.unregister_cb; 3220 cb_arg = bdev->internal.unregister_ctx; 3221 3222 rc = bdev->fn_table->destruct(bdev->ctxt); 3223 if (rc < 0) { 3224 SPDK_ERRLOG("destruct failed\n"); 3225 } 3226 if (rc <= 0 && cb_fn != NULL) { 3227 cb_fn(cb_arg, rc); 3228 } 3229 } 3230 3231 3232 static void 3233 spdk_bdev_fini(struct spdk_bdev *bdev) 3234 { 3235 pthread_mutex_destroy(&bdev->internal.mutex); 3236 3237 free(bdev->internal.qos); 3238 3239 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3240 } 3241 3242 static void 3243 spdk_bdev_start(struct spdk_bdev *bdev) 3244 { 3245 struct spdk_bdev_module *module; 3246 uint32_t action; 3247 3248 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3249 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3250 3251 /* Examine configuration before initializing I/O */ 3252 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3253 if (module->examine_config) { 3254 action = module->internal.action_in_progress; 3255 module->internal.action_in_progress++; 3256 module->examine_config(bdev); 3257 if (action != module->internal.action_in_progress) { 3258 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3259 module->name); 3260 } 3261 } 3262 } 3263 3264 if (bdev->internal.claim_module) { 3265 return; 3266 } 3267 3268 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3269 if (module->examine_disk) { 3270 module->internal.action_in_progress++; 3271 module->examine_disk(bdev); 3272 } 3273 } 3274 } 3275 3276 int 3277 spdk_bdev_register(struct spdk_bdev *bdev) 3278 { 3279 int rc = spdk_bdev_init(bdev); 3280 3281 if (rc == 0) { 3282 spdk_bdev_start(bdev); 3283 } 3284 3285 return rc; 3286 } 3287 3288 int 3289 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3290 { 3291 int rc; 3292 3293 rc = spdk_bdev_init(vbdev); 3294 if (rc) { 3295 return rc; 3296 } 3297 3298 spdk_bdev_start(vbdev); 3299 return 0; 3300 } 3301 3302 void 3303 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3304 { 3305 if (bdev->internal.unregister_cb != NULL) { 3306 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3307 } 3308 } 3309 3310 static void 3311 _remove_notify(void *arg) 3312 { 3313 struct spdk_bdev_desc *desc = arg; 3314 3315 desc->remove_scheduled = false; 3316 3317 if (desc->closed) { 3318 free(desc); 3319 } else { 3320 desc->remove_cb(desc->remove_ctx); 3321 } 3322 } 3323 3324 void 3325 spdk_bdev_unregister(struct 
spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3326 { 3327 struct spdk_bdev_desc *desc, *tmp; 3328 bool do_destruct = true; 3329 struct spdk_thread *thread; 3330 3331 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3332 3333 thread = spdk_get_thread(); 3334 if (!thread) { 3335 /* The user called this from a non-SPDK thread. */ 3336 if (cb_fn != NULL) { 3337 cb_fn(cb_arg, -ENOTSUP); 3338 } 3339 return; 3340 } 3341 3342 pthread_mutex_lock(&bdev->internal.mutex); 3343 3344 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3345 bdev->internal.unregister_cb = cb_fn; 3346 bdev->internal.unregister_ctx = cb_arg; 3347 3348 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3349 if (desc->remove_cb) { 3350 do_destruct = false; 3351 /* 3352 * Defer invocation of the remove_cb to a separate message that will 3353 * run later on its thread. This ensures this context unwinds and 3354 * we don't recursively unregister this bdev again if the remove_cb 3355 * immediately closes its descriptor. 3356 */ 3357 if (!desc->remove_scheduled) { 3358 /* Avoid scheduling removal of the same descriptor multiple times. */ 3359 desc->remove_scheduled = true; 3360 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3361 } 3362 } 3363 } 3364 3365 if (!do_destruct) { 3366 pthread_mutex_unlock(&bdev->internal.mutex); 3367 return; 3368 } 3369 3370 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3371 pthread_mutex_unlock(&bdev->internal.mutex); 3372 3373 spdk_bdev_fini(bdev); 3374 } 3375 3376 int 3377 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3378 void *remove_ctx, struct spdk_bdev_desc **_desc) 3379 { 3380 struct spdk_bdev_desc *desc; 3381 struct spdk_thread *thread; 3382 3383 thread = spdk_get_thread(); 3384 if (!thread) { 3385 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3386 return -ENOTSUP; 3387 } 3388 3389 desc = calloc(1, sizeof(*desc)); 3390 if (desc == NULL) { 3391 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3392 return -ENOMEM; 3393 } 3394 3395 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3396 spdk_get_thread()); 3397 3398 pthread_mutex_lock(&bdev->internal.mutex); 3399 3400 if (write && bdev->internal.claim_module) { 3401 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3402 bdev->name, bdev->internal.claim_module->name); 3403 free(desc); 3404 pthread_mutex_unlock(&bdev->internal.mutex); 3405 return -EPERM; 3406 } 3407 3408 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3409 3410 desc->bdev = bdev; 3411 desc->thread = thread; 3412 desc->remove_cb = remove_cb; 3413 desc->remove_ctx = remove_ctx; 3414 desc->write = write; 3415 *_desc = desc; 3416 3417 pthread_mutex_unlock(&bdev->internal.mutex); 3418 3419 return 0; 3420 } 3421 3422 void 3423 spdk_bdev_close(struct spdk_bdev_desc *desc) 3424 { 3425 struct spdk_bdev *bdev = desc->bdev; 3426 bool do_unregister = false; 3427 3428 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3429 spdk_get_thread()); 3430 3431 assert(desc->thread == spdk_get_thread()); 3432 3433 pthread_mutex_lock(&bdev->internal.mutex); 3434 3435 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3436 3437 desc->closed = true; 3438 3439 if (!desc->remove_scheduled) { 3440 free(desc); 3441 } 3442 3443 /* If no more descriptors, kill QoS channel */ 3444 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3445 
SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3446 bdev->name, spdk_get_thread()); 3447 3448 if (spdk_bdev_qos_destroy(bdev)) { 3449 /* There isn't anything we can do to recover here. Just let the 3450 * old QoS poller keep running. The QoS handling won't change 3451 * cores when the user allocates a new channel, but it won't break. */ 3452 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3453 } 3454 } 3455 3456 spdk_bdev_set_qd_sampling_period(bdev, 0); 3457 3458 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3459 do_unregister = true; 3460 } 3461 pthread_mutex_unlock(&bdev->internal.mutex); 3462 3463 if (do_unregister == true) { 3464 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3465 } 3466 } 3467 3468 int 3469 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3470 struct spdk_bdev_module *module) 3471 { 3472 if (bdev->internal.claim_module != NULL) { 3473 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3474 bdev->internal.claim_module->name); 3475 return -EPERM; 3476 } 3477 3478 if (desc && !desc->write) { 3479 desc->write = true; 3480 } 3481 3482 bdev->internal.claim_module = module; 3483 return 0; 3484 } 3485 3486 void 3487 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3488 { 3489 assert(bdev->internal.claim_module != NULL); 3490 bdev->internal.claim_module = NULL; 3491 } 3492 3493 struct spdk_bdev * 3494 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3495 { 3496 return desc->bdev; 3497 } 3498 3499 void 3500 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3501 { 3502 struct iovec *iovs; 3503 int iovcnt; 3504 3505 if (bdev_io == NULL) { 3506 return; 3507 } 3508 3509 switch (bdev_io->type) { 3510 case SPDK_BDEV_IO_TYPE_READ: 3511 iovs = bdev_io->u.bdev.iovs; 3512 iovcnt = bdev_io->u.bdev.iovcnt; 3513 break; 3514 case SPDK_BDEV_IO_TYPE_WRITE: 3515 iovs = bdev_io->u.bdev.iovs; 3516 iovcnt = bdev_io->u.bdev.iovcnt; 3517 break; 3518 default: 3519 iovs = NULL; 3520 iovcnt = 0; 3521 break; 3522 } 3523 3524 if (iovp) { 3525 *iovp = iovs; 3526 } 3527 if (iovcntp) { 3528 *iovcntp = iovcnt; 3529 } 3530 } 3531 3532 void 3533 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3534 { 3535 3536 if (spdk_bdev_module_list_find(bdev_module->name)) { 3537 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3538 assert(false); 3539 } 3540 3541 if (bdev_module->async_init) { 3542 bdev_module->internal.action_in_progress = 1; 3543 } 3544 3545 /* 3546 * Modules with examine callbacks must be initialized first, so they are 3547 * ready to handle examine callbacks from later modules that will 3548 * register physical bdevs. 
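* For example, a (hypothetical) logical-volume module that provides examine_disk()/examine_config() is inserted at the head of the list below, so by the time an NVMe-style module registers its physical bdevs and spdk_bdev_start() walks the module list, the examining module is already initialized and can claim those bdevs.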
3549 */ 3550 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3551 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3552 } else { 3553 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3554 } 3555 } 3556 3557 struct spdk_bdev_module * 3558 spdk_bdev_module_list_find(const char *name) 3559 { 3560 struct spdk_bdev_module *bdev_module; 3561 3562 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3563 if (strcmp(name, bdev_module->name) == 0) { 3564 break; 3565 } 3566 } 3567 3568 return bdev_module; 3569 } 3570 3571 static void 3572 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3573 { 3574 struct spdk_bdev_io *bdev_io = _bdev_io; 3575 uint64_t num_bytes, num_blocks; 3576 int rc; 3577 3578 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3579 bdev_io->u.bdev.split_remaining_num_blocks, 3580 ZERO_BUFFER_SIZE); 3581 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3582 3583 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3584 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3585 g_bdev_mgr.zero_buffer, 3586 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3587 _spdk_bdev_write_zero_buffer_done, bdev_io); 3588 if (rc == 0) { 3589 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3590 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3591 } else if (rc == -ENOMEM) { 3592 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 3593 } else { 3594 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3595 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3596 } 3597 } 3598 3599 static void 3600 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3601 { 3602 struct spdk_bdev_io *parent_io = cb_arg; 3603 3604 spdk_bdev_free_io(bdev_io); 3605 3606 if (!success) { 3607 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3608 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3609 return; 3610 } 3611 3612 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3613 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3614 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3615 return; 3616 } 3617 3618 _spdk_bdev_write_zero_buffer_next(parent_io); 3619 } 3620 3621 struct set_qos_limit_ctx { 3622 void (*cb_fn)(void *cb_arg, int status); 3623 void *cb_arg; 3624 struct spdk_bdev *bdev; 3625 }; 3626 3627 static void 3628 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3629 { 3630 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3631 ctx->bdev->internal.qos_mod_in_progress = false; 3632 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3633 3634 ctx->cb_fn(ctx->cb_arg, status); 3635 free(ctx); 3636 } 3637 3638 static void 3639 _spdk_bdev_disable_qos_done(void *cb_arg) 3640 { 3641 struct set_qos_limit_ctx *ctx = cb_arg; 3642 struct spdk_bdev *bdev = ctx->bdev; 3643 struct spdk_bdev_io *bdev_io; 3644 struct spdk_bdev_qos *qos; 3645 3646 pthread_mutex_lock(&bdev->internal.mutex); 3647 qos = bdev->internal.qos; 3648 bdev->internal.qos = NULL; 3649 pthread_mutex_unlock(&bdev->internal.mutex); 3650 3651 while (!TAILQ_EMPTY(&qos->queued)) { 3652 /* Send queued I/O back to their original thread for resubmission. 
*/ 3653 bdev_io = TAILQ_FIRST(&qos->queued); 3654 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3655 3656 if (bdev_io->internal.io_submit_ch) { 3657 /* 3658 * Channel was changed when sending it to the QoS thread - change it back 3659 * before sending it back to the original thread. 3660 */ 3661 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3662 bdev_io->internal.io_submit_ch = NULL; 3663 } 3664 3665 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3666 _spdk_bdev_io_submit, bdev_io); 3667 } 3668 3669 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3670 spdk_poller_unregister(&qos->poller); 3671 3672 free(qos); 3673 3674 _spdk_bdev_set_qos_limit_done(ctx, 0); 3675 } 3676 3677 static void 3678 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3679 { 3680 void *io_device = spdk_io_channel_iter_get_io_device(i); 3681 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3682 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3683 struct spdk_thread *thread; 3684 3685 pthread_mutex_lock(&bdev->internal.mutex); 3686 thread = bdev->internal.qos->thread; 3687 pthread_mutex_unlock(&bdev->internal.mutex); 3688 3689 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3690 } 3691 3692 static void 3693 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3694 { 3695 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3696 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3697 3698 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3699 3700 spdk_for_each_channel_continue(i, 0); 3701 } 3702 3703 static void 3704 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg) 3705 { 3706 struct set_qos_limit_ctx *ctx = cb_arg; 3707 struct spdk_bdev *bdev = ctx->bdev; 3708 3709 pthread_mutex_lock(&bdev->internal.mutex); 3710 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3711 pthread_mutex_unlock(&bdev->internal.mutex); 3712 3713 _spdk_bdev_set_qos_limit_done(ctx, 0); 3714 } 3715 3716 static void 3717 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3718 { 3719 void *io_device = spdk_io_channel_iter_get_io_device(i); 3720 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3721 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3722 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3723 3724 pthread_mutex_lock(&bdev->internal.mutex); 3725 _spdk_bdev_enable_qos(bdev, bdev_ch); 3726 pthread_mutex_unlock(&bdev->internal.mutex); 3727 spdk_for_each_channel_continue(i, 0); 3728 } 3729 3730 static void 3731 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3732 { 3733 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3734 3735 _spdk_bdev_set_qos_limit_done(ctx, status); 3736 } 3737 3738 static void 3739 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 3740 { 3741 int i; 3742 3743 assert(bdev->internal.qos != NULL); 3744 3745 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3746 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3747 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3748 3749 if (limits[i] == 0) { 3750 bdev->internal.qos->rate_limits[i].limit = 3751 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3752 } 3753 } 3754 } 3755 } 3756 3757 void 3758 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 3759 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3760 { 3761 struct set_qos_limit_ctx *ctx; 3762 uint32_t limit_set_complement; 3763 
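/* Worked example of the conversion and rounding performed below (figures are only an
 * illustration): an IOPS limit of 10500 is not a multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC
 * (10000) and is rounded up to 20000; a bandwidth limit passed as 15 (megabytes) becomes
 * 15 * 1024 * 1024 = 15728640 bytes and is rounded up to 20971520, the next multiple of
 * SPDK_BDEV_QOS_MIN_BYTES_PER_SEC. */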
uint64_t min_limit_per_sec; 3764 int i; 3765 bool disable_rate_limit = true; 3766 3767 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3768 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3769 continue; 3770 } 3771 3772 if (limits[i] > 0) { 3773 disable_rate_limit = false; 3774 } 3775 3776 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3777 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3778 } else { 3779 /* Change from megabyte to byte rate limit */ 3780 limits[i] = limits[i] * 1024 * 1024; 3781 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3782 } 3783 3784 limit_set_complement = limits[i] % min_limit_per_sec; 3785 if (limit_set_complement) { 3786 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 3787 limits[i], min_limit_per_sec); 3788 limits[i] += min_limit_per_sec - limit_set_complement; 3789 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 3790 } 3791 } 3792 3793 ctx = calloc(1, sizeof(*ctx)); 3794 if (ctx == NULL) { 3795 cb_fn(cb_arg, -ENOMEM); 3796 return; 3797 } 3798 3799 ctx->cb_fn = cb_fn; 3800 ctx->cb_arg = cb_arg; 3801 ctx->bdev = bdev; 3802 3803 pthread_mutex_lock(&bdev->internal.mutex); 3804 if (bdev->internal.qos_mod_in_progress) { 3805 pthread_mutex_unlock(&bdev->internal.mutex); 3806 free(ctx); 3807 cb_fn(cb_arg, -EAGAIN); 3808 return; 3809 } 3810 bdev->internal.qos_mod_in_progress = true; 3811 3812 if (disable_rate_limit == true && bdev->internal.qos) { 3813 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3814 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 3815 (bdev->internal.qos->rate_limits[i].limit > 0 && 3816 bdev->internal.qos->rate_limits[i].limit != 3817 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 3818 disable_rate_limit = false; 3819 break; 3820 } 3821 } 3822 } 3823 3824 if (disable_rate_limit == false) { 3825 if (bdev->internal.qos == NULL) { 3826 /* Enabling */ 3827 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3828 if (!bdev->internal.qos) { 3829 pthread_mutex_unlock(&bdev->internal.mutex); 3830 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3831 free(ctx); 3832 cb_fn(cb_arg, -ENOMEM); 3833 return; 3834 } 3835 3836 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3837 3838 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3839 _spdk_bdev_enable_qos_msg, ctx, 3840 _spdk_bdev_enable_qos_done); 3841 } else { 3842 /* Updating */ 3843 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3844 3845 spdk_thread_send_msg(bdev->internal.qos->thread, 3846 _spdk_bdev_update_qos_rate_limit_msg, ctx); 3847 } 3848 } else { 3849 if (bdev->internal.qos != NULL) { 3850 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3851 3852 /* Disabling */ 3853 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3854 _spdk_bdev_disable_qos_msg, ctx, 3855 _spdk_bdev_disable_qos_msg_done); 3856 } else { 3857 pthread_mutex_unlock(&bdev->internal.mutex); 3858 _spdk_bdev_set_qos_limit_done(ctx, 0); 3859 return; 3860 } 3861 } 3862 3863 pthread_mutex_unlock(&bdev->internal.mutex); 3864 } 3865 3866 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3867 3868 SPDK_TRACE_REGISTER_FN(bdev_trace) 3869 { 3870 spdk_trace_register_owner(OWNER_BDEV, 'b'); 3871 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 3872 spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV, 3873 OBJECT_BDEV_IO, 1, 0, "type: "); 3874 spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV, 3875 OBJECT_BDEV_IO, 0, 0, ""); 3876 } 3877
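/*
 * Illustrative usage sketch, appended only as documentation. It is wrapped in "#if 0" so it
 * is never compiled, and every name it introduces (example_ctx, example_submit_read,
 * example_read_complete) is hypothetical. It assumes the descriptor, channel and buffer were
 * prepared elsewhere (spdk_bdev_open(), spdk_bdev_get_io_channel(), an allocation aligned per
 * spdk_bdev_get_buf_align()), and shows a block-aligned read that falls back to
 * spdk_bdev_queue_io_wait() when the spdk_bdev_io pool is temporarily exhausted (-ENOMEM).
 */
#if 0
struct example_ctx {
	struct spdk_bdev_desc		*desc;		/* from a prior spdk_bdev_open() */
	struct spdk_io_channel		*ch;		/* from spdk_bdev_get_io_channel(desc) */
	void				*buf;		/* at least 8 blocks, suitably aligned */
	struct spdk_bdev_io_wait_entry	wait_entry;
};

static void
example_read_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	/* The spdk_bdev_io must always be returned to the pool, success or not. */
	spdk_bdev_free_io(bdev_io);
	SPDK_NOTICELOG("example read %s\n", success ? "succeeded" : "failed");
}

static void
example_submit_read(void *cb_arg)
{
	struct example_ctx *ctx = cb_arg;
	int rc;

	/* Read the first 8 blocks of the bdev into the caller-provided buffer. */
	rc = spdk_bdev_read_blocks(ctx->desc, ctx->ch, ctx->buf, 0, 8,
				   example_read_complete, ctx);
	if (rc == -ENOMEM) {
		/* No spdk_bdev_io is available right now; ask to be called back on this
		 * channel once one is freed, then this function runs again and resubmits. */
		ctx->wait_entry.bdev = spdk_bdev_desc_get_bdev(ctx->desc);
		ctx->wait_entry.cb_fn = example_submit_read;
		ctx->wait_entry.cb_arg = ctx;
		spdk_bdev_queue_io_wait(ctx->wait_entry.bdev, ctx->ch, &ctx->wait_entry);
	} else if (rc != 0) {
		SPDK_ERRLOG("example read submission failed: %d\n", rc);
	}
}
#endif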