1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/bdev.h" 37 #include "spdk/conf.h" 38 39 #include "spdk/config.h" 40 #include "spdk/env.h" 41 #include "spdk/event.h" 42 #include "spdk/thread.h" 43 #include "spdk/likely.h" 44 #include "spdk/queue.h" 45 #include "spdk/nvme_spec.h" 46 #include "spdk/scsi_spec.h" 47 #include "spdk/util.h" 48 #include "spdk/trace.h" 49 50 #include "spdk/bdev_module.h" 51 #include "spdk_internal/log.h" 52 #include "spdk/string.h" 53 54 #ifdef SPDK_CONFIG_VTUNE 55 #include "ittnotify.h" 56 #include "ittnotify_types.h" 57 int __itt_init_ittlib(const char *, __itt_group_id); 58 #endif 59 60 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 61 #define SPDK_BDEV_IO_CACHE_SIZE 256 62 #define BUF_SMALL_POOL_SIZE 8192 63 #define BUF_LARGE_POOL_SIZE 1024 64 #define NOMEM_THRESHOLD_COUNT 8 65 #define ZERO_BUFFER_SIZE 0x100000 66 67 #define OWNER_BDEV 0x2 68 69 #define OBJECT_BDEV_IO 0x2 70 71 #define TRACE_GROUP_BDEV 0x3 72 #define TRACE_BDEV_IO_START SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0) 73 #define TRACE_BDEV_IO_DONE SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1) 74 75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC 1000 76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE 1 77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE 512 78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC 10000 79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC (10 * 1024 * 1024) 80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED UINT64_MAX 81 82 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"}; 83 static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"}; 84 85 TAILQ_HEAD(spdk_bdev_list, spdk_bdev); 86 87 struct spdk_bdev_mgr { 88 struct spdk_mempool *bdev_io_pool; 89 90 struct spdk_mempool *buf_small_pool; 91 struct spdk_mempool *buf_large_pool; 92 93 void *zero_buffer; 94 95 TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules; 96 97 struct spdk_bdev_list bdevs; 98 99 bool init_complete; 100 bool 
module_init_complete; 101 102 #ifdef SPDK_CONFIG_VTUNE 103 __itt_domain *domain; 104 #endif 105 }; 106 107 static struct spdk_bdev_mgr g_bdev_mgr = { 108 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 109 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 110 .init_complete = false, 111 .module_init_complete = false, 112 }; 113 114 static struct spdk_bdev_opts g_bdev_opts = { 115 .bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE, 116 .bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE, 117 }; 118 119 static spdk_bdev_init_cb g_init_cb_fn = NULL; 120 static void *g_init_cb_arg = NULL; 121 122 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 123 static void *g_fini_cb_arg = NULL; 124 static struct spdk_thread *g_fini_thread = NULL; 125 126 struct spdk_bdev_qos_limit { 127 /** IOs or bytes allowed per second (i.e., 1s). */ 128 uint64_t limit; 129 130 /** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms). 131 * For remaining bytes, allowed to run negative if an I/O is submitted when 132 * some bytes are remaining, but the I/O is bigger than that amount. The 133 * excess will be deducted from the next timeslice. 134 */ 135 int64_t remaining_this_timeslice; 136 137 /** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 138 uint32_t min_per_timeslice; 139 140 /** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */ 141 uint32_t max_per_timeslice; 142 }; 143 144 struct spdk_bdev_qos { 145 /** Types of structure of rate limits. */ 146 struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 147 148 /** The channel that all I/O are funneled through. */ 149 struct spdk_bdev_channel *ch; 150 151 /** The thread on which the poller is running. */ 152 struct spdk_thread *thread; 153 154 /** Queue of I/O waiting to be issued. */ 155 bdev_io_tailq_t queued; 156 157 /** Size of a timeslice in tsc ticks. */ 158 uint64_t timeslice_size; 159 160 /** Timestamp of start of last timeslice. */ 161 uint64_t last_timeslice; 162 163 /** Poller that processes queued I/O commands each time slice. */ 164 struct spdk_poller *poller; 165 }; 166 167 struct spdk_bdev_mgmt_channel { 168 bdev_io_stailq_t need_buf_small; 169 bdev_io_stailq_t need_buf_large; 170 171 /* 172 * Each thread keeps a cache of bdev_io - this allows 173 * bdev threads which are *not* DPDK threads to still 174 * benefit from a per-thread bdev_io cache. Without 175 * this, non-DPDK threads fetching from the mempool 176 * incur a cmpxchg on get and put. 177 */ 178 bdev_io_stailq_t per_thread_cache; 179 uint32_t per_thread_cache_count; 180 uint32_t bdev_io_cache_size; 181 182 TAILQ_HEAD(, spdk_bdev_shared_resource) shared_resources; 183 TAILQ_HEAD(, spdk_bdev_io_wait_entry) io_wait_queue; 184 }; 185 186 /* 187 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device 188 * will queue here their IO that awaits retry. It makes it possible to retry sending 189 * IO to one bdev after IO from other bdev completes. 190 */ 191 struct spdk_bdev_shared_resource { 192 /* The bdev management channel */ 193 struct spdk_bdev_mgmt_channel *mgmt_ch; 194 195 /* 196 * Count of I/O submitted to bdev module and waiting for completion. 197 * Incremented before submit_request() is called on an spdk_bdev_io. 198 */ 199 uint64_t io_outstanding; 200 201 /* 202 * Queue of IO awaiting retry because of a previous NOMEM status returned 203 * on this channel. 204 */ 205 bdev_io_tailq_t nomem_io; 206 207 /* 208 * Threshold which io_outstanding must drop to before retrying nomem_io. 
209 */ 210 uint64_t nomem_threshold; 211 212 /* I/O channel allocated by a bdev module */ 213 struct spdk_io_channel *shared_ch; 214 215 /* Refcount of bdev channels using this resource */ 216 uint32_t ref; 217 218 TAILQ_ENTRY(spdk_bdev_shared_resource) link; 219 }; 220 221 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 222 #define BDEV_CH_QOS_ENABLED (1 << 1) 223 224 struct spdk_bdev_channel { 225 struct spdk_bdev *bdev; 226 227 /* The channel for the underlying device */ 228 struct spdk_io_channel *channel; 229 230 /* Per io_device per thread data */ 231 struct spdk_bdev_shared_resource *shared_resource; 232 233 struct spdk_bdev_io_stat stat; 234 235 /* 236 * Count of I/O submitted through this channel and waiting for completion. 237 * Incremented before submit_request() is called on an spdk_bdev_io. 238 */ 239 uint64_t io_outstanding; 240 241 bdev_io_tailq_t queued_resets; 242 243 uint32_t flags; 244 245 #ifdef SPDK_CONFIG_VTUNE 246 uint64_t start_tsc; 247 uint64_t interval_tsc; 248 __itt_string_handle *handle; 249 struct spdk_bdev_io_stat prev_stat; 250 #endif 251 252 }; 253 254 struct spdk_bdev_desc { 255 struct spdk_bdev *bdev; 256 struct spdk_thread *thread; 257 spdk_bdev_remove_cb_t remove_cb; 258 void *remove_ctx; 259 bool remove_scheduled; 260 bool closed; 261 bool write; 262 TAILQ_ENTRY(spdk_bdev_desc) link; 263 }; 264 265 struct spdk_bdev_iostat_ctx { 266 struct spdk_bdev_io_stat *stat; 267 spdk_bdev_get_device_stat_cb cb; 268 void *cb_arg; 269 }; 270 271 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 272 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 273 274 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, 275 void *cb_arg); 276 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io); 277 278 void 279 spdk_bdev_get_opts(struct spdk_bdev_opts *opts) 280 { 281 *opts = g_bdev_opts; 282 } 283 284 int 285 spdk_bdev_set_opts(struct spdk_bdev_opts *opts) 286 { 287 uint32_t min_pool_size; 288 289 /* 290 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem 291 * initialization. A second mgmt_ch will be created on the same thread when the application starts 292 * but before the deferred put_io_channel event is executed for the first mgmt_ch. 
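	 * For example, with the default bdev_io_cache_size of 256 and 3 threads,
	 * bdev_io_pool_size must be at least 256 * (3 + 1) = 1024.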
293 */ 294 min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1); 295 if (opts->bdev_io_pool_size < min_pool_size) { 296 SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32 297 " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size, 298 spdk_thread_get_count()); 299 SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size); 300 return -1; 301 } 302 303 g_bdev_opts = *opts; 304 return 0; 305 } 306 307 struct spdk_bdev * 308 spdk_bdev_first(void) 309 { 310 struct spdk_bdev *bdev; 311 312 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 313 if (bdev) { 314 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 315 } 316 317 return bdev; 318 } 319 320 struct spdk_bdev * 321 spdk_bdev_next(struct spdk_bdev *prev) 322 { 323 struct spdk_bdev *bdev; 324 325 bdev = TAILQ_NEXT(prev, internal.link); 326 if (bdev) { 327 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 328 } 329 330 return bdev; 331 } 332 333 static struct spdk_bdev * 334 _bdev_next_leaf(struct spdk_bdev *bdev) 335 { 336 while (bdev != NULL) { 337 if (bdev->internal.claim_module == NULL) { 338 return bdev; 339 } else { 340 bdev = TAILQ_NEXT(bdev, internal.link); 341 } 342 } 343 344 return bdev; 345 } 346 347 struct spdk_bdev * 348 spdk_bdev_first_leaf(void) 349 { 350 struct spdk_bdev *bdev; 351 352 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 353 354 if (bdev) { 355 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 356 } 357 358 return bdev; 359 } 360 361 struct spdk_bdev * 362 spdk_bdev_next_leaf(struct spdk_bdev *prev) 363 { 364 struct spdk_bdev *bdev; 365 366 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link)); 367 368 if (bdev) { 369 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 370 } 371 372 return bdev; 373 } 374 375 struct spdk_bdev * 376 spdk_bdev_get_by_name(const char *bdev_name) 377 { 378 struct spdk_bdev_alias *tmp; 379 struct spdk_bdev *bdev = spdk_bdev_first(); 380 381 while (bdev != NULL) { 382 if (strcmp(bdev_name, bdev->name) == 0) { 383 return bdev; 384 } 385 386 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 387 if (strcmp(bdev_name, tmp->alias) == 0) { 388 return bdev; 389 } 390 } 391 392 bdev = spdk_bdev_next(bdev); 393 } 394 395 return NULL; 396 } 397 398 void 399 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len) 400 { 401 struct iovec *iovs; 402 403 iovs = bdev_io->u.bdev.iovs; 404 405 assert(iovs != NULL); 406 assert(bdev_io->u.bdev.iovcnt >= 1); 407 408 iovs[0].iov_base = buf; 409 iovs[0].iov_len = len; 410 } 411 412 static void 413 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 414 { 415 struct spdk_mempool *pool; 416 struct spdk_bdev_io *tmp; 417 void *buf, *aligned_buf; 418 bdev_io_stailq_t *stailq; 419 struct spdk_bdev_mgmt_channel *ch; 420 421 assert(bdev_io->u.bdev.iovcnt == 1); 422 423 buf = bdev_io->internal.buf; 424 ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 425 426 bdev_io->internal.buf = NULL; 427 428 if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 429 pool = g_bdev_mgr.buf_small_pool; 430 stailq = &ch->need_buf_small; 431 } else { 432 pool = g_bdev_mgr.buf_large_pool; 433 stailq = &ch->need_buf_large; 434 } 435 436 if (STAILQ_EMPTY(stailq)) { 437 spdk_mempool_put(pool, buf); 438 } else { 439 tmp = STAILQ_FIRST(stailq); 440 441 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 442 spdk_bdev_io_set_buf(tmp, aligned_buf, 
tmp->internal.buf_len); 443 444 STAILQ_REMOVE_HEAD(stailq, internal.buf_link); 445 tmp->internal.buf = buf; 446 tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp); 447 } 448 } 449 450 void 451 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 452 { 453 struct spdk_mempool *pool; 454 bdev_io_stailq_t *stailq; 455 void *buf, *aligned_buf; 456 struct spdk_bdev_mgmt_channel *mgmt_ch; 457 458 assert(cb != NULL); 459 assert(bdev_io->u.bdev.iovs != NULL); 460 461 if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) { 462 /* Buffer already present */ 463 cb(bdev_io->internal.ch->channel, bdev_io); 464 return; 465 } 466 467 assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE); 468 mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 469 470 bdev_io->internal.buf_len = len; 471 bdev_io->internal.get_buf_cb = cb; 472 if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 473 pool = g_bdev_mgr.buf_small_pool; 474 stailq = &mgmt_ch->need_buf_small; 475 } else { 476 pool = g_bdev_mgr.buf_large_pool; 477 stailq = &mgmt_ch->need_buf_large; 478 } 479 480 buf = spdk_mempool_get(pool); 481 482 if (!buf) { 483 STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link); 484 } else { 485 aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL); 486 spdk_bdev_io_set_buf(bdev_io, aligned_buf, len); 487 488 bdev_io->internal.buf = buf; 489 bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io); 490 } 491 } 492 493 static int 494 spdk_bdev_module_get_max_ctx_size(void) 495 { 496 struct spdk_bdev_module *bdev_module; 497 int max_bdev_module_size = 0; 498 499 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 500 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 501 max_bdev_module_size = bdev_module->get_ctx_size(); 502 } 503 } 504 505 return max_bdev_module_size; 506 } 507 508 void 509 spdk_bdev_config_text(FILE *fp) 510 { 511 struct spdk_bdev_module *bdev_module; 512 513 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 514 if (bdev_module->config_text) { 515 bdev_module->config_text(fp); 516 } 517 } 518 } 519 520 static void 521 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 522 { 523 int i; 524 struct spdk_bdev_qos *qos = bdev->internal.qos; 525 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES]; 526 527 if (!qos) { 528 return; 529 } 530 531 spdk_bdev_get_qos_rate_limits(bdev, limits); 532 533 spdk_json_write_object_begin(w); 534 spdk_json_write_named_string(w, "method", "set_bdev_qos_limit"); 535 spdk_json_write_name(w, "params"); 536 537 spdk_json_write_object_begin(w); 538 spdk_json_write_named_string(w, "name", bdev->name); 539 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 540 if (limits[i] > 0) { 541 spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]); 542 } 543 } 544 spdk_json_write_object_end(w); 545 546 spdk_json_write_object_end(w); 547 } 548 549 void 550 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w) 551 { 552 struct spdk_bdev_module *bdev_module; 553 struct spdk_bdev *bdev; 554 555 assert(w != NULL); 556 557 spdk_json_write_array_begin(w); 558 559 spdk_json_write_object_begin(w); 560 spdk_json_write_named_string(w, "method", "set_bdev_options"); 561 spdk_json_write_name(w, "params"); 562 spdk_json_write_object_begin(w); 563 spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size); 564 spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size); 565 
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		spdk_bdev_qos_config_json(bdev, w);

		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}
	}

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
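	 * Each module signals completion via spdk_bdev_module_init_done() or
	 * spdk_bdev_module_examine_done(), which decrement internal.action_in_progress
	 * and re-run this check.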
681 */ 682 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) { 683 if (m->internal.action_in_progress > 0) { 684 return; 685 } 686 } 687 688 /* 689 * Modules already finished initialization - now that all 690 * the bdev modules have finished their asynchronous I/O 691 * processing, the entire bdev layer can be marked as complete. 692 */ 693 spdk_bdev_init_complete(0); 694 } 695 696 static void 697 spdk_bdev_module_action_done(struct spdk_bdev_module *module) 698 { 699 assert(module->internal.action_in_progress > 0); 700 module->internal.action_in_progress--; 701 spdk_bdev_module_action_complete(); 702 } 703 704 void 705 spdk_bdev_module_init_done(struct spdk_bdev_module *module) 706 { 707 spdk_bdev_module_action_done(module); 708 } 709 710 void 711 spdk_bdev_module_examine_done(struct spdk_bdev_module *module) 712 { 713 spdk_bdev_module_action_done(module); 714 } 715 716 /** The last initialized bdev module */ 717 static struct spdk_bdev_module *g_resume_bdev_module = NULL; 718 719 static int 720 spdk_bdev_modules_init(void) 721 { 722 struct spdk_bdev_module *module; 723 int rc = 0; 724 725 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 726 g_resume_bdev_module = module; 727 rc = module->module_init(); 728 if (rc != 0) { 729 return rc; 730 } 731 } 732 733 g_resume_bdev_module = NULL; 734 return 0; 735 } 736 737 738 static void 739 spdk_bdev_init_failed_complete(void *cb_arg) 740 { 741 spdk_bdev_init_complete(-1); 742 } 743 744 static void 745 spdk_bdev_init_failed(void *cb_arg) 746 { 747 spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL); 748 } 749 750 void 751 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 752 { 753 struct spdk_conf_section *sp; 754 struct spdk_bdev_opts bdev_opts; 755 int32_t bdev_io_pool_size, bdev_io_cache_size; 756 int cache_size; 757 int rc = 0; 758 char mempool_name[32]; 759 760 assert(cb_fn != NULL); 761 762 sp = spdk_conf_find_section(NULL, "Bdev"); 763 if (sp != NULL) { 764 spdk_bdev_get_opts(&bdev_opts); 765 766 bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize"); 767 if (bdev_io_pool_size >= 0) { 768 bdev_opts.bdev_io_pool_size = bdev_io_pool_size; 769 } 770 771 bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize"); 772 if (bdev_io_cache_size >= 0) { 773 bdev_opts.bdev_io_cache_size = bdev_io_cache_size; 774 } 775 776 if (spdk_bdev_set_opts(&bdev_opts)) { 777 spdk_bdev_init_complete(-1); 778 return; 779 } 780 781 assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0); 782 } 783 784 g_init_cb_fn = cb_fn; 785 g_init_cb_arg = cb_arg; 786 787 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 788 789 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 790 g_bdev_opts.bdev_io_pool_size, 791 sizeof(struct spdk_bdev_io) + 792 spdk_bdev_module_get_max_ctx_size(), 793 0, 794 SPDK_ENV_SOCKET_ID_ANY); 795 796 if (g_bdev_mgr.bdev_io_pool == NULL) { 797 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 798 spdk_bdev_init_complete(-1); 799 return; 800 } 801 802 /** 803 * Ensure no more than half of the total buffers end up local caches, by 804 * using spdk_thread_get_count() to determine how many local caches we need 805 * to account for. 
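	 * For example, with 4 threads, each per-thread cache gets
	 * BUF_SMALL_POOL_SIZE / (2 * 4) = 1024 small buffers, so at least half of
	 * the pool stays outside the per-thread caches.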
806 */ 807 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count()); 808 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 809 810 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 811 BUF_SMALL_POOL_SIZE, 812 SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512, 813 cache_size, 814 SPDK_ENV_SOCKET_ID_ANY); 815 if (!g_bdev_mgr.buf_small_pool) { 816 SPDK_ERRLOG("create rbuf small pool failed\n"); 817 spdk_bdev_init_complete(-1); 818 return; 819 } 820 821 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count()); 822 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 823 824 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 825 BUF_LARGE_POOL_SIZE, 826 SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512, 827 cache_size, 828 SPDK_ENV_SOCKET_ID_ANY); 829 if (!g_bdev_mgr.buf_large_pool) { 830 SPDK_ERRLOG("create rbuf large pool failed\n"); 831 spdk_bdev_init_complete(-1); 832 return; 833 } 834 835 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 836 NULL); 837 if (!g_bdev_mgr.zero_buffer) { 838 SPDK_ERRLOG("create bdev zero buffer failed\n"); 839 spdk_bdev_init_complete(-1); 840 return; 841 } 842 843 #ifdef SPDK_CONFIG_VTUNE 844 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 845 #endif 846 847 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 848 spdk_bdev_mgmt_channel_destroy, 849 sizeof(struct spdk_bdev_mgmt_channel), 850 "bdev_mgr"); 851 852 rc = spdk_bdev_modules_init(); 853 g_bdev_mgr.module_init_complete = true; 854 if (rc != 0) { 855 SPDK_ERRLOG("bdev modules init failed\n"); 856 spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL); 857 return; 858 } 859 860 spdk_bdev_module_action_complete(); 861 } 862 863 static void 864 spdk_bdev_mgr_unregister_cb(void *io_device) 865 { 866 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 867 868 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) { 869 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 870 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 871 g_bdev_opts.bdev_io_pool_size); 872 } 873 874 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 875 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 876 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 877 BUF_SMALL_POOL_SIZE); 878 assert(false); 879 } 880 881 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 882 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 883 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 884 BUF_LARGE_POOL_SIZE); 885 assert(false); 886 } 887 888 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 889 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 890 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 891 spdk_dma_free(g_bdev_mgr.zero_buffer); 892 893 cb_fn(g_fini_cb_arg); 894 g_fini_cb_fn = NULL; 895 g_fini_cb_arg = NULL; 896 g_bdev_mgr.init_complete = false; 897 g_bdev_mgr.module_init_complete = false; 898 } 899 900 static void 901 spdk_bdev_module_finish_iter(void *arg) 902 { 903 struct spdk_bdev_module *bdev_module; 904 905 /* Start iterating from the last touched module */ 906 if (!g_resume_bdev_module) { 907 bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list); 908 } else { 909 bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list, 910 internal.tailq); 911 } 912 913 while (bdev_module) { 914 if (bdev_module->async_fini) { 915 /* Save our place so we can resume later. 
We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; remove it from the list manually and continue with the next bdev in
		 * the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred as we might be in the middle of some context
		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
		 * after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last bdev in the list. The last bdev in the list should be a bdev
	 * that has no bdevs that depend on it.
	 */
	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
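		 * Waiters on io_wait_queue are serviced in spdk_bdev_free_io() as
		 * bdev_ios are returned to the per-thread cache.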
1021 */ 1022 bdev_io = NULL; 1023 } else { 1024 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 1025 } 1026 1027 return bdev_io; 1028 } 1029 1030 void 1031 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1032 { 1033 struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch; 1034 1035 assert(bdev_io != NULL); 1036 assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING); 1037 1038 if (bdev_io->internal.buf != NULL) { 1039 spdk_bdev_io_put_buf(bdev_io); 1040 } 1041 1042 if (ch->per_thread_cache_count < ch->bdev_io_cache_size) { 1043 ch->per_thread_cache_count++; 1044 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link); 1045 while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) { 1046 struct spdk_bdev_io_wait_entry *entry; 1047 1048 entry = TAILQ_FIRST(&ch->io_wait_queue); 1049 TAILQ_REMOVE(&ch->io_wait_queue, entry, link); 1050 entry->cb_fn(entry->cb_arg); 1051 } 1052 } else { 1053 /* We should never have a full cache with entries on the io wait queue. */ 1054 assert(TAILQ_EMPTY(&ch->io_wait_queue)); 1055 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 1056 } 1057 } 1058 1059 static bool 1060 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit) 1061 { 1062 assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 1063 1064 switch (limit) { 1065 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1066 return true; 1067 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1068 return false; 1069 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1070 default: 1071 return false; 1072 } 1073 } 1074 1075 static bool 1076 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io) 1077 { 1078 switch (bdev_io->type) { 1079 case SPDK_BDEV_IO_TYPE_NVME_IO: 1080 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1081 case SPDK_BDEV_IO_TYPE_READ: 1082 case SPDK_BDEV_IO_TYPE_WRITE: 1083 case SPDK_BDEV_IO_TYPE_UNMAP: 1084 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1085 return true; 1086 default: 1087 return false; 1088 } 1089 } 1090 1091 static uint64_t 1092 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io) 1093 { 1094 struct spdk_bdev *bdev = bdev_io->bdev; 1095 1096 switch (bdev_io->type) { 1097 case SPDK_BDEV_IO_TYPE_NVME_IO: 1098 case SPDK_BDEV_IO_TYPE_NVME_IO_MD: 1099 return bdev_io->u.nvme_passthru.nbytes; 1100 case SPDK_BDEV_IO_TYPE_READ: 1101 case SPDK_BDEV_IO_TYPE_WRITE: 1102 case SPDK_BDEV_IO_TYPE_UNMAP: 1103 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1104 return bdev_io->u.bdev.num_blocks * bdev->blocklen; 1105 default: 1106 return 0; 1107 } 1108 } 1109 1110 static void 1111 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte) 1112 { 1113 int i; 1114 1115 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1116 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1117 continue; 1118 } 1119 1120 switch (i) { 1121 case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT: 1122 qos->rate_limits[i].remaining_this_timeslice--; 1123 break; 1124 case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT: 1125 qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte; 1126 break; 1127 case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES: 1128 default: 1129 break; 1130 } 1131 } 1132 } 1133 1134 static void 1135 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos) 1136 { 1137 struct spdk_bdev_io *bdev_io = NULL; 1138 struct spdk_bdev *bdev = ch->bdev; 1139 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1140 int i; 1141 bool to_limit_io; 1142 uint64_t io_size_in_byte; 1143 1144 while 
(!TAILQ_EMPTY(&qos->queued)) { 1145 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1146 if (qos->rate_limits[i].max_per_timeslice > 0 && 1147 (qos->rate_limits[i].remaining_this_timeslice <= 0)) { 1148 return; 1149 } 1150 } 1151 1152 bdev_io = TAILQ_FIRST(&qos->queued); 1153 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 1154 ch->io_outstanding++; 1155 shared_resource->io_outstanding++; 1156 to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io); 1157 if (to_limit_io == true) { 1158 io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io); 1159 _spdk_bdev_qos_update_per_io(qos, io_size_in_byte); 1160 } 1161 bdev->fn_table->submit_request(ch->channel, bdev_io); 1162 } 1163 } 1164 1165 static void 1166 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn) 1167 { 1168 int rc; 1169 1170 bdev_io->internal.waitq_entry.bdev = bdev_io->bdev; 1171 bdev_io->internal.waitq_entry.cb_fn = cb_fn; 1172 bdev_io->internal.waitq_entry.cb_arg = bdev_io; 1173 rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch), 1174 &bdev_io->internal.waitq_entry); 1175 if (rc != 0) { 1176 SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc); 1177 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1178 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1179 } 1180 } 1181 1182 static bool 1183 _spdk_bdev_io_type_can_split(uint8_t type) 1184 { 1185 assert(type != SPDK_BDEV_IO_TYPE_INVALID); 1186 assert(type < SPDK_BDEV_NUM_IO_TYPES); 1187 1188 /* Only split READ and WRITE I/O. Theoretically other types of I/O like 1189 * UNMAP could be split, but these types of I/O are typically much larger 1190 * in size (sometimes the size of the entire block device), and the bdev 1191 * module can more efficiently split these types of I/O. Plus those types 1192 * of I/O do not have a payload, which makes the splitting process simpler. 1193 */ 1194 if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) { 1195 return true; 1196 } else { 1197 return false; 1198 } 1199 } 1200 1201 static bool 1202 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io) 1203 { 1204 uint64_t start_stripe, end_stripe; 1205 uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary; 1206 1207 if (io_boundary == 0) { 1208 return false; 1209 } 1210 1211 if (!_spdk_bdev_io_type_can_split(bdev_io->type)) { 1212 return false; 1213 } 1214 1215 start_stripe = bdev_io->u.bdev.offset_blocks; 1216 end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1; 1217 /* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. 
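	 * For example, with io_boundary = 128, an I/O at offset_blocks 100 spanning
	 * 100 blocks maps to stripes 100 >> 7 = 0 and 199 >> 7 = 1, so it crosses a
	 * boundary and will be split.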
*/ 1218 if (spdk_likely(spdk_u32_is_pow2(io_boundary))) { 1219 start_stripe >>= spdk_u32log2(io_boundary); 1220 end_stripe >>= spdk_u32log2(io_boundary); 1221 } else { 1222 start_stripe /= io_boundary; 1223 end_stripe /= io_boundary; 1224 } 1225 return (start_stripe != end_stripe); 1226 } 1227 1228 static uint32_t 1229 _to_next_boundary(uint64_t offset, uint32_t boundary) 1230 { 1231 return (boundary - (offset % boundary)); 1232 } 1233 1234 static void 1235 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 1236 1237 static void 1238 _spdk_bdev_io_split_with_payload(void *_bdev_io) 1239 { 1240 struct spdk_bdev_io *bdev_io = _bdev_io; 1241 uint64_t current_offset, remaining; 1242 uint32_t blocklen, to_next_boundary, to_next_boundary_bytes; 1243 struct iovec *parent_iov, *iov; 1244 uint64_t parent_iov_offset, iov_len; 1245 uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt; 1246 int rc; 1247 1248 remaining = bdev_io->u.bdev.split_remaining_num_blocks; 1249 current_offset = bdev_io->u.bdev.split_current_offset_blocks; 1250 blocklen = bdev_io->bdev->blocklen; 1251 parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen; 1252 parent_iovcnt = bdev_io->u.bdev.iovcnt; 1253 1254 for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) { 1255 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1256 if (parent_iov_offset < parent_iov->iov_len) { 1257 break; 1258 } 1259 parent_iov_offset -= parent_iov->iov_len; 1260 } 1261 1262 child_iovcnt = 0; 1263 while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1264 to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary); 1265 to_next_boundary = spdk_min(remaining, to_next_boundary); 1266 to_next_boundary_bytes = to_next_boundary * blocklen; 1267 iov = &bdev_io->child_iov[child_iovcnt]; 1268 iovcnt = 0; 1269 while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt && 1270 child_iovcnt < BDEV_IO_NUM_CHILD_IOV) { 1271 parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos]; 1272 iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset); 1273 to_next_boundary_bytes -= iov_len; 1274 1275 bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset; 1276 bdev_io->child_iov[child_iovcnt].iov_len = iov_len; 1277 1278 if (iov_len < parent_iov->iov_len - parent_iov_offset) { 1279 parent_iov_offset += iov_len; 1280 } else { 1281 parent_iovpos++; 1282 parent_iov_offset = 0; 1283 } 1284 child_iovcnt++; 1285 iovcnt++; 1286 } 1287 1288 if (to_next_boundary_bytes > 0) { 1289 /* We had to stop this child I/O early because we ran out of 1290 * child_iov space. Make sure the iovs collected are valid and 1291 * then adjust to_next_boundary before starting the child I/O. 
1292 */ 1293 if ((to_next_boundary_bytes % blocklen) != 0) { 1294 SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n", 1295 to_next_boundary_bytes, blocklen); 1296 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1297 if (bdev_io->u.bdev.split_outstanding == 0) { 1298 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1299 } 1300 return; 1301 } 1302 to_next_boundary -= to_next_boundary_bytes / blocklen; 1303 } 1304 1305 bdev_io->u.bdev.split_outstanding++; 1306 1307 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1308 rc = spdk_bdev_readv_blocks(bdev_io->internal.desc, 1309 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1310 iov, iovcnt, current_offset, to_next_boundary, 1311 _spdk_bdev_io_split_done, bdev_io); 1312 } else { 1313 rc = spdk_bdev_writev_blocks(bdev_io->internal.desc, 1314 spdk_io_channel_from_ctx(bdev_io->internal.ch), 1315 iov, iovcnt, current_offset, to_next_boundary, 1316 _spdk_bdev_io_split_done, bdev_io); 1317 } 1318 1319 if (rc == 0) { 1320 current_offset += to_next_boundary; 1321 remaining -= to_next_boundary; 1322 bdev_io->u.bdev.split_current_offset_blocks = current_offset; 1323 bdev_io->u.bdev.split_remaining_num_blocks = remaining; 1324 } else { 1325 bdev_io->u.bdev.split_outstanding--; 1326 if (rc == -ENOMEM) { 1327 if (bdev_io->u.bdev.split_outstanding == 0) { 1328 /* No I/O is outstanding. Hence we should wait here. */ 1329 _spdk_bdev_queue_io_wait_with_cb(bdev_io, 1330 _spdk_bdev_io_split_with_payload); 1331 } 1332 } else { 1333 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1334 if (bdev_io->u.bdev.split_outstanding == 0) { 1335 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 1336 } 1337 } 1338 1339 return; 1340 } 1341 } 1342 } 1343 1344 static void 1345 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1346 { 1347 struct spdk_bdev_io *parent_io = cb_arg; 1348 1349 spdk_bdev_free_io(bdev_io); 1350 1351 if (!success) { 1352 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 1353 } 1354 parent_io->u.bdev.split_outstanding--; 1355 if (parent_io->u.bdev.split_outstanding != 0) { 1356 return; 1357 } 1358 1359 /* 1360 * Parent I/O finishes when all blocks are consumed or there is any failure of 1361 * child I/O and no outstanding child I/O. 1362 */ 1363 if (parent_io->u.bdev.split_remaining_num_blocks == 0 || 1364 parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) { 1365 parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 1366 parent_io->internal.caller_ctx); 1367 return; 1368 } 1369 1370 /* 1371 * Continue with the splitting process. This function will complete the parent I/O if the 1372 * splitting is done. 
1373 */ 1374 _spdk_bdev_io_split_with_payload(parent_io); 1375 } 1376 1377 static void 1378 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io) 1379 { 1380 assert(_spdk_bdev_io_type_can_split(bdev_io->type)); 1381 1382 bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks; 1383 bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks; 1384 bdev_io->u.bdev.split_outstanding = 0; 1385 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 1386 1387 _spdk_bdev_io_split_with_payload(bdev_io); 1388 } 1389 1390 static void 1391 _spdk_bdev_io_submit(void *ctx) 1392 { 1393 struct spdk_bdev_io *bdev_io = ctx; 1394 struct spdk_bdev *bdev = bdev_io->bdev; 1395 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1396 struct spdk_io_channel *ch = bdev_ch->channel; 1397 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 1398 uint64_t tsc; 1399 1400 tsc = spdk_get_ticks(); 1401 bdev_io->internal.submit_tsc = tsc; 1402 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type); 1403 bdev_ch->io_outstanding++; 1404 shared_resource->io_outstanding++; 1405 bdev_io->internal.in_submit_request = true; 1406 if (spdk_likely(bdev_ch->flags == 0)) { 1407 if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) { 1408 bdev->fn_table->submit_request(ch, bdev_io); 1409 } else { 1410 bdev_ch->io_outstanding--; 1411 shared_resource->io_outstanding--; 1412 TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link); 1413 } 1414 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 1415 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1416 } else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) { 1417 bdev_ch->io_outstanding--; 1418 shared_resource->io_outstanding--; 1419 TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link); 1420 _spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos); 1421 } else { 1422 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 1423 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1424 } 1425 bdev_io->internal.in_submit_request = false; 1426 } 1427 1428 static void 1429 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 1430 { 1431 struct spdk_bdev *bdev = bdev_io->bdev; 1432 struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 1433 1434 assert(thread != NULL); 1435 assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1436 1437 if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) { 1438 if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) { 1439 spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split, 1440 bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen); 1441 } else { 1442 _spdk_bdev_io_split(NULL, bdev_io); 1443 } 1444 return; 1445 } 1446 1447 if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) { 1448 if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) { 1449 _spdk_bdev_io_submit(bdev_io); 1450 } else { 1451 bdev_io->internal.io_submit_ch = bdev_io->internal.ch; 1452 bdev_io->internal.ch = bdev->internal.qos->ch; 1453 spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io); 1454 } 1455 } else { 1456 _spdk_bdev_io_submit(bdev_io); 1457 } 1458 } 1459 1460 static void 1461 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 1462 { 1463 struct spdk_bdev *bdev = bdev_io->bdev; 1464 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 1465 struct spdk_io_channel *ch = bdev_ch->channel; 1466 1467 
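	/* Unlike spdk_bdev_io_submit(), a reset is handed straight to the bdev
	 * module here: there is no QoS queuing or nomem_io handling, and
	 * io_outstanding is not incremented.
	 */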
assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING); 1468 1469 bdev_io->internal.in_submit_request = true; 1470 bdev->fn_table->submit_request(ch, bdev_io); 1471 bdev_io->internal.in_submit_request = false; 1472 } 1473 1474 static void 1475 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 1476 struct spdk_bdev *bdev, void *cb_arg, 1477 spdk_bdev_io_completion_cb cb) 1478 { 1479 bdev_io->bdev = bdev; 1480 bdev_io->internal.caller_ctx = cb_arg; 1481 bdev_io->internal.cb = cb; 1482 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 1483 bdev_io->internal.in_submit_request = false; 1484 bdev_io->internal.buf = NULL; 1485 bdev_io->internal.io_submit_ch = NULL; 1486 } 1487 1488 static bool 1489 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1490 { 1491 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 1492 } 1493 1494 bool 1495 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 1496 { 1497 bool supported; 1498 1499 supported = _spdk_bdev_io_type_supported(bdev, io_type); 1500 1501 if (!supported) { 1502 switch (io_type) { 1503 case SPDK_BDEV_IO_TYPE_WRITE_ZEROES: 1504 /* The bdev layer will emulate write zeroes as long as write is supported. */ 1505 supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE); 1506 break; 1507 default: 1508 break; 1509 } 1510 } 1511 1512 return supported; 1513 } 1514 1515 int 1516 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 1517 { 1518 if (bdev->fn_table->dump_info_json) { 1519 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 1520 } 1521 1522 return 0; 1523 } 1524 1525 static void 1526 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos) 1527 { 1528 uint32_t max_per_timeslice = 0; 1529 int i; 1530 1531 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1532 if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 1533 qos->rate_limits[i].max_per_timeslice = 0; 1534 continue; 1535 } 1536 1537 max_per_timeslice = qos->rate_limits[i].limit * 1538 SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC; 1539 1540 qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice, 1541 qos->rate_limits[i].min_per_timeslice); 1542 1543 qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice; 1544 } 1545 } 1546 1547 static int 1548 spdk_bdev_channel_poll_qos(void *arg) 1549 { 1550 struct spdk_bdev_qos *qos = arg; 1551 uint64_t now = spdk_get_ticks(); 1552 int i; 1553 1554 if (now < (qos->last_timeslice + qos->timeslice_size)) { 1555 /* We received our callback earlier than expected - return 1556 * immediately and wait to do accounting until at least one 1557 * timeslice has actually expired. This should never happen 1558 * with a well-behaved timer implementation. 1559 */ 1560 return 0; 1561 } 1562 1563 /* Reset for next round of rate limiting */ 1564 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1565 /* We may have allowed the IOs or bytes to slightly overrun in the last 1566 * timeslice. remaining_this_timeslice is signed, so if it's negative 1567 * here, we'll account for the overrun so that the next timeslice will 1568 * be appropriately reduced. 
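		 * For example, if max_per_timeslice is 1 MiB and a 1.5 MiB I/O was
		 * admitted at the end of the previous timeslice, remaining_this_timeslice
		 * is now -0.5 MiB, so after max_per_timeslice is added back below, only
		 * 0.5 MiB of budget is available for the new timeslice.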
1569 */ 1570 if (qos->rate_limits[i].remaining_this_timeslice > 0) { 1571 qos->rate_limits[i].remaining_this_timeslice = 0; 1572 } 1573 } 1574 1575 while (now >= (qos->last_timeslice + qos->timeslice_size)) { 1576 qos->last_timeslice += qos->timeslice_size; 1577 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1578 qos->rate_limits[i].remaining_this_timeslice += 1579 qos->rate_limits[i].max_per_timeslice; 1580 } 1581 } 1582 1583 _spdk_bdev_qos_io_submit(qos->ch, qos); 1584 1585 return -1; 1586 } 1587 1588 static void 1589 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 1590 { 1591 struct spdk_bdev_shared_resource *shared_resource; 1592 1593 if (!ch) { 1594 return; 1595 } 1596 1597 if (ch->channel) { 1598 spdk_put_io_channel(ch->channel); 1599 } 1600 1601 assert(ch->io_outstanding == 0); 1602 1603 shared_resource = ch->shared_resource; 1604 if (shared_resource) { 1605 assert(ch->io_outstanding == 0); 1606 assert(shared_resource->ref > 0); 1607 shared_resource->ref--; 1608 if (shared_resource->ref == 0) { 1609 assert(shared_resource->io_outstanding == 0); 1610 TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link); 1611 spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch)); 1612 free(shared_resource); 1613 } 1614 } 1615 } 1616 1617 /* Caller must hold bdev->internal.mutex. */ 1618 static void 1619 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch) 1620 { 1621 struct spdk_bdev_qos *qos = bdev->internal.qos; 1622 int i; 1623 1624 /* Rate limiting on this bdev enabled */ 1625 if (qos) { 1626 if (qos->ch == NULL) { 1627 struct spdk_io_channel *io_ch; 1628 1629 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch, 1630 bdev->name, spdk_get_thread()); 1631 1632 /* No qos channel has been selected, so set one up */ 1633 1634 /* Take another reference to ch */ 1635 io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1636 qos->ch = ch; 1637 1638 qos->thread = spdk_io_channel_get_thread(io_ch); 1639 1640 TAILQ_INIT(&qos->queued); 1641 1642 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1643 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 1644 qos->rate_limits[i].min_per_timeslice = 1645 SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE; 1646 } else { 1647 qos->rate_limits[i].min_per_timeslice = 1648 SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE; 1649 } 1650 1651 if (qos->rate_limits[i].limit == 0) { 1652 qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 1653 } 1654 } 1655 spdk_bdev_qos_update_max_quota_per_timeslice(qos); 1656 qos->timeslice_size = 1657 SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC; 1658 qos->last_timeslice = spdk_get_ticks(); 1659 qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos, 1660 qos, 1661 SPDK_BDEV_QOS_TIMESLICE_IN_USEC); 1662 } 1663 1664 ch->flags |= BDEV_CH_QOS_ENABLED; 1665 } 1666 } 1667 1668 static int 1669 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 1670 { 1671 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 1672 struct spdk_bdev_channel *ch = ctx_buf; 1673 struct spdk_io_channel *mgmt_io_ch; 1674 struct spdk_bdev_mgmt_channel *mgmt_ch; 1675 struct spdk_bdev_shared_resource *shared_resource; 1676 1677 ch->bdev = bdev; 1678 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 1679 if (!ch->channel) { 1680 return -1; 1681 } 1682 1683 mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr); 1684 if (!mgmt_io_ch) { 1685 return -1; 1686 } 1687 1688 mgmt_ch = 
spdk_io_channel_get_ctx(mgmt_io_ch); 1689 TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) { 1690 if (shared_resource->shared_ch == ch->channel) { 1691 spdk_put_io_channel(mgmt_io_ch); 1692 shared_resource->ref++; 1693 break; 1694 } 1695 } 1696 1697 if (shared_resource == NULL) { 1698 shared_resource = calloc(1, sizeof(*shared_resource)); 1699 if (shared_resource == NULL) { 1700 spdk_put_io_channel(mgmt_io_ch); 1701 return -1; 1702 } 1703 1704 shared_resource->mgmt_ch = mgmt_ch; 1705 shared_resource->io_outstanding = 0; 1706 TAILQ_INIT(&shared_resource->nomem_io); 1707 shared_resource->nomem_threshold = 0; 1708 shared_resource->shared_ch = ch->channel; 1709 shared_resource->ref = 1; 1710 TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link); 1711 } 1712 1713 memset(&ch->stat, 0, sizeof(ch->stat)); 1714 ch->stat.ticks_rate = spdk_get_ticks_hz(); 1715 ch->io_outstanding = 0; 1716 TAILQ_INIT(&ch->queued_resets); 1717 ch->flags = 0; 1718 ch->shared_resource = shared_resource; 1719 1720 #ifdef SPDK_CONFIG_VTUNE 1721 { 1722 char *name; 1723 __itt_init_ittlib(NULL, 0); 1724 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 1725 if (!name) { 1726 _spdk_bdev_channel_destroy_resource(ch); 1727 return -1; 1728 } 1729 ch->handle = __itt_string_handle_create(name); 1730 free(name); 1731 ch->start_tsc = spdk_get_ticks(); 1732 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1733 memset(&ch->prev_stat, 0, sizeof(ch->prev_stat)); 1734 } 1735 #endif 1736 1737 pthread_mutex_lock(&bdev->internal.mutex); 1738 _spdk_bdev_enable_qos(bdev, ch); 1739 pthread_mutex_unlock(&bdev->internal.mutex); 1740 1741 return 0; 1742 } 1743 1744 /* 1745 * Abort I/O that are waiting on a data buffer. These types of I/O are 1746 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY. 1747 */ 1748 static void 1749 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1750 { 1751 bdev_io_stailq_t tmp; 1752 struct spdk_bdev_io *bdev_io; 1753 1754 STAILQ_INIT(&tmp); 1755 1756 while (!STAILQ_EMPTY(queue)) { 1757 bdev_io = STAILQ_FIRST(queue); 1758 STAILQ_REMOVE_HEAD(queue, internal.buf_link); 1759 if (bdev_io->internal.ch == ch) { 1760 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1761 } else { 1762 STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link); 1763 } 1764 } 1765 1766 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1767 } 1768 1769 /* 1770 * Abort I/O that are queued waiting for submission. These types of I/O are 1771 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1772 */ 1773 static void 1774 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1775 { 1776 struct spdk_bdev_io *bdev_io, *tmp; 1777 1778 TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) { 1779 if (bdev_io->internal.ch == ch) { 1780 TAILQ_REMOVE(queue, bdev_io, internal.link); 1781 /* 1782 * spdk_bdev_io_complete() assumes that the completed I/O had 1783 * been submitted to the bdev module. Since in this case it 1784 * hadn't, bump io_outstanding to account for the decrement 1785 * that spdk_bdev_io_complete() will do. 
1786 */ 1787 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1788 ch->io_outstanding++; 1789 ch->shared_resource->io_outstanding++; 1790 } 1791 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1792 } 1793 } 1794 } 1795 1796 static void 1797 spdk_bdev_qos_channel_destroy(void *cb_arg) 1798 { 1799 struct spdk_bdev_qos *qos = cb_arg; 1800 1801 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 1802 spdk_poller_unregister(&qos->poller); 1803 1804 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos); 1805 1806 free(qos); 1807 } 1808 1809 static int 1810 spdk_bdev_qos_destroy(struct spdk_bdev *bdev) 1811 { 1812 int i; 1813 1814 /* 1815 * Cleanly shutting down the QoS poller is tricky, because 1816 * during the asynchronous operation the user could open 1817 * a new descriptor and create a new channel, spawning 1818 * a new QoS poller. 1819 * 1820 * The strategy is to create a new QoS structure here and swap it 1821 * in. The shutdown path then continues to refer to the old one 1822 * until it completes and then releases it. 1823 */ 1824 struct spdk_bdev_qos *new_qos, *old_qos; 1825 1826 old_qos = bdev->internal.qos; 1827 1828 new_qos = calloc(1, sizeof(*new_qos)); 1829 if (!new_qos) { 1830 SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n"); 1831 return -ENOMEM; 1832 } 1833 1834 /* Copy the old QoS data into the newly allocated structure */ 1835 memcpy(new_qos, old_qos, sizeof(*new_qos)); 1836 1837 /* Zero out the key parts of the QoS structure */ 1838 new_qos->ch = NULL; 1839 new_qos->thread = NULL; 1840 new_qos->poller = NULL; 1841 TAILQ_INIT(&new_qos->queued); 1842 /* 1843 * The limit member of spdk_bdev_qos_limit structure is not zeroed. 1844 * It will be used later for the new QoS structure. 1845 */ 1846 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 1847 new_qos->rate_limits[i].remaining_this_timeslice = 0; 1848 new_qos->rate_limits[i].min_per_timeslice = 0; 1849 new_qos->rate_limits[i].max_per_timeslice = 0; 1850 } 1851 1852 bdev->internal.qos = new_qos; 1853 1854 if (old_qos->thread == NULL) { 1855 free(old_qos); 1856 } else { 1857 spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy, 1858 old_qos); 1859 } 1860 1861 /* It is safe to continue with destroying the bdev even though the QoS channel hasn't 1862 * been destroyed yet. The destruction path will end up waiting for the final 1863 * channel to be put before it releases resources. */ 1864 1865 return 0; 1866 } 1867 1868 static void 1869 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add) 1870 { 1871 total->bytes_read += add->bytes_read; 1872 total->num_read_ops += add->num_read_ops; 1873 total->bytes_written += add->bytes_written; 1874 total->num_write_ops += add->num_write_ops; 1875 total->read_latency_ticks += add->read_latency_ticks; 1876 total->write_latency_ticks += add->write_latency_ticks; 1877 } 1878 1879 static void 1880 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 1881 { 1882 struct spdk_bdev_channel *ch = ctx_buf; 1883 struct spdk_bdev_mgmt_channel *mgmt_ch; 1884 struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource; 1885 1886 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name, 1887 spdk_get_thread()); 1888 1889 /* This channel is going away, so add its statistics into the bdev so that they don't get lost. 
*/ 1890 pthread_mutex_lock(&ch->bdev->internal.mutex); 1891 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat); 1892 pthread_mutex_unlock(&ch->bdev->internal.mutex); 1893 1894 mgmt_ch = shared_resource->mgmt_ch; 1895 1896 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 1897 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch); 1898 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch); 1899 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch); 1900 1901 _spdk_bdev_channel_destroy_resource(ch); 1902 } 1903 1904 int 1905 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 1906 { 1907 struct spdk_bdev_alias *tmp; 1908 1909 if (alias == NULL) { 1910 SPDK_ERRLOG("Empty alias passed\n"); 1911 return -EINVAL; 1912 } 1913 1914 if (spdk_bdev_get_by_name(alias)) { 1915 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 1916 return -EEXIST; 1917 } 1918 1919 tmp = calloc(1, sizeof(*tmp)); 1920 if (tmp == NULL) { 1921 SPDK_ERRLOG("Unable to allocate alias\n"); 1922 return -ENOMEM; 1923 } 1924 1925 tmp->alias = strdup(alias); 1926 if (tmp->alias == NULL) { 1927 free(tmp); 1928 SPDK_ERRLOG("Unable to allocate alias\n"); 1929 return -ENOMEM; 1930 } 1931 1932 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 1933 1934 return 0; 1935 } 1936 1937 int 1938 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 1939 { 1940 struct spdk_bdev_alias *tmp; 1941 1942 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 1943 if (strcmp(alias, tmp->alias) == 0) { 1944 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 1945 free(tmp->alias); 1946 free(tmp); 1947 return 0; 1948 } 1949 } 1950 1951 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 1952 1953 return -ENOENT; 1954 } 1955 1956 void 1957 spdk_bdev_alias_del_all(struct spdk_bdev *bdev) 1958 { 1959 struct spdk_bdev_alias *p, *tmp; 1960 1961 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) { 1962 TAILQ_REMOVE(&bdev->aliases, p, tailq); 1963 free(p->alias); 1964 free(p); 1965 } 1966 } 1967 1968 struct spdk_io_channel * 1969 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 1970 { 1971 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 1972 } 1973 1974 const char * 1975 spdk_bdev_get_name(const struct spdk_bdev *bdev) 1976 { 1977 return bdev->name; 1978 } 1979 1980 const char * 1981 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 1982 { 1983 return bdev->product_name; 1984 } 1985 1986 const struct spdk_bdev_aliases_list * 1987 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 1988 { 1989 return &bdev->aliases; 1990 } 1991 1992 uint32_t 1993 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 1994 { 1995 return bdev->blocklen; 1996 } 1997 1998 uint64_t 1999 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 2000 { 2001 return bdev->blockcnt; 2002 } 2003 2004 const char * 2005 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type) 2006 { 2007 return qos_rpc_type[type]; 2008 } 2009 2010 void 2011 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 2012 { 2013 int i; 2014 2015 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES); 2016 2017 pthread_mutex_lock(&bdev->internal.mutex); 2018 if (bdev->internal.qos) { 2019 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 2020 if (bdev->internal.qos->rate_limits[i].limit != 2021 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 2022 limits[i] = bdev->internal.qos->rate_limits[i].limit; 2023 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) { 2024 /* Change from Byte to Megabyte which is user visible. 
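 * For example, an internal limit of 10 * 1024 * 1024 bytes per second is
 * reported to the caller as 10.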
*/ 2025 limits[i] = limits[i] / 1024 / 1024; 2026 } 2027 } 2028 } 2029 } 2030 pthread_mutex_unlock(&bdev->internal.mutex); 2031 } 2032 2033 size_t 2034 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2035 { 2036 /* TODO: push this logic down to the bdev modules */ 2037 if (bdev->need_aligned_buffer) { 2038 return bdev->blocklen; 2039 } 2040 2041 return 1; 2042 } 2043 2044 uint32_t 2045 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2046 { 2047 return bdev->optimal_io_boundary; 2048 } 2049 2050 bool 2051 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2052 { 2053 return bdev->write_cache; 2054 } 2055 2056 const struct spdk_uuid * 2057 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2058 { 2059 return &bdev->uuid; 2060 } 2061 2062 uint64_t 2063 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2064 { 2065 return bdev->internal.measured_queue_depth; 2066 } 2067 2068 uint64_t 2069 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2070 { 2071 return bdev->internal.period; 2072 } 2073 2074 uint64_t 2075 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2076 { 2077 return bdev->internal.weighted_io_time; 2078 } 2079 2080 uint64_t 2081 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2082 { 2083 return bdev->internal.io_time; 2084 } 2085 2086 static void 2087 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2088 { 2089 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2090 2091 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2092 2093 if (bdev->internal.measured_queue_depth) { 2094 bdev->internal.io_time += bdev->internal.period; 2095 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2096 } 2097 } 2098 2099 static void 2100 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2101 { 2102 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2103 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2104 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2105 2106 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2107 spdk_for_each_channel_continue(i, 0); 2108 } 2109 2110 static int 2111 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2112 { 2113 struct spdk_bdev *bdev = ctx; 2114 bdev->internal.temporary_queue_depth = 0; 2115 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2116 _calculate_measured_qd_cpl); 2117 return 0; 2118 } 2119 2120 void 2121 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2122 { 2123 bdev->internal.period = period; 2124 2125 if (bdev->internal.qd_poller != NULL) { 2126 spdk_poller_unregister(&bdev->internal.qd_poller); 2127 bdev->internal.measured_queue_depth = UINT64_MAX; 2128 } 2129 2130 if (period != 0) { 2131 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2132 period); 2133 } 2134 } 2135 2136 int 2137 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2138 { 2139 int ret; 2140 2141 pthread_mutex_lock(&bdev->internal.mutex); 2142 2143 /* bdev has open descriptors */ 2144 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2145 bdev->blockcnt > size) { 2146 ret = -EBUSY; 2147 } else { 2148 bdev->blockcnt = size; 2149 ret = 0; 2150 } 2151 2152 pthread_mutex_unlock(&bdev->internal.mutex); 2153 2154 return ret; 2155 } 2156 2157 /* 2158 * Convert I/O offset and length from bytes to blocks. 
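 * For example, with a 512-byte block size, offset_bytes 4096 and num_bytes
 * 8192 map to offset_blocks 8 and num_blocks 16.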
2159 * 2160 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 2161 */ 2162 static uint64_t 2163 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2164 uint64_t num_bytes, uint64_t *num_blocks) 2165 { 2166 uint32_t block_size = bdev->blocklen; 2167 2168 *offset_blocks = offset_bytes / block_size; 2169 *num_blocks = num_bytes / block_size; 2170 2171 return (offset_bytes % block_size) | (num_bytes % block_size); 2172 } 2173 2174 static bool 2175 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2176 { 2177 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2178 * has been an overflow and hence the offset has been wrapped around */ 2179 if (offset_blocks + num_blocks < offset_blocks) { 2180 return false; 2181 } 2182 2183 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2184 if (offset_blocks + num_blocks > bdev->blockcnt) { 2185 return false; 2186 } 2187 2188 return true; 2189 } 2190 2191 int 2192 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2193 void *buf, uint64_t offset, uint64_t nbytes, 2194 spdk_bdev_io_completion_cb cb, void *cb_arg) 2195 { 2196 uint64_t offset_blocks, num_blocks; 2197 2198 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2199 return -EINVAL; 2200 } 2201 2202 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2203 } 2204 2205 int 2206 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2207 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2208 spdk_bdev_io_completion_cb cb, void *cb_arg) 2209 { 2210 struct spdk_bdev *bdev = desc->bdev; 2211 struct spdk_bdev_io *bdev_io; 2212 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2213 2214 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2215 return -EINVAL; 2216 } 2217 2218 bdev_io = spdk_bdev_get_io(channel); 2219 if (!bdev_io) { 2220 return -ENOMEM; 2221 } 2222 2223 bdev_io->internal.ch = channel; 2224 bdev_io->internal.desc = desc; 2225 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2226 bdev_io->u.bdev.iovs = &bdev_io->iov; 2227 bdev_io->u.bdev.iovs[0].iov_base = buf; 2228 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2229 bdev_io->u.bdev.iovcnt = 1; 2230 bdev_io->u.bdev.num_blocks = num_blocks; 2231 bdev_io->u.bdev.offset_blocks = offset_blocks; 2232 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2233 2234 spdk_bdev_io_submit(bdev_io); 2235 return 0; 2236 } 2237 2238 int 2239 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2240 struct iovec *iov, int iovcnt, 2241 uint64_t offset, uint64_t nbytes, 2242 spdk_bdev_io_completion_cb cb, void *cb_arg) 2243 { 2244 uint64_t offset_blocks, num_blocks; 2245 2246 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2247 return -EINVAL; 2248 } 2249 2250 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2251 } 2252 2253 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2254 struct iovec *iov, int iovcnt, 2255 uint64_t offset_blocks, uint64_t num_blocks, 2256 spdk_bdev_io_completion_cb cb, void *cb_arg) 2257 { 2258 struct spdk_bdev *bdev = desc->bdev; 2259 struct spdk_bdev_io *bdev_io; 2260 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 
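	/* Validate the block range against the bdev size before allocating an
	 * spdk_bdev_io from the per-thread cache. */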
2261 2262 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2263 return -EINVAL; 2264 } 2265 2266 bdev_io = spdk_bdev_get_io(channel); 2267 if (!bdev_io) { 2268 return -ENOMEM; 2269 } 2270 2271 bdev_io->internal.ch = channel; 2272 bdev_io->internal.desc = desc; 2273 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2274 bdev_io->u.bdev.iovs = iov; 2275 bdev_io->u.bdev.iovcnt = iovcnt; 2276 bdev_io->u.bdev.num_blocks = num_blocks; 2277 bdev_io->u.bdev.offset_blocks = offset_blocks; 2278 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2279 2280 spdk_bdev_io_submit(bdev_io); 2281 return 0; 2282 } 2283 2284 int 2285 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2286 void *buf, uint64_t offset, uint64_t nbytes, 2287 spdk_bdev_io_completion_cb cb, void *cb_arg) 2288 { 2289 uint64_t offset_blocks, num_blocks; 2290 2291 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2292 return -EINVAL; 2293 } 2294 2295 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2296 } 2297 2298 int 2299 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2300 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2301 spdk_bdev_io_completion_cb cb, void *cb_arg) 2302 { 2303 struct spdk_bdev *bdev = desc->bdev; 2304 struct spdk_bdev_io *bdev_io; 2305 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2306 2307 if (!desc->write) { 2308 return -EBADF; 2309 } 2310 2311 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2312 return -EINVAL; 2313 } 2314 2315 bdev_io = spdk_bdev_get_io(channel); 2316 if (!bdev_io) { 2317 return -ENOMEM; 2318 } 2319 2320 bdev_io->internal.ch = channel; 2321 bdev_io->internal.desc = desc; 2322 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2323 bdev_io->u.bdev.iovs = &bdev_io->iov; 2324 bdev_io->u.bdev.iovs[0].iov_base = buf; 2325 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2326 bdev_io->u.bdev.iovcnt = 1; 2327 bdev_io->u.bdev.num_blocks = num_blocks; 2328 bdev_io->u.bdev.offset_blocks = offset_blocks; 2329 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2330 2331 spdk_bdev_io_submit(bdev_io); 2332 return 0; 2333 } 2334 2335 int 2336 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2337 struct iovec *iov, int iovcnt, 2338 uint64_t offset, uint64_t len, 2339 spdk_bdev_io_completion_cb cb, void *cb_arg) 2340 { 2341 uint64_t offset_blocks, num_blocks; 2342 2343 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2344 return -EINVAL; 2345 } 2346 2347 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2348 } 2349 2350 int 2351 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2352 struct iovec *iov, int iovcnt, 2353 uint64_t offset_blocks, uint64_t num_blocks, 2354 spdk_bdev_io_completion_cb cb, void *cb_arg) 2355 { 2356 struct spdk_bdev *bdev = desc->bdev; 2357 struct spdk_bdev_io *bdev_io; 2358 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2359 2360 if (!desc->write) { 2361 return -EBADF; 2362 } 2363 2364 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2365 return -EINVAL; 2366 } 2367 2368 bdev_io = spdk_bdev_get_io(channel); 2369 if (!bdev_io) { 2370 return -ENOMEM; 2371 } 2372 2373 bdev_io->internal.ch = channel; 2374 bdev_io->internal.desc = desc; 2375 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2376 bdev_io->u.bdev.iovs = iov; 2377 
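	/* The caller's iovec array is referenced directly rather than copied, so it
	 * is expected to remain valid until the completion callback runs. */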
bdev_io->u.bdev.iovcnt = iovcnt; 2378 bdev_io->u.bdev.num_blocks = num_blocks; 2379 bdev_io->u.bdev.offset_blocks = offset_blocks; 2380 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2381 2382 spdk_bdev_io_submit(bdev_io); 2383 return 0; 2384 } 2385 2386 int 2387 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2388 uint64_t offset, uint64_t len, 2389 spdk_bdev_io_completion_cb cb, void *cb_arg) 2390 { 2391 uint64_t offset_blocks, num_blocks; 2392 2393 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2394 return -EINVAL; 2395 } 2396 2397 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2398 } 2399 2400 int 2401 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2402 uint64_t offset_blocks, uint64_t num_blocks, 2403 spdk_bdev_io_completion_cb cb, void *cb_arg) 2404 { 2405 struct spdk_bdev *bdev = desc->bdev; 2406 struct spdk_bdev_io *bdev_io; 2407 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2408 2409 if (!desc->write) { 2410 return -EBADF; 2411 } 2412 2413 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2414 return -EINVAL; 2415 } 2416 2417 bdev_io = spdk_bdev_get_io(channel); 2418 2419 if (!bdev_io) { 2420 return -ENOMEM; 2421 } 2422 2423 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2424 bdev_io->internal.ch = channel; 2425 bdev_io->internal.desc = desc; 2426 bdev_io->u.bdev.offset_blocks = offset_blocks; 2427 bdev_io->u.bdev.num_blocks = num_blocks; 2428 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2429 2430 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2431 spdk_bdev_io_submit(bdev_io); 2432 return 0; 2433 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2434 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2435 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2436 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2437 _spdk_bdev_write_zero_buffer_next(bdev_io); 2438 return 0; 2439 } else { 2440 spdk_bdev_free_io(bdev_io); 2441 return -ENOTSUP; 2442 } 2443 } 2444 2445 int 2446 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2447 uint64_t offset, uint64_t nbytes, 2448 spdk_bdev_io_completion_cb cb, void *cb_arg) 2449 { 2450 uint64_t offset_blocks, num_blocks; 2451 2452 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2453 return -EINVAL; 2454 } 2455 2456 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2457 } 2458 2459 int 2460 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2461 uint64_t offset_blocks, uint64_t num_blocks, 2462 spdk_bdev_io_completion_cb cb, void *cb_arg) 2463 { 2464 struct spdk_bdev *bdev = desc->bdev; 2465 struct spdk_bdev_io *bdev_io; 2466 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2467 2468 if (!desc->write) { 2469 return -EBADF; 2470 } 2471 2472 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2473 return -EINVAL; 2474 } 2475 2476 if (num_blocks == 0) { 2477 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2478 return -EINVAL; 2479 } 2480 2481 bdev_io = spdk_bdev_get_io(channel); 2482 if (!bdev_io) { 2483 return -ENOMEM; 2484 } 2485 2486 bdev_io->internal.ch = channel; 2487 bdev_io->internal.desc = desc; 2488 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2489 2490 bdev_io->u.bdev.iovs = &bdev_io->iov; 2491 bdev_io->u.bdev.iovs[0].iov_base = 
NULL; 2492 bdev_io->u.bdev.iovs[0].iov_len = 0; 2493 bdev_io->u.bdev.iovcnt = 1; 2494 2495 bdev_io->u.bdev.offset_blocks = offset_blocks; 2496 bdev_io->u.bdev.num_blocks = num_blocks; 2497 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2498 2499 spdk_bdev_io_submit(bdev_io); 2500 return 0; 2501 } 2502 2503 int 2504 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2505 uint64_t offset, uint64_t length, 2506 spdk_bdev_io_completion_cb cb, void *cb_arg) 2507 { 2508 uint64_t offset_blocks, num_blocks; 2509 2510 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2511 return -EINVAL; 2512 } 2513 2514 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2515 } 2516 2517 int 2518 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2519 uint64_t offset_blocks, uint64_t num_blocks, 2520 spdk_bdev_io_completion_cb cb, void *cb_arg) 2521 { 2522 struct spdk_bdev *bdev = desc->bdev; 2523 struct spdk_bdev_io *bdev_io; 2524 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2525 2526 if (!desc->write) { 2527 return -EBADF; 2528 } 2529 2530 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2531 return -EINVAL; 2532 } 2533 2534 bdev_io = spdk_bdev_get_io(channel); 2535 if (!bdev_io) { 2536 return -ENOMEM; 2537 } 2538 2539 bdev_io->internal.ch = channel; 2540 bdev_io->internal.desc = desc; 2541 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2542 bdev_io->u.bdev.iovs = NULL; 2543 bdev_io->u.bdev.iovcnt = 0; 2544 bdev_io->u.bdev.offset_blocks = offset_blocks; 2545 bdev_io->u.bdev.num_blocks = num_blocks; 2546 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2547 2548 spdk_bdev_io_submit(bdev_io); 2549 return 0; 2550 } 2551 2552 static void 2553 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2554 { 2555 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2556 struct spdk_bdev_io *bdev_io; 2557 2558 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2559 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2560 spdk_bdev_io_submit_reset(bdev_io); 2561 } 2562 2563 static void 2564 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2565 { 2566 struct spdk_io_channel *ch; 2567 struct spdk_bdev_channel *channel; 2568 struct spdk_bdev_mgmt_channel *mgmt_channel; 2569 struct spdk_bdev_shared_resource *shared_resource; 2570 bdev_io_tailq_t tmp_queued; 2571 2572 TAILQ_INIT(&tmp_queued); 2573 2574 ch = spdk_io_channel_iter_get_channel(i); 2575 channel = spdk_io_channel_get_ctx(ch); 2576 shared_resource = channel->shared_resource; 2577 mgmt_channel = shared_resource->mgmt_ch; 2578 2579 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2580 2581 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2582 /* The QoS object is always valid and readable while 2583 * the channel flag is set, so the lock here should not 2584 * be necessary. We're not in the fast path though, so 2585 * just take it anyway. 
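 * Any QoS-queued I/O swapped into tmp_queued here is aborted below together
 * with the other queues frozen for this reset.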
*/ 2586 pthread_mutex_lock(&channel->bdev->internal.mutex); 2587 if (channel->bdev->internal.qos->ch == channel) { 2588 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2589 } 2590 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2591 } 2592 2593 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2594 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2595 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2596 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2597 2598 spdk_for_each_channel_continue(i, 0); 2599 } 2600 2601 static void 2602 _spdk_bdev_start_reset(void *ctx) 2603 { 2604 struct spdk_bdev_channel *ch = ctx; 2605 2606 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2607 ch, _spdk_bdev_reset_dev); 2608 } 2609 2610 static void 2611 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2612 { 2613 struct spdk_bdev *bdev = ch->bdev; 2614 2615 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2616 2617 pthread_mutex_lock(&bdev->internal.mutex); 2618 if (bdev->internal.reset_in_progress == NULL) { 2619 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2620 /* 2621 * Take a channel reference for the target bdev for the life of this 2622 * reset. This guards against the channel getting destroyed while 2623 * spdk_for_each_channel() calls related to this reset IO are in 2624 * progress. We will release the reference when this reset is 2625 * completed. 2626 */ 2627 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2628 _spdk_bdev_start_reset(ch); 2629 } 2630 pthread_mutex_unlock(&bdev->internal.mutex); 2631 } 2632 2633 int 2634 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2635 spdk_bdev_io_completion_cb cb, void *cb_arg) 2636 { 2637 struct spdk_bdev *bdev = desc->bdev; 2638 struct spdk_bdev_io *bdev_io; 2639 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2640 2641 bdev_io = spdk_bdev_get_io(channel); 2642 if (!bdev_io) { 2643 return -ENOMEM; 2644 } 2645 2646 bdev_io->internal.ch = channel; 2647 bdev_io->internal.desc = desc; 2648 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2649 bdev_io->u.reset.ch_ref = NULL; 2650 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2651 2652 pthread_mutex_lock(&bdev->internal.mutex); 2653 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2654 pthread_mutex_unlock(&bdev->internal.mutex); 2655 2656 _spdk_bdev_channel_start_reset(channel); 2657 2658 return 0; 2659 } 2660 2661 void 2662 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2663 struct spdk_bdev_io_stat *stat) 2664 { 2665 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2666 2667 *stat = channel->stat; 2668 } 2669 2670 static void 2671 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2672 { 2673 void *io_device = spdk_io_channel_iter_get_io_device(i); 2674 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2675 2676 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2677 bdev_iostat_ctx->cb_arg, 0); 2678 free(bdev_iostat_ctx); 2679 } 2680 2681 static void 2682 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2683 { 2684 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2685 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2686 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2687 2688 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2689 spdk_for_each_channel_continue(i, 0); 2690 } 2691 2692 void 2693 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2694 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2695 { 2696 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2697 2698 assert(bdev != NULL); 2699 assert(stat != NULL); 2700 assert(cb != NULL); 2701 2702 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2703 if (bdev_iostat_ctx == NULL) { 2704 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2705 cb(bdev, stat, cb_arg, -ENOMEM); 2706 return; 2707 } 2708 2709 bdev_iostat_ctx->stat = stat; 2710 bdev_iostat_ctx->cb = cb; 2711 bdev_iostat_ctx->cb_arg = cb_arg; 2712 2713 /* Start with the statistics from previously deleted channels. */ 2714 pthread_mutex_lock(&bdev->internal.mutex); 2715 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2716 pthread_mutex_unlock(&bdev->internal.mutex); 2717 2718 /* Then iterate and add the statistics from each existing channel. */ 2719 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2720 _spdk_bdev_get_each_channel_stat, 2721 bdev_iostat_ctx, 2722 _spdk_bdev_get_device_stat_done); 2723 } 2724 2725 int 2726 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2727 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2728 spdk_bdev_io_completion_cb cb, void *cb_arg) 2729 { 2730 struct spdk_bdev *bdev = desc->bdev; 2731 struct spdk_bdev_io *bdev_io; 2732 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2733 2734 if (!desc->write) { 2735 return -EBADF; 2736 } 2737 2738 bdev_io = spdk_bdev_get_io(channel); 2739 if (!bdev_io) { 2740 return -ENOMEM; 2741 } 2742 2743 bdev_io->internal.ch = channel; 2744 bdev_io->internal.desc = desc; 2745 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2746 bdev_io->u.nvme_passthru.cmd = *cmd; 2747 bdev_io->u.nvme_passthru.buf = buf; 2748 bdev_io->u.nvme_passthru.nbytes = nbytes; 2749 bdev_io->u.nvme_passthru.md_buf = NULL; 2750 bdev_io->u.nvme_passthru.md_len = 0; 2751 2752 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2753 2754 spdk_bdev_io_submit(bdev_io); 2755 return 0; 2756 } 2757 2758 int 2759 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2760 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2761 spdk_bdev_io_completion_cb cb, void *cb_arg) 2762 { 2763 struct spdk_bdev *bdev = desc->bdev; 2764 struct spdk_bdev_io *bdev_io; 2765 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2766 2767 if (!desc->write) { 2768 /* 2769 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2770 * to easily determine if the command is a read or write, but for now just 2771 * do not allow io_passthru with a read-only descriptor. 
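 * A read-only descriptor therefore gets -EBADF here; an illustrative fix
 * (names hypothetical) is to reopen the bdev writable first:
 *   spdk_bdev_open(bdev, true, remove_cb, remove_ctx, &desc);
 *   spdk_bdev_nvme_io_passthru(desc, ch, &cmd, buf, nbytes, cb, cb_arg);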
2772 */ 2773 return -EBADF; 2774 } 2775 2776 bdev_io = spdk_bdev_get_io(channel); 2777 if (!bdev_io) { 2778 return -ENOMEM; 2779 } 2780 2781 bdev_io->internal.ch = channel; 2782 bdev_io->internal.desc = desc; 2783 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2784 bdev_io->u.nvme_passthru.cmd = *cmd; 2785 bdev_io->u.nvme_passthru.buf = buf; 2786 bdev_io->u.nvme_passthru.nbytes = nbytes; 2787 bdev_io->u.nvme_passthru.md_buf = NULL; 2788 bdev_io->u.nvme_passthru.md_len = 0; 2789 2790 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2791 2792 spdk_bdev_io_submit(bdev_io); 2793 return 0; 2794 } 2795 2796 int 2797 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2798 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2799 spdk_bdev_io_completion_cb cb, void *cb_arg) 2800 { 2801 struct spdk_bdev *bdev = desc->bdev; 2802 struct spdk_bdev_io *bdev_io; 2803 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2804 2805 if (!desc->write) { 2806 /* 2807 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2808 * to easily determine if the command is a read or write, but for now just 2809 * do not allow io_passthru with a read-only descriptor. 2810 */ 2811 return -EBADF; 2812 } 2813 2814 bdev_io = spdk_bdev_get_io(channel); 2815 if (!bdev_io) { 2816 return -ENOMEM; 2817 } 2818 2819 bdev_io->internal.ch = channel; 2820 bdev_io->internal.desc = desc; 2821 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2822 bdev_io->u.nvme_passthru.cmd = *cmd; 2823 bdev_io->u.nvme_passthru.buf = buf; 2824 bdev_io->u.nvme_passthru.nbytes = nbytes; 2825 bdev_io->u.nvme_passthru.md_buf = md_buf; 2826 bdev_io->u.nvme_passthru.md_len = md_len; 2827 2828 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2829 2830 spdk_bdev_io_submit(bdev_io); 2831 return 0; 2832 } 2833 2834 int 2835 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2836 struct spdk_bdev_io_wait_entry *entry) 2837 { 2838 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2839 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2840 2841 if (bdev != entry->bdev) { 2842 SPDK_ERRLOG("bdevs do not match\n"); 2843 return -EINVAL; 2844 } 2845 2846 if (mgmt_ch->per_thread_cache_count > 0) { 2847 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2848 return -EINVAL; 2849 } 2850 2851 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2852 return 0; 2853 } 2854 2855 static void 2856 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2857 { 2858 struct spdk_bdev *bdev = bdev_ch->bdev; 2859 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2860 struct spdk_bdev_io *bdev_io; 2861 2862 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2863 /* 2864 * Allow some more I/O to complete before retrying the nomem_io queue. 2865 * Some drivers (such as nvme) cannot immediately take a new I/O in 2866 * the context of a completion, because the resources for the I/O are 2867 * not released until control returns to the bdev poller. Also, we 2868 * may require several small I/O to complete before a larger I/O 2869 * (that requires splitting) can be submitted. 
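 * The retry is therefore deferred until io_outstanding drops to the
 * nomem_threshold computed in spdk_bdev_io_complete().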
2870 */ 2871 return; 2872 } 2873 2874 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2875 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2876 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2877 bdev_io->internal.ch->io_outstanding++; 2878 shared_resource->io_outstanding++; 2879 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2880 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2881 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2882 break; 2883 } 2884 } 2885 } 2886 2887 static inline void 2888 _spdk_bdev_io_complete(void *ctx) 2889 { 2890 struct spdk_bdev_io *bdev_io = ctx; 2891 uint64_t tsc; 2892 2893 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2894 /* 2895 * Send the completion to the thread that originally submitted the I/O, 2896 * which may not be the current thread in the case of QoS. 2897 */ 2898 if (bdev_io->internal.io_submit_ch) { 2899 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2900 bdev_io->internal.io_submit_ch = NULL; 2901 } 2902 2903 /* 2904 * Defer completion to avoid potential infinite recursion if the 2905 * user's completion callback issues a new I/O. 2906 */ 2907 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2908 _spdk_bdev_io_complete, bdev_io); 2909 return; 2910 } 2911 2912 tsc = spdk_get_ticks(); 2913 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 2914 2915 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2916 switch (bdev_io->type) { 2917 case SPDK_BDEV_IO_TYPE_READ: 2918 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2919 bdev_io->internal.ch->stat.num_read_ops++; 2920 bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2921 break; 2922 case SPDK_BDEV_IO_TYPE_WRITE: 2923 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2924 bdev_io->internal.ch->stat.num_write_ops++; 2925 bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2926 break; 2927 default: 2928 break; 2929 } 2930 } 2931 2932 #ifdef SPDK_CONFIG_VTUNE 2933 uint64_t now_tsc = spdk_get_ticks(); 2934 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2935 uint64_t data[5]; 2936 2937 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2938 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2939 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2940 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2941 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
2942 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2943 2944 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2945 __itt_metadata_u64, 5, data); 2946 2947 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2948 bdev_io->internal.ch->start_tsc = now_tsc; 2949 } 2950 #endif 2951 2952 assert(bdev_io->internal.cb != NULL); 2953 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2954 2955 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2956 bdev_io->internal.caller_ctx); 2957 } 2958 2959 static void 2960 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2961 { 2962 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2963 2964 if (bdev_io->u.reset.ch_ref != NULL) { 2965 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2966 bdev_io->u.reset.ch_ref = NULL; 2967 } 2968 2969 _spdk_bdev_io_complete(bdev_io); 2970 } 2971 2972 static void 2973 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2974 { 2975 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2976 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2977 2978 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2979 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2980 _spdk_bdev_channel_start_reset(ch); 2981 } 2982 2983 spdk_for_each_channel_continue(i, 0); 2984 } 2985 2986 void 2987 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2988 { 2989 struct spdk_bdev *bdev = bdev_io->bdev; 2990 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 2991 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2992 2993 bdev_io->internal.status = status; 2994 2995 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 2996 bool unlock_channels = false; 2997 2998 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 2999 SPDK_ERRLOG("NOMEM returned for reset\n"); 3000 } 3001 pthread_mutex_lock(&bdev->internal.mutex); 3002 if (bdev_io == bdev->internal.reset_in_progress) { 3003 bdev->internal.reset_in_progress = NULL; 3004 unlock_channels = true; 3005 } 3006 pthread_mutex_unlock(&bdev->internal.mutex); 3007 3008 if (unlock_channels) { 3009 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3010 bdev_io, _spdk_bdev_reset_complete); 3011 return; 3012 } 3013 } else { 3014 assert(bdev_ch->io_outstanding > 0); 3015 assert(shared_resource->io_outstanding > 0); 3016 bdev_ch->io_outstanding--; 3017 shared_resource->io_outstanding--; 3018 3019 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3020 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3021 /* 3022 * Wait for some of the outstanding I/O to complete before we 3023 * retry any of the nomem_io. Normally we will wait for 3024 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3025 * depth channels we will instead wait for half to complete. 
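 * For example, with 100 I/O still outstanding the threshold is 92
 * (100 - NOMEM_THRESHOLD_COUNT), while with only 4 outstanding it is 2 (half).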
3026 */ 3027 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3028 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3029 return; 3030 } 3031 3032 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3033 _spdk_bdev_ch_retry_io(bdev_ch); 3034 } 3035 } 3036 3037 _spdk_bdev_io_complete(bdev_io); 3038 } 3039 3040 void 3041 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3042 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3043 { 3044 if (sc == SPDK_SCSI_STATUS_GOOD) { 3045 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3046 } else { 3047 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3048 bdev_io->internal.error.scsi.sc = sc; 3049 bdev_io->internal.error.scsi.sk = sk; 3050 bdev_io->internal.error.scsi.asc = asc; 3051 bdev_io->internal.error.scsi.ascq = ascq; 3052 } 3053 3054 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3055 } 3056 3057 void 3058 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3059 int *sc, int *sk, int *asc, int *ascq) 3060 { 3061 assert(sc != NULL); 3062 assert(sk != NULL); 3063 assert(asc != NULL); 3064 assert(ascq != NULL); 3065 3066 switch (bdev_io->internal.status) { 3067 case SPDK_BDEV_IO_STATUS_SUCCESS: 3068 *sc = SPDK_SCSI_STATUS_GOOD; 3069 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3070 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3071 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3072 break; 3073 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3074 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3075 break; 3076 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3077 *sc = bdev_io->internal.error.scsi.sc; 3078 *sk = bdev_io->internal.error.scsi.sk; 3079 *asc = bdev_io->internal.error.scsi.asc; 3080 *ascq = bdev_io->internal.error.scsi.ascq; 3081 break; 3082 default: 3083 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3084 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3085 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3086 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3087 break; 3088 } 3089 } 3090 3091 void 3092 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3093 { 3094 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3095 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3096 } else { 3097 bdev_io->internal.error.nvme.sct = sct; 3098 bdev_io->internal.error.nvme.sc = sc; 3099 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3100 } 3101 3102 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3103 } 3104 3105 void 3106 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3107 { 3108 assert(sct != NULL); 3109 assert(sc != NULL); 3110 3111 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3112 *sct = bdev_io->internal.error.nvme.sct; 3113 *sc = bdev_io->internal.error.nvme.sc; 3114 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3115 *sct = SPDK_NVME_SCT_GENERIC; 3116 *sc = SPDK_NVME_SC_SUCCESS; 3117 } else { 3118 *sct = SPDK_NVME_SCT_GENERIC; 3119 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3120 } 3121 } 3122 3123 struct spdk_thread * 3124 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3125 { 3126 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3127 } 3128 3129 static void 3130 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3131 { 3132 uint64_t min_qos_set; 3133 int i; 3134 3135 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3136 if (limits[i] != 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3137 break; 3138 } 3139 } 3140 3141 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3142 SPDK_ERRLOG("Invalid rate limits set.\n"); 3143 return; 3144 } 3145 3146 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3147 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3148 continue; 3149 } 3150 3151 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3152 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3153 } else { 3154 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3155 } 3156 3157 if (limits[i] == 0 || limits[i] % min_qos_set) { 3158 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n", 3159 limits[i], bdev->name, min_qos_set); 3160 SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name); 3161 return; 3162 } 3163 } 3164 3165 if (!bdev->internal.qos) { 3166 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3167 if (!bdev->internal.qos) { 3168 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3169 return; 3170 } 3171 } 3172 3173 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3174 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3175 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n", 3176 bdev->name, i, limits[i]); 3177 } 3178 3179 return; 3180 } 3181 3182 static void 3183 _spdk_bdev_qos_config(struct spdk_bdev *bdev) 3184 { 3185 struct spdk_conf_section *sp = NULL; 3186 const char *val = NULL; 3187 int i = 0, j = 0; 3188 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {}; 3189 bool config_qos = false; 3190 3191 sp = spdk_conf_find_section(NULL, "QoS"); 3192 if (!sp) { 3193 return; 3194 } 3195 3196 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) { 3197 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3198 3199 i = 0; 3200 while (true) { 3201 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0); 3202 if (!val) { 3203 break; 3204 } 3205 3206 if (strcmp(bdev->name, val) != 0) { 3207 i++; 3208 continue; 3209 } 3210 3211 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1); 3212 if (val) { 3213 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) { 3214 limits[j] = strtoull(val, NULL, 10); 3215 } else { 3216 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024; 3217 } 3218 config_qos = true; 3219 } 3220 3221 break; 3222 } 3223 3224 j++; 3225 } 3226 3227 if (config_qos == true) { 3228 _spdk_bdev_qos_config_limit(bdev, limits); 3229 } 3230 3231 return; 3232 } 3233 3234 static int 3235 spdk_bdev_init(struct spdk_bdev *bdev) 3236 { 3237 char *bdev_name; 3238 3239 assert(bdev->module != NULL); 3240 3241 if (!bdev->name) { 3242 SPDK_ERRLOG("Bdev name is NULL\n"); 3243 return -EINVAL; 3244 } 3245 3246 if (spdk_bdev_get_by_name(bdev->name)) { 3247 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name); 3248 return -EEXIST; 3249 } 3250 3251 /* Users often register their own I/O devices using the bdev name. In 3252 * order to avoid conflicts, prepend bdev_. 
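 * For example, a bdev registered with the (hypothetical) name "Malloc0" gets
 * the I/O device name "bdev_Malloc0".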
*/ 3253 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3254 if (!bdev_name) { 3255 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3256 return -ENOMEM; 3257 } 3258 3259 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3260 bdev->internal.measured_queue_depth = UINT64_MAX; 3261 bdev->internal.claim_module = NULL; 3262 bdev->internal.qd_poller = NULL; 3263 bdev->internal.qos = NULL; 3264 3265 TAILQ_INIT(&bdev->internal.open_descs); 3266 3267 TAILQ_INIT(&bdev->aliases); 3268 3269 bdev->internal.reset_in_progress = NULL; 3270 3271 _spdk_bdev_qos_config(bdev); 3272 3273 spdk_io_device_register(__bdev_to_io_dev(bdev), 3274 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3275 sizeof(struct spdk_bdev_channel), 3276 bdev_name); 3277 3278 free(bdev_name); 3279 3280 pthread_mutex_init(&bdev->internal.mutex, NULL); 3281 return 0; 3282 } 3283 3284 static void 3285 spdk_bdev_destroy_cb(void *io_device) 3286 { 3287 int rc; 3288 struct spdk_bdev *bdev; 3289 spdk_bdev_unregister_cb cb_fn; 3290 void *cb_arg; 3291 3292 bdev = __bdev_from_io_dev(io_device); 3293 cb_fn = bdev->internal.unregister_cb; 3294 cb_arg = bdev->internal.unregister_ctx; 3295 3296 rc = bdev->fn_table->destruct(bdev->ctxt); 3297 if (rc < 0) { 3298 SPDK_ERRLOG("destruct failed\n"); 3299 } 3300 if (rc <= 0 && cb_fn != NULL) { 3301 cb_fn(cb_arg, rc); 3302 } 3303 } 3304 3305 3306 static void 3307 spdk_bdev_fini(struct spdk_bdev *bdev) 3308 { 3309 pthread_mutex_destroy(&bdev->internal.mutex); 3310 3311 free(bdev->internal.qos); 3312 3313 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3314 } 3315 3316 static void 3317 spdk_bdev_start(struct spdk_bdev *bdev) 3318 { 3319 struct spdk_bdev_module *module; 3320 uint32_t action; 3321 3322 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3323 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3324 3325 /* Examine configuration before initializing I/O */ 3326 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3327 if (module->examine_config) { 3328 action = module->internal.action_in_progress; 3329 module->internal.action_in_progress++; 3330 module->examine_config(bdev); 3331 if (action != module->internal.action_in_progress) { 3332 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3333 module->name); 3334 } 3335 } 3336 } 3337 3338 if (bdev->internal.claim_module) { 3339 return; 3340 } 3341 3342 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3343 if (module->examine_disk) { 3344 module->internal.action_in_progress++; 3345 module->examine_disk(bdev); 3346 } 3347 } 3348 } 3349 3350 int 3351 spdk_bdev_register(struct spdk_bdev *bdev) 3352 { 3353 int rc = spdk_bdev_init(bdev); 3354 3355 if (rc == 0) { 3356 spdk_bdev_start(bdev); 3357 } 3358 3359 return rc; 3360 } 3361 3362 int 3363 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3364 { 3365 int rc; 3366 3367 rc = spdk_bdev_init(vbdev); 3368 if (rc) { 3369 return rc; 3370 } 3371 3372 spdk_bdev_start(vbdev); 3373 return 0; 3374 } 3375 3376 void 3377 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3378 { 3379 if (bdev->internal.unregister_cb != NULL) { 3380 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3381 } 3382 } 3383 3384 static void 3385 _remove_notify(void *arg) 3386 { 3387 struct spdk_bdev_desc *desc = arg; 3388 3389 desc->remove_scheduled = false; 3390 3391 if (desc->closed) { 3392 free(desc); 3393 
} else { 3394 desc->remove_cb(desc->remove_ctx); 3395 } 3396 } 3397 3398 void 3399 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3400 { 3401 struct spdk_bdev_desc *desc, *tmp; 3402 bool do_destruct = true; 3403 struct spdk_thread *thread; 3404 3405 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3406 3407 thread = spdk_get_thread(); 3408 if (!thread) { 3409 /* The user called this from a non-SPDK thread. */ 3410 if (cb_fn != NULL) { 3411 cb_fn(cb_arg, -ENOTSUP); 3412 } 3413 return; 3414 } 3415 3416 pthread_mutex_lock(&bdev->internal.mutex); 3417 3418 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3419 bdev->internal.unregister_cb = cb_fn; 3420 bdev->internal.unregister_ctx = cb_arg; 3421 3422 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3423 if (desc->remove_cb) { 3424 do_destruct = false; 3425 /* 3426 * Defer invocation of the remove_cb to a separate message that will 3427 * run later on its thread. This ensures this context unwinds and 3428 * we don't recursively unregister this bdev again if the remove_cb 3429 * immediately closes its descriptor. 3430 */ 3431 if (!desc->remove_scheduled) { 3432 /* Avoid scheduling removal of the same descriptor multiple times. */ 3433 desc->remove_scheduled = true; 3434 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3435 } 3436 } 3437 } 3438 3439 if (!do_destruct) { 3440 pthread_mutex_unlock(&bdev->internal.mutex); 3441 return; 3442 } 3443 3444 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3445 pthread_mutex_unlock(&bdev->internal.mutex); 3446 3447 spdk_bdev_fini(bdev); 3448 } 3449 3450 int 3451 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3452 void *remove_ctx, struct spdk_bdev_desc **_desc) 3453 { 3454 struct spdk_bdev_desc *desc; 3455 struct spdk_thread *thread; 3456 3457 thread = spdk_get_thread(); 3458 if (!thread) { 3459 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3460 return -ENOTSUP; 3461 } 3462 3463 desc = calloc(1, sizeof(*desc)); 3464 if (desc == NULL) { 3465 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3466 return -ENOMEM; 3467 } 3468 3469 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3470 spdk_get_thread()); 3471 3472 pthread_mutex_lock(&bdev->internal.mutex); 3473 3474 if (write && bdev->internal.claim_module) { 3475 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3476 bdev->name, bdev->internal.claim_module->name); 3477 free(desc); 3478 pthread_mutex_unlock(&bdev->internal.mutex); 3479 return -EPERM; 3480 } 3481 3482 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3483 3484 desc->bdev = bdev; 3485 desc->thread = thread; 3486 desc->remove_cb = remove_cb; 3487 desc->remove_ctx = remove_ctx; 3488 desc->write = write; 3489 *_desc = desc; 3490 3491 pthread_mutex_unlock(&bdev->internal.mutex); 3492 3493 return 0; 3494 } 3495 3496 void 3497 spdk_bdev_close(struct spdk_bdev_desc *desc) 3498 { 3499 struct spdk_bdev *bdev = desc->bdev; 3500 bool do_unregister = false; 3501 3502 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3503 spdk_get_thread()); 3504 3505 assert(desc->thread == spdk_get_thread()); 3506 3507 pthread_mutex_lock(&bdev->internal.mutex); 3508 3509 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3510 3511 desc->closed = true; 3512 3513 if (!desc->remove_scheduled) { 3514 free(desc); 3515 } 3516 3517 /* If no more 
descriptors, kill QoS channel */ 3518 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3519 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3520 bdev->name, spdk_get_thread()); 3521 3522 if (spdk_bdev_qos_destroy(bdev)) { 3523 /* There isn't anything we can do to recover here. Just let the 3524 * old QoS poller keep running. The QoS handling won't change 3525 * cores when the user allocates a new channel, but it won't break. */ 3526 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3527 } 3528 } 3529 3530 spdk_bdev_set_qd_sampling_period(bdev, 0); 3531 3532 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3533 do_unregister = true; 3534 } 3535 pthread_mutex_unlock(&bdev->internal.mutex); 3536 3537 if (do_unregister == true) { 3538 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3539 } 3540 } 3541 3542 int 3543 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3544 struct spdk_bdev_module *module) 3545 { 3546 if (bdev->internal.claim_module != NULL) { 3547 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3548 bdev->internal.claim_module->name); 3549 return -EPERM; 3550 } 3551 3552 if (desc && !desc->write) { 3553 desc->write = true; 3554 } 3555 3556 bdev->internal.claim_module = module; 3557 return 0; 3558 } 3559 3560 void 3561 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3562 { 3563 assert(bdev->internal.claim_module != NULL); 3564 bdev->internal.claim_module = NULL; 3565 } 3566 3567 struct spdk_bdev * 3568 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3569 { 3570 return desc->bdev; 3571 } 3572 3573 void 3574 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3575 { 3576 struct iovec *iovs; 3577 int iovcnt; 3578 3579 if (bdev_io == NULL) { 3580 return; 3581 } 3582 3583 switch (bdev_io->type) { 3584 case SPDK_BDEV_IO_TYPE_READ: 3585 iovs = bdev_io->u.bdev.iovs; 3586 iovcnt = bdev_io->u.bdev.iovcnt; 3587 break; 3588 case SPDK_BDEV_IO_TYPE_WRITE: 3589 iovs = bdev_io->u.bdev.iovs; 3590 iovcnt = bdev_io->u.bdev.iovcnt; 3591 break; 3592 default: 3593 iovs = NULL; 3594 iovcnt = 0; 3595 break; 3596 } 3597 3598 if (iovp) { 3599 *iovp = iovs; 3600 } 3601 if (iovcntp) { 3602 *iovcntp = iovcnt; 3603 } 3604 } 3605 3606 void 3607 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3608 { 3609 3610 if (spdk_bdev_module_list_find(bdev_module->name)) { 3611 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3612 assert(false); 3613 } 3614 3615 if (bdev_module->async_init) { 3616 bdev_module->internal.action_in_progress = 1; 3617 } 3618 3619 /* 3620 * Modules with examine callbacks must be initialized first, so they are 3621 * ready to handle examine callbacks from later modules that will 3622 * register physical bdevs. 
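 * That is why modules providing examine_config or examine_disk are inserted at
 * the head of the module list below, while all others are appended to the tail.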
3623 */ 3624 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3625 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3626 } else { 3627 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3628 } 3629 } 3630 3631 struct spdk_bdev_module * 3632 spdk_bdev_module_list_find(const char *name) 3633 { 3634 struct spdk_bdev_module *bdev_module; 3635 3636 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3637 if (strcmp(name, bdev_module->name) == 0) { 3638 break; 3639 } 3640 } 3641 3642 return bdev_module; 3643 } 3644 3645 static void 3646 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3647 { 3648 struct spdk_bdev_io *bdev_io = _bdev_io; 3649 uint64_t num_bytes, num_blocks; 3650 int rc; 3651 3652 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3653 bdev_io->u.bdev.split_remaining_num_blocks, 3654 ZERO_BUFFER_SIZE); 3655 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3656 3657 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3658 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3659 g_bdev_mgr.zero_buffer, 3660 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3661 _spdk_bdev_write_zero_buffer_done, bdev_io); 3662 if (rc == 0) { 3663 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3664 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3665 } else if (rc == -ENOMEM) { 3666 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 3667 } else { 3668 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3669 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3670 } 3671 } 3672 3673 static void 3674 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3675 { 3676 struct spdk_bdev_io *parent_io = cb_arg; 3677 3678 spdk_bdev_free_io(bdev_io); 3679 3680 if (!success) { 3681 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3682 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3683 return; 3684 } 3685 3686 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3687 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3688 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3689 return; 3690 } 3691 3692 _spdk_bdev_write_zero_buffer_next(parent_io); 3693 } 3694 3695 struct set_qos_limit_ctx { 3696 void (*cb_fn)(void *cb_arg, int status); 3697 void *cb_arg; 3698 struct spdk_bdev *bdev; 3699 }; 3700 3701 static void 3702 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3703 { 3704 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3705 ctx->bdev->internal.qos_mod_in_progress = false; 3706 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3707 3708 ctx->cb_fn(ctx->cb_arg, status); 3709 free(ctx); 3710 } 3711 3712 static void 3713 _spdk_bdev_disable_qos_done(void *cb_arg) 3714 { 3715 struct set_qos_limit_ctx *ctx = cb_arg; 3716 struct spdk_bdev *bdev = ctx->bdev; 3717 struct spdk_bdev_io *bdev_io; 3718 struct spdk_bdev_qos *qos; 3719 3720 pthread_mutex_lock(&bdev->internal.mutex); 3721 qos = bdev->internal.qos; 3722 bdev->internal.qos = NULL; 3723 pthread_mutex_unlock(&bdev->internal.mutex); 3724 3725 while (!TAILQ_EMPTY(&qos->queued)) { 3726 /* Send queued I/O back to their original thread for resubmission. 
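 * If the I/O was redirected to the QoS thread, the original channel saved in
 * io_submit_ch is restored first so it is resubmitted on the thread it came from.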
*/ 3727 bdev_io = TAILQ_FIRST(&qos->queued); 3728 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3729 3730 if (bdev_io->internal.io_submit_ch) { 3731 /* 3732 * Channel was changed when sending it to the QoS thread - change it back 3733 * before sending it back to the original thread. 3734 */ 3735 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3736 bdev_io->internal.io_submit_ch = NULL; 3737 } 3738 3739 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3740 _spdk_bdev_io_submit, bdev_io); 3741 } 3742 3743 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3744 spdk_poller_unregister(&qos->poller); 3745 3746 free(qos); 3747 3748 _spdk_bdev_set_qos_limit_done(ctx, 0); 3749 } 3750 3751 static void 3752 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3753 { 3754 void *io_device = spdk_io_channel_iter_get_io_device(i); 3755 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3756 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3757 struct spdk_thread *thread; 3758 3759 pthread_mutex_lock(&bdev->internal.mutex); 3760 thread = bdev->internal.qos->thread; 3761 pthread_mutex_unlock(&bdev->internal.mutex); 3762 3763 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3764 } 3765 3766 static void 3767 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3768 { 3769 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3770 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3771 3772 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3773 3774 spdk_for_each_channel_continue(i, 0); 3775 } 3776 3777 static void 3778 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg) 3779 { 3780 struct set_qos_limit_ctx *ctx = cb_arg; 3781 struct spdk_bdev *bdev = ctx->bdev; 3782 3783 pthread_mutex_lock(&bdev->internal.mutex); 3784 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3785 pthread_mutex_unlock(&bdev->internal.mutex); 3786 3787 _spdk_bdev_set_qos_limit_done(ctx, 0); 3788 } 3789 3790 static void 3791 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3792 { 3793 void *io_device = spdk_io_channel_iter_get_io_device(i); 3794 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3795 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3796 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3797 3798 pthread_mutex_lock(&bdev->internal.mutex); 3799 _spdk_bdev_enable_qos(bdev, bdev_ch); 3800 pthread_mutex_unlock(&bdev->internal.mutex); 3801 spdk_for_each_channel_continue(i, 0); 3802 } 3803 3804 static void 3805 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3806 { 3807 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3808 3809 _spdk_bdev_set_qos_limit_done(ctx, status); 3810 } 3811 3812 static void 3813 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 3814 { 3815 int i; 3816 3817 assert(bdev->internal.qos != NULL); 3818 3819 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3820 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3821 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3822 3823 if (limits[i] == 0) { 3824 bdev->internal.qos->rate_limits[i].limit = 3825 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3826 } 3827 } 3828 } 3829 } 3830 3831 void 3832 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 3833 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3834 { 3835 struct set_qos_limit_ctx *ctx; 3836 uint32_t limit_set_complement; 3837 
uint64_t min_limit_per_sec; 3838 int i; 3839 bool disable_rate_limit = true; 3840 3841 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3842 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3843 continue; 3844 } 3845 3846 if (limits[i] > 0) { 3847 disable_rate_limit = false; 3848 } 3849 3850 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3851 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3852 } else { 3853 /* Change from megabyte to byte rate limit */ 3854 limits[i] = limits[i] * 1024 * 1024; 3855 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3856 } 3857 3858 limit_set_complement = limits[i] % min_limit_per_sec; 3859 if (limit_set_complement) { 3860 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 3861 limits[i], min_limit_per_sec); 3862 limits[i] += min_limit_per_sec - limit_set_complement; 3863 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 3864 } 3865 } 3866 3867 ctx = calloc(1, sizeof(*ctx)); 3868 if (ctx == NULL) { 3869 cb_fn(cb_arg, -ENOMEM); 3870 return; 3871 } 3872 3873 ctx->cb_fn = cb_fn; 3874 ctx->cb_arg = cb_arg; 3875 ctx->bdev = bdev; 3876 3877 pthread_mutex_lock(&bdev->internal.mutex); 3878 if (bdev->internal.qos_mod_in_progress) { 3879 pthread_mutex_unlock(&bdev->internal.mutex); 3880 free(ctx); 3881 cb_fn(cb_arg, -EAGAIN); 3882 return; 3883 } 3884 bdev->internal.qos_mod_in_progress = true; 3885 3886 if (disable_rate_limit == true && bdev->internal.qos) { 3887 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3888 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 3889 (bdev->internal.qos->rate_limits[i].limit > 0 && 3890 bdev->internal.qos->rate_limits[i].limit != 3891 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 3892 disable_rate_limit = false; 3893 break; 3894 } 3895 } 3896 } 3897 3898 if (disable_rate_limit == false) { 3899 if (bdev->internal.qos == NULL) { 3900 /* Enabling */ 3901 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3902 if (!bdev->internal.qos) { 3903 pthread_mutex_unlock(&bdev->internal.mutex); 3904 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3905 free(ctx); 3906 cb_fn(cb_arg, -ENOMEM); 3907 return; 3908 } 3909 3910 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3911 3912 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3913 _spdk_bdev_enable_qos_msg, ctx, 3914 _spdk_bdev_enable_qos_done); 3915 } else { 3916 /* Updating */ 3917 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3918 3919 spdk_thread_send_msg(bdev->internal.qos->thread, 3920 _spdk_bdev_update_qos_rate_limit_msg, ctx); 3921 } 3922 } else { 3923 if (bdev->internal.qos != NULL) { 3924 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3925 3926 /* Disabling */ 3927 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3928 _spdk_bdev_disable_qos_msg, ctx, 3929 _spdk_bdev_disable_qos_msg_done); 3930 } else { 3931 pthread_mutex_unlock(&bdev->internal.mutex); 3932 _spdk_bdev_set_qos_limit_done(ctx, 0); 3933 return; 3934 } 3935 } 3936 3937 pthread_mutex_unlock(&bdev->internal.mutex); 3938 } 3939 3940 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3941 3942 SPDK_TRACE_REGISTER_FN(bdev_trace) 3943 { 3944 spdk_trace_register_owner(OWNER_BDEV, 'b'); 3945 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 3946 spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV, 3947 OBJECT_BDEV_IO, 1, 0, "type: "); 3948 spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV, 3949 OBJECT_BDEV_IO, 0, 0, ""); 3950 } 3951