/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/conf.h"

#include "spdk/config.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/util.h"
#include "spdk/trace.h"

#include "spdk/bdev_module.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
#define SPDK_BDEV_IO_CACHE_SIZE			256
#define BUF_SMALL_POOL_SIZE			8192
#define BUF_LARGE_POOL_SIZE			1024
#define NOMEM_THRESHOLD_COUNT			8
#define ZERO_BUFFER_SIZE			0x100000

#define OWNER_BDEV		0x2

#define OBJECT_BDEV_IO		0x2

#define TRACE_GROUP_BDEV	0x3
#define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
#define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)

#define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
#define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
#define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
#define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
#define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(10 * 1024 * 1024)
#define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX

#define SPDK_BDEV_POOL_ALIGNMENT 512

static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"};
static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"};

TAILQ_HEAD(spdk_bdev_list, spdk_bdev);

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	void *zero_buffer;

	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;

	struct spdk_bdev_list bdevs;
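	/*
	 * module_init_complete is set once every registered module's module_init()
	 * has returned; init_complete is set only after any asynchronous module
	 * init/examine work has also finished (see spdk_bdev_init_complete()).
	 */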

	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain *domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.init_complete = false,
	.module_init_complete = false,
};

static struct spdk_bdev_opts	g_bdev_opts = {
	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
};

static spdk_bdev_init_cb	g_init_cb_fn = NULL;
static void			*g_init_cb_arg = NULL;

static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
static void			*g_fini_cb_arg = NULL;
static struct spdk_thread	*g_fini_thread = NULL;

struct spdk_bdev_qos_limit {
	/** IOs or bytes allowed per second (i.e., 1s). */
	uint64_t limit;

	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
	 *  some bytes are remaining, but the I/O is bigger than that amount. The
	 *  excess will be deducted from the next timeslice.
	 */
	int64_t remaining_this_timeslice;

	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t min_per_timeslice;

	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
	uint32_t max_per_timeslice;
};

struct spdk_bdev_qos {
	/** Rate limits for this bdev, one entry per rate limit type. */
	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	/** The channel that all I/O are funneled through. */
	struct spdk_bdev_channel *ch;

	/** The thread on which the poller is running. */
	struct spdk_thread *thread;

	/** Queue of I/O waiting to be issued. */
	bdev_io_tailq_t queued;

	/** Size of a timeslice in tsc ticks. */
	uint64_t timeslice_size;

	/** Timestamp of start of last timeslice. */
	uint64_t last_timeslice;

	/** Poller that processes queued I/O commands each time slice. */
	struct spdk_poller *poller;
};

struct spdk_bdev_mgmt_channel {
	bdev_io_stailq_t need_buf_small;
	bdev_io_stailq_t need_buf_large;

	/*
	 * Each thread keeps a cache of bdev_io - this allows
	 * bdev threads which are *not* DPDK threads to still
	 * benefit from a per-thread bdev_io cache.  Without
	 * this, non-DPDK threads fetching from the mempool
	 * incur a cmpxchg on get and put.
	 */
	bdev_io_stailq_t per_thread_cache;
	uint32_t	per_thread_cache_count;
	uint32_t	bdev_io_cache_size;

	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
};

/*
 * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
 * queue their I/O awaiting retry here, so that I/O to one bdev can be retried
 * after I/O from another bdev completes.
 */
struct spdk_bdev_shared_resource {
	/* The bdev management channel */
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	/*
	 * Queue of IO awaiting retry because of a previous NOMEM status returned
	 * on this channel.
	 */
	bdev_io_tailq_t		nomem_io;

	/*
	 * Threshold which io_outstanding must drop to before retrying nomem_io.
	 */
	uint64_t		nomem_threshold;

	/* I/O channel allocated by a bdev module */
	struct spdk_io_channel	*shared_ch;

	/* Refcount of bdev channels using this resource */
	uint32_t		ref;

	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
#define BDEV_CH_QOS_ENABLED		(1 << 1)

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Per io_device per thread data */
	struct spdk_bdev_shared_resource *shared_resource;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted through this channel and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	bdev_io_tailq_t		queued_resets;

	uint32_t		flags;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
	struct spdk_bdev_io_stat prev_stat;
#endif

};

struct spdk_bdev_desc {
	struct spdk_bdev	*bdev;
	struct spdk_thread	*thread;
	spdk_bdev_remove_cb_t	remove_cb;
	void			*remove_ctx;
	bool			remove_scheduled;
	bool			closed;
	bool			write;
	TAILQ_ENTRY(spdk_bdev_desc) link;
};

struct spdk_bdev_iostat_ctx {
	struct spdk_bdev_io_stat *stat;
	spdk_bdev_get_device_stat_cb cb;
	void *cb_arg;
};

#define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
#define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))

static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
		void *cb_arg);
static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);

void
spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
{
	*opts = g_bdev_opts;
}

int
spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
{
	uint32_t min_pool_size;

	/*
	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
	 * initialization.  A second mgmt_ch will be created on the same thread when the application starts
	 * but before the deferred put_io_channel event is executed for the first mgmt_ch.
	 */
	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
	if (opts->bdev_io_pool_size < min_pool_size) {
		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
			    spdk_thread_get_count());
		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
		return -1;
	}

	g_bdev_opts = *opts;
	return 0;
}

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, internal.link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (bdev->internal.claim_module == NULL) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, internal.link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev_alias *tmp;
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}

		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
			if (strcmp(bdev_name, tmp->alias) == 0) {
				return bdev;
			}
		}

		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
{
	struct iovec *iovs;

	iovs = bdev_io->u.bdev.iovs;

	assert(iovs != NULL);
	assert(bdev_io->u.bdev.iovcnt >= 1);

	iovs[0].iov_base = buf;
	iovs[0].iov_len = len;
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf, *aligned_buf;
	bdev_io_stailq_t *stailq;
	struct spdk_bdev_mgmt_channel *ch;
	uint64_t buf_len;
	uint64_t alignment;

	assert(bdev_io->u.bdev.iovcnt == 1);

	buf = bdev_io->internal.buf;
	buf_len = bdev_io->internal.buf_len;
	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf = NULL;

	if (buf_len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &ch->need_buf_large;
	}

	if (STAILQ_EMPTY(stailq)) {
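		/* No bdev_io is waiting for a buffer of this size class, so simply
		 * return the buffer to its pool.  Otherwise it is handed directly to
		 * the first waiter below, bypassing the pool entirely.
		 */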
		spdk_mempool_put(pool, buf);
	} else {
		tmp = STAILQ_FIRST(stailq);

		alignment = spdk_bdev_get_buf_align(tmp->bdev);
		aligned_buf = (void *)(((uintptr_t)buf +
					(alignment - 1)) & ~(alignment - 1));
		spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);

		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
		tmp->internal.buf = buf;
		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
{
	struct spdk_mempool *pool;
	bdev_io_stailq_t *stailq;
	void *buf, *aligned_buf;
	struct spdk_bdev_mgmt_channel *mgmt_ch;
	uint64_t alignment;

	assert(cb != NULL);
	assert(bdev_io->u.bdev.iovs != NULL);

	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);

	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
		/* Buffer already present */
		cb(bdev_io->internal.ch->channel, bdev_io);
		return;
	}

	assert(len + alignment <= SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT);
	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	bdev_io->internal.buf_len = len;
	bdev_io->internal.get_buf_cb = cb;

	if (len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) {
		pool = g_bdev_mgr.buf_small_pool;
		stailq = &mgmt_ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		stailq = &mgmt_ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);

	if (!buf) {
		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
	} else {
		aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));

		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);

		bdev_io->internal.buf = buf;
		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
	}
}

static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

static void
spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	int i;
	struct spdk_bdev_qos *qos = bdev->internal.qos;
	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];

	if (!qos) {
		return;
	}

	spdk_bdev_get_qos_rate_limits(bdev, limits);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_qos_limit");
	spdk_json_write_name(w, "params");

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", bdev->name);
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (limits[i] > 0) {
			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
		}
	}
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

void
spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_bdev_module *bdev_module;
	struct spdk_bdev *bdev;
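	/*
	 * For illustration (editor's sketch; the values shown are just the
	 * compile-time defaults from g_bdev_opts), the first object written
	 * below looks like
	 *
	 *   {"method": "set_bdev_options",
	 *    "params": {"bdev_io_pool_size": 65536, "bdev_io_cache_size": 256}}
	 *
	 * followed by one entry per module config_json() and, per bdev, its QoS
	 * limits plus whatever the module's write_config_json() emits.
	 */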

	assert(w != NULL);

	spdk_json_write_array_begin(w);

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "set_bdev_options");
	spdk_json_write_name(w, "params");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
	spdk_json_write_object_end(w);
	spdk_json_write_object_end(w);

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (bdev_module->config_json) {
			bdev_module->config_json(w);
		}
	}

	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
		spdk_bdev_qos_config_json(bdev, w);

		if (bdev->fn_table->write_config_json) {
			bdev->fn_table->write_config_json(bdev, w);
		}
	}

	spdk_json_write_array_end(w);
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;
	uint32_t i;

	STAILQ_INIT(&ch->need_buf_small);
	STAILQ_INIT(&ch->need_buf_large);

	STAILQ_INIT(&ch->per_thread_cache);
	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;

	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
	ch->per_thread_cache_count = 0;
	for (i = 0; i < ch->bdev_io_cache_size; i++) {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
		assert(bdev_io != NULL);
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
	}

	TAILQ_INIT(&ch->shared_resources);
	TAILQ_INIT(&ch->io_wait_queue);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
	struct spdk_bdev_io *bdev_io;

	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
	}

	if (!TAILQ_EMPTY(&ch->shared_resources)) {
		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
	}

	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}

	assert(ch->per_thread_cache_count == 0);
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
	void *cb_arg = g_init_cb_arg;
	struct spdk_bdev_module *m;

	g_bdev_mgr.init_complete = true;
	g_init_cb_fn = NULL;
	g_init_cb_arg = NULL;

	/*
	 * For modules that need to know when subsystem init is complete,
	 * inform them now.
	 */
	if (rc == 0) {
		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
			if (m->init_complete) {
				m->init_complete();
			}
		}
	}

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->internal.action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module *module)
{
	assert(module->internal.action_in_progress > 0);
	module->internal.action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
{
	spdk_bdev_module_action_done(module);
}

/** The last initialized bdev module */
static struct spdk_bdev_module *g_resume_bdev_module = NULL;

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
		g_resume_bdev_module = module;
		rc = module->module_init();
		if (rc != 0) {
			return rc;
		}
	}

	g_resume_bdev_module = NULL;
	return 0;
}


static void
spdk_bdev_init_failed_complete(void *cb_arg)
{
	spdk_bdev_init_complete(-1);
}

static void
spdk_bdev_init_failed(void *cb_arg)
{
	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
{
	struct spdk_conf_section *sp;
	struct spdk_bdev_opts bdev_opts;
	int32_t bdev_io_pool_size, bdev_io_cache_size;
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	sp = spdk_conf_find_section(NULL, "Bdev");
	if (sp != NULL) {
		spdk_bdev_get_opts(&bdev_opts);

		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
		if (bdev_io_pool_size >= 0) {
			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
		}

		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
		if (bdev_io_cache_size >= 0) {
			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
		}

		if (spdk_bdev_set_opts(&bdev_opts)) {
			spdk_bdev_init_complete(-1);
			return;
		}

		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
	}

	g_init_cb_fn = cb_fn;
	g_init_cb_arg = cb_arg;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  g_bdev_opts.bdev_io_pool_size,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  0,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 * using spdk_thread_get_count() to determine how many local caches we need
	 * to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create rbuf small pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create rbuf large pool failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
				 NULL);
	if (!g_bdev_mgr.zero_buffer) {
		SPDK_ERRLOG("create bdev zero buffer failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel),
				"bdev_mgr");

	rc = spdk_bdev_modules_init();
	g_bdev_mgr.module_init_complete = true;
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
		return;
	}

	spdk_bdev_module_action_complete();
}

static void
spdk_bdev_mgr_unregister_cb(void *io_device)
{
	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    g_bdev_opts.bdev_io_pool_size);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
	spdk_dma_free(g_bdev_mgr.zero_buffer);

	cb_fn(g_fini_cb_arg);
	g_fini_cb_fn = NULL;
	g_fini_cb_arg = NULL;
	g_bdev_mgr.init_complete = false;
	g_bdev_mgr.module_init_complete = false;
}

static void
spdk_bdev_module_finish_iter(void *arg)
{
	struct spdk_bdev_module *bdev_module;

	/* Start iterating from the last touched module */
	if (!g_resume_bdev_module) {
		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
	} else {
		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
					 internal.tailq);
	}

	while (bdev_module) {
		if (bdev_module->async_fini) {
			/* Save our place so we can resume later. We must
			 * save the variable here, before calling module_fini()
			 * below, because in some cases the module may immediately
			 * call spdk_bdev_module_finish_done() and re-enter
			 * this function to continue iterating. */
			g_resume_bdev_module = bdev_module;
		}

		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}

		if (bdev_module->async_fini) {
			return;
		}

		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
					 internal.tailq);
	}

	g_resume_bdev_module = NULL;
	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
}

void
spdk_bdev_module_finish_done(void)
{
	if (spdk_get_thread() != g_fini_thread) {
		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
	} else {
		spdk_bdev_module_finish_iter(NULL);
	}
}

static void
_spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
{
	struct spdk_bdev *bdev = cb_arg;

	if (bdeverrno && bdev) {
		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
			     bdev->name);

		/*
		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
		 * bdev; try to continue by manually removing this bdev from the list and moving
		 * on to the next bdev in the list.
		 */
		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
	}

	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
		/*
		 * Bdev module finish needs to be deferred, as we might be in the middle of some
		 * context (like bdev part free) that will use this bdev (or private bdev driver
		 * ctx data) after returning.
		 */
		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
		return;
	}

	/*
	 * Unregister the last bdev in the list.  The last bdev in the list should be a bdev
	 * that has no bdevs that depend on it.
	 */
	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
}

void
spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_module *m;

	assert(cb_fn != NULL);

	g_fini_thread = spdk_get_thread();

	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
		if (m->fini_start) {
			m->fini_start();
		}
	}

	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}

static struct spdk_bdev_io *
spdk_bdev_get_io(struct spdk_bdev_channel *channel)
{
	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
	struct spdk_bdev_io *bdev_io;

	if (ch->per_thread_cache_count > 0) {
		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
		ch->per_thread_cache_count--;
	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
		/*
		 * Don't try to look for bdev_ios in the global pool if there are
		 * waiters on bdev_ios - we don't want this caller to jump the line.
		 */
		bdev_io = NULL;
	} else {
		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	}

	return bdev_io;
}

void
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;

	assert(bdev_io != NULL);
	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev_io->internal.buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
		ch->per_thread_cache_count++;
		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
			struct spdk_bdev_io_wait_entry *entry;

			entry = TAILQ_FIRST(&ch->io_wait_queue);
			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
			entry->cb_fn(entry->cb_arg);
		}
	} else {
		/* We should never have a full cache with entries on the io wait queue. */
		assert(TAILQ_EMPTY(&ch->io_wait_queue));
		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
	}
}

static bool
_spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
{
	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);

	switch (limit) {
	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
		return true;
	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
		return false;
	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
	default:
		return false;
	}
}

static bool
_spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return true;
	default:
		return false;
	}
}

static uint64_t
_spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_NVME_IO:
	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
		return bdev_io->u.nvme_passthru.nbytes;
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
	default:
		return 0;
	}
}

static void
_spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte)
{
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			continue;
		}

		switch (i) {
		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
			qos->rate_limits[i].remaining_this_timeslice--;
			break;
		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
			qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte;
			break;
		case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
		default:
			break;
		}
	}
}

static int
_spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
{
	struct spdk_bdev_io		*bdev_io = NULL;
	struct spdk_bdev		*bdev = ch->bdev;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
	int				i, submitted_ios = 0;
	bool				to_limit_io;
	uint64_t			io_size_in_byte;

	while (!TAILQ_EMPTY(&qos->queued)) {
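		/* Stop submitting as soon as any active rate limit has been exhausted
		 * for this timeslice; the remaining queued I/O will be picked up again
		 * by the QoS poller on the next timeslice.
		 */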
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			if (qos->rate_limits[i].max_per_timeslice > 0 &&
			    (qos->rate_limits[i].remaining_this_timeslice <= 0)) {
				return submitted_ios;
			}
		}

		bdev_io = TAILQ_FIRST(&qos->queued);
		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
		ch->io_outstanding++;
		shared_resource->io_outstanding++;
		to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io);
		if (to_limit_io == true) {
			io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io);
			_spdk_bdev_qos_update_per_io(qos, io_size_in_byte);
		}
		bdev->fn_table->submit_request(ch->channel, bdev_io);
		submitted_ios++;
	}

	return submitted_ios;
}

static void
_spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
{
	int rc;

	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
				     &bdev_io->internal.waitq_entry);
	if (rc != 0) {
		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
	}
}

static bool
_spdk_bdev_io_type_can_split(uint8_t type)
{
	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
	assert(type < SPDK_BDEV_NUM_IO_TYPES);

	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
	 * UNMAP could be split, but these types of I/O are typically much larger
	 * in size (sometimes the size of the entire block device), and the bdev
	 * module can more efficiently split these types of I/O.  Plus those types
	 * of I/O do not have a payload, which makes the splitting process simpler.
	 */
	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
		return true;
	} else {
		return false;
	}
}

static bool
_spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
{
	uint64_t start_stripe, end_stripe;
	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;

	if (io_boundary == 0) {
		return false;
	}

	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
		return false;
	}

	start_stripe = bdev_io->u.bdev.offset_blocks;
	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
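	/* Worked example (editor's note): with io_boundary = 8 and an I/O covering
	 * blocks 6..9, start_stripe = 6 >> 3 = 0 and end_stripe = 9 >> 3 = 1, so the
	 * I/O crosses an optimal boundary and must be split.
	 */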
	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
		start_stripe >>= spdk_u32log2(io_boundary);
		end_stripe >>= spdk_u32log2(io_boundary);
	} else {
		start_stripe /= io_boundary;
		end_stripe /= io_boundary;
	}
	return (start_stripe != end_stripe);
}

static uint32_t
_to_next_boundary(uint64_t offset, uint32_t boundary)
{
	return (boundary - (offset % boundary));
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);

static void
_spdk_bdev_io_split_with_payload(void *_bdev_io)
{
	struct spdk_bdev_io *bdev_io = _bdev_io;
	uint64_t current_offset, remaining;
	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
	struct iovec *parent_iov, *iov;
	uint64_t parent_iov_offset, iov_len;
	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
	int rc;

	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
	blocklen = bdev_io->bdev->blocklen;
	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
	parent_iovcnt = bdev_io->u.bdev.iovcnt;

	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
		if (parent_iov_offset < parent_iov->iov_len) {
			break;
		}
		parent_iov_offset -= parent_iov->iov_len;
	}

	child_iovcnt = 0;
	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
		to_next_boundary = spdk_min(remaining, to_next_boundary);
		to_next_boundary_bytes = to_next_boundary * blocklen;
		iov = &bdev_io->child_iov[child_iovcnt];
		iovcnt = 0;
		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
			to_next_boundary_bytes -= iov_len;

			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;

			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
				parent_iov_offset += iov_len;
			} else {
				parent_iovpos++;
				parent_iov_offset = 0;
			}
			child_iovcnt++;
			iovcnt++;
		}

		if (to_next_boundary_bytes > 0) {
			/* We had to stop this child I/O early because we ran out of
			 * child_iov space.  Make sure the iovs collected are valid and
			 * then adjust to_next_boundary before starting the child I/O.
			 */
			if ((to_next_boundary_bytes % blocklen) != 0) {
				SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n",
					    to_next_boundary_bytes, blocklen);
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
				return;
			}
			to_next_boundary -= to_next_boundary_bytes / blocklen;
		}

		bdev_io->u.bdev.split_outstanding++;

		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
						    spdk_io_channel_from_ctx(bdev_io->internal.ch),
						    iov, iovcnt, current_offset, to_next_boundary,
						    _spdk_bdev_io_split_done, bdev_io);
		} else {
			rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
						     spdk_io_channel_from_ctx(bdev_io->internal.ch),
						     iov, iovcnt, current_offset, to_next_boundary,
						     _spdk_bdev_io_split_done, bdev_io);
		}

		if (rc == 0) {
			current_offset += to_next_boundary;
			remaining -= to_next_boundary;
			bdev_io->u.bdev.split_current_offset_blocks = current_offset;
			bdev_io->u.bdev.split_remaining_num_blocks = remaining;
		} else {
			bdev_io->u.bdev.split_outstanding--;
			if (rc == -ENOMEM) {
				if (bdev_io->u.bdev.split_outstanding == 0) {
					/* No I/O is outstanding. Hence we should wait here. */
					_spdk_bdev_queue_io_wait_with_cb(bdev_io,
									 _spdk_bdev_io_split_with_payload);
				}
			} else {
				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
				if (bdev_io->u.bdev.split_outstanding == 0) {
					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
				}
			}

			return;
		}
	}
}

static void
_spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *parent_io = cb_arg;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
	}
	parent_io->u.bdev.split_outstanding--;
	if (parent_io->u.bdev.split_outstanding != 0) {
		return;
	}

	/*
	 * The parent I/O finishes when all of its blocks have been consumed, or when a
	 * child I/O has failed and no child I/O remains outstanding.
	 */
	if (parent_io->u.bdev.split_remaining_num_blocks == 0 ||
	    parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) {
		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
				       parent_io->internal.caller_ctx);
		return;
	}

	/*
	 * Continue with the splitting process.  This function will complete the parent I/O if the
	 * splitting is done.
	 */
	_spdk_bdev_io_split_with_payload(parent_io);
}

static void
_spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	assert(_spdk_bdev_io_type_can_split(bdev_io->type));

	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
	bdev_io->u.bdev.split_outstanding = 0;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;

	_spdk_bdev_io_split_with_payload(bdev_io);
}

static void
_spdk_bdev_io_submit(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;
	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
	uint64_t tsc;

	tsc = spdk_get_ticks();
	bdev_io->internal.submit_tsc = tsc;
	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);
	bdev_ch->io_outstanding++;
	shared_resource->io_outstanding++;
	bdev_io->internal.in_submit_request = true;
	if (spdk_likely(bdev_ch->flags == 0)) {
		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
			bdev->fn_table->submit_request(ch, bdev_io);
		} else {
			bdev_ch->io_outstanding--;
			shared_resource->io_outstanding--;
			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
		}
	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
		bdev_ch->io_outstanding--;
		shared_resource->io_outstanding--;
		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
		_spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos);
	} else {
		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);

	assert(thread != NULL);
	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
			spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split,
					     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		} else {
			_spdk_bdev_io_split(NULL, bdev_io);
		}
		return;
	}

	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
			_spdk_bdev_io_submit(bdev_io);
		} else {
			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
			bdev_io->internal.ch = bdev->internal.qos->ch;
			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
		}
	} else {
		_spdk_bdev_io_submit(bdev_io);
	}
}

static void
spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
	struct spdk_io_channel *ch = bdev_ch->channel;

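	/* Resets are handed straight to the bdev module here; they intentionally
	 * bypass the QoS and NOMEM handling that _spdk_bdev_io_submit() applies to
	 * normal I/O.
	 */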
	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);

	bdev_io->internal.in_submit_request = true;
	bdev->fn_table->submit_request(ch, bdev_io);
	bdev_io->internal.in_submit_request = false;
}

static void
spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
		  struct spdk_bdev *bdev, void *cb_arg,
		  spdk_bdev_io_completion_cb cb)
{
	bdev_io->bdev = bdev;
	bdev_io->internal.caller_ctx = cb_arg;
	bdev_io->internal.cb = cb;
	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
	bdev_io->internal.in_submit_request = false;
	bdev_io->internal.buf = NULL;
	bdev_io->internal.io_submit_ch = NULL;
}

static bool
_spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
}

bool
spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	bool supported;

	supported = _spdk_bdev_io_type_supported(bdev, io_type);

	if (!supported) {
		switch (io_type) {
		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
			/* The bdev layer will emulate write zeroes as long as write is supported. */
			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
			break;
		default:
			break;
		}
	}

	return supported;
}

int
spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	if (bdev->fn_table->dump_info_json) {
		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
	}

	return 0;
}

static void
spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
{
	uint32_t max_per_timeslice = 0;
	int i;

	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
			qos->rate_limits[i].max_per_timeslice = 0;
			continue;
		}

		max_per_timeslice = qos->rate_limits[i].limit *
				    SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC;

		qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice,
							qos->rate_limits[i].min_per_timeslice);

		qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice;
	}
}

static int
spdk_bdev_channel_poll_qos(void *arg)
{
	struct spdk_bdev_qos *qos = arg;
	uint64_t now = spdk_get_ticks();
	int i;

	if (now < (qos->last_timeslice + qos->timeslice_size)) {
		/* We received our callback earlier than expected - return
		 *  immediately and wait to do accounting until at least one
		 *  timeslice has actually expired.  This should never happen
		 *  with a well-behaved timer implementation.
		 */
		return 0;
	}

	/* Reset for next round of rate limiting */
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		/* We may have allowed the IOs or bytes to slightly overrun in the last
		 * timeslice. remaining_this_timeslice is signed, so if it's negative
		 * here, we'll account for the overrun so that the next timeslice will
		 * be appropriately reduced.
		 */
		if (qos->rate_limits[i].remaining_this_timeslice > 0) {
			qos->rate_limits[i].remaining_this_timeslice = 0;
		}
	}

	while (now >= (qos->last_timeslice + qos->timeslice_size)) {
		qos->last_timeslice += qos->timeslice_size;
		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
			qos->rate_limits[i].remaining_this_timeslice +=
				qos->rate_limits[i].max_per_timeslice;
		}
	}

	return _spdk_bdev_qos_io_submit(qos->ch, qos);
}

static void
_spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_shared_resource *shared_resource;

	if (!ch) {
		return;
	}

	if (ch->channel) {
		spdk_put_io_channel(ch->channel);
	}

	assert(ch->io_outstanding == 0);

	shared_resource = ch->shared_resource;
	if (shared_resource) {
		assert(ch->io_outstanding == 0);
		assert(shared_resource->ref > 0);
		shared_resource->ref--;
		if (shared_resource->ref == 0) {
			assert(shared_resource->io_outstanding == 0);
			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
			free(shared_resource);
		}
	}
}

/* Caller must hold bdev->internal.mutex. */
static void
_spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_qos	*qos = bdev->internal.qos;
	int			i;

	/* Rate limiting on this bdev enabled */
	if (qos) {
		if (qos->ch == NULL) {
			struct spdk_io_channel *io_ch;

			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
				      bdev->name, spdk_get_thread());

			/* No qos channel has been selected, so set one up */

			/* Take another reference to ch */
			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
			qos->ch = ch;

			qos->thread = spdk_io_channel_get_thread(io_ch);

			TAILQ_INIT(&qos->queued);

			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
				if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
					qos->rate_limits[i].min_per_timeslice =
						SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE;
				} else {
					qos->rate_limits[i].min_per_timeslice =
						SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE;
				}

				if (qos->rate_limits[i].limit == 0) {
					qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
				}
			}
			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
			qos->timeslice_size =
				SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
			qos->last_timeslice = spdk_get_ticks();
			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
							   qos,
							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
		}

		ch->flags |= BDEV_CH_QOS_ENABLED;
	}
}

static int
spdk_bdev_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
	struct spdk_bdev_channel	*ch = ctx_buf;
	struct spdk_io_channel		*mgmt_io_ch;
	struct spdk_bdev_mgmt_channel	*mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource;

	ch->bdev = bdev;
	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
	if (!ch->channel) {
		return -1;
	}

	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
	if (!mgmt_io_ch) {
		return -1;
	}

	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
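	/* Look for an existing shared_resource tied to the same underlying module
	 * channel.  If one is found, reuse it and drop the extra mgmt channel
	 * reference; otherwise a new shared_resource is allocated below.
	 */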
	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
		if (shared_resource->shared_ch == ch->channel) {
			spdk_put_io_channel(mgmt_io_ch);
			shared_resource->ref++;
			break;
		}
	}

	if (shared_resource == NULL) {
		shared_resource = calloc(1, sizeof(*shared_resource));
		if (shared_resource == NULL) {
			spdk_put_io_channel(mgmt_io_ch);
			return -1;
		}

		shared_resource->mgmt_ch = mgmt_ch;
		shared_resource->io_outstanding = 0;
		TAILQ_INIT(&shared_resource->nomem_io);
		shared_resource->nomem_threshold = 0;
		shared_resource->shared_ch = ch->channel;
		shared_resource->ref = 1;
		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
	}

	memset(&ch->stat, 0, sizeof(ch->stat));
	ch->stat.ticks_rate = spdk_get_ticks_hz();
	ch->io_outstanding = 0;
	TAILQ_INIT(&ch->queued_resets);
	ch->flags = 0;
	ch->shared_resource = shared_resource;

#ifdef SPDK_CONFIG_VTUNE
	{
		char *name;
		__itt_init_ittlib(NULL, 0);
		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
		if (!name) {
			_spdk_bdev_channel_destroy_resource(ch);
			return -1;
		}
		ch->handle = __itt_string_handle_create(name);
		free(name);
		ch->start_tsc = spdk_get_ticks();
		ch->interval_tsc = spdk_get_ticks_hz() / 100;
		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
	}
#endif

	pthread_mutex_lock(&bdev->internal.mutex);
	_spdk_bdev_enable_qos(bdev, ch);
	pthread_mutex_unlock(&bdev->internal.mutex);

	return 0;
}

/*
 * Abort I/O that are waiting on a data buffer.  These types of I/O are
 * linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
{
	bdev_io_stailq_t tmp;
	struct spdk_bdev_io *bdev_io;

	STAILQ_INIT(&tmp);

	while (!STAILQ_EMPTY(queue)) {
		bdev_io = STAILQ_FIRST(queue);
		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
		if (bdev_io->internal.ch == ch) {
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		} else {
			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
		}
	}

	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
}

/*
 * Abort I/O that are queued waiting for submission.  These types of I/O are
 * linked using the spdk_bdev_io link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io, *tmp;

	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
		if (bdev_io->internal.ch == ch) {
			TAILQ_REMOVE(queue, bdev_io, internal.link);
			/*
			 * spdk_bdev_io_complete() assumes that the completed I/O had
			 *  been submitted to the bdev module.  Since in this case it
			 *  hadn't, bump io_outstanding to account for the decrement
			 *  that spdk_bdev_io_complete() will do.
			 */
			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
				ch->io_outstanding++;
				ch->shared_resource->io_outstanding++;
			}
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		}
	}
}

static void
spdk_bdev_qos_channel_destroy(void *cb_arg)
{
	struct spdk_bdev_qos *qos = cb_arg;

	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
	spdk_poller_unregister(&qos->poller);

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);

	free(qos);
}

static int
spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
{
	int i;

	/*
	 * Cleanly shutting down the QoS poller is tricky, because
	 * during the asynchronous operation the user could open
	 * a new descriptor and create a new channel, spawning
	 * a new QoS poller.
	 *
	 * The strategy is to create a new QoS structure here and swap it
	 * in. The shutdown path then continues to refer to the old one
	 * until it completes and then releases it.
	 */
	struct spdk_bdev_qos *new_qos, *old_qos;

	old_qos = bdev->internal.qos;

	new_qos = calloc(1, sizeof(*new_qos));
	if (!new_qos) {
		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
		return -ENOMEM;
	}

	/* Copy the old QoS data into the newly allocated structure */
	memcpy(new_qos, old_qos, sizeof(*new_qos));

	/* Zero out the key parts of the QoS structure */
	new_qos->ch = NULL;
	new_qos->thread = NULL;
	new_qos->poller = NULL;
	TAILQ_INIT(&new_qos->queued);
	/*
	 * The limit member of spdk_bdev_qos_limit structure is not zeroed.
	 * It will be used later for the new QoS structure.
	 */
	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
		new_qos->rate_limits[i].remaining_this_timeslice = 0;
		new_qos->rate_limits[i].min_per_timeslice = 0;
		new_qos->rate_limits[i].max_per_timeslice = 0;
	}

	bdev->internal.qos = new_qos;

	if (old_qos->thread == NULL) {
		free(old_qos);
	} else {
		spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
				     old_qos);
	}

	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
	 * been destroyed yet. The destruction path will end up waiting for the final
	 * channel to be put before it releases resources. */

	return 0;
}

static void
_spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
{
	total->bytes_read += add->bytes_read;
	total->num_read_ops += add->num_read_ops;
	total->bytes_written += add->bytes_written;
	total->num_write_ops += add->num_write_ops;
	total->read_latency_ticks += add->read_latency_ticks;
	total->write_latency_ticks += add->write_latency_ticks;
}

static void
spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel	*ch = ctx_buf;
	struct spdk_bdev_mgmt_channel	*mgmt_ch;
	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
		      spdk_get_thread());

	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
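	/* The bdev mutex is taken because other threads may be reading or aggregating
	 * these per-bdev stats (e.g. via spdk_bdev_get_device_stat()) while this
	 * channel is being torn down.
	 */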
*/
1904 pthread_mutex_lock(&ch->bdev->internal.mutex);
1905 _spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
1906 pthread_mutex_unlock(&ch->bdev->internal.mutex);
1907
1908 mgmt_ch = shared_resource->mgmt_ch;
1909
1910 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1911 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1912 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1913 _spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1914
1915 _spdk_bdev_channel_destroy_resource(ch);
1916 }
1917
1918 int
1919 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1920 {
1921 struct spdk_bdev_alias *tmp;
1922
1923 if (alias == NULL) {
1924 SPDK_ERRLOG("NULL alias passed\n");
1925 return -EINVAL;
1926 }
1927
1928 if (spdk_bdev_get_by_name(alias)) {
1929 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1930 return -EEXIST;
1931 }
1932
1933 tmp = calloc(1, sizeof(*tmp));
1934 if (tmp == NULL) {
1935 SPDK_ERRLOG("Unable to allocate alias\n");
1936 return -ENOMEM;
1937 }
1938
1939 tmp->alias = strdup(alias);
1940 if (tmp->alias == NULL) {
1941 free(tmp);
1942 SPDK_ERRLOG("Unable to allocate alias\n");
1943 return -ENOMEM;
1944 }
1945
1946 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1947
1948 return 0;
1949 }
1950
1951 int
1952 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1953 {
1954 struct spdk_bdev_alias *tmp;
1955
1956 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1957 if (strcmp(alias, tmp->alias) == 0) {
1958 TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1959 free(tmp->alias);
1960 free(tmp);
1961 return 0;
1962 }
1963 }
1964
1965 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1966
1967 return -ENOENT;
1968 }
1969
1970 void
1971 spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
1972 {
1973 struct spdk_bdev_alias *p, *tmp;
1974
1975 TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
1976 TAILQ_REMOVE(&bdev->aliases, p, tailq);
1977 free(p->alias);
1978 free(p);
1979 }
1980 }
1981
1982 struct spdk_io_channel *
1983 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1984 {
1985 return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1986 }
1987
1988 const char *
1989 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1990 {
1991 return bdev->name;
1992 }
1993
1994 const char *
1995 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1996 {
1997 return bdev->product_name;
1998 }
1999
2000 const struct spdk_bdev_aliases_list *
2001 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
2002 {
2003 return &bdev->aliases;
2004 }
2005
2006 uint32_t
2007 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
2008 {
2009 return bdev->blocklen;
2010 }
2011
2012 uint64_t
2013 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
2014 {
2015 return bdev->blockcnt;
2016 }
2017
2018 const char *
2019 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type)
2020 {
2021 return qos_rpc_type[type];
2022 }
2023
2024 void
2025 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
2026 {
2027 int i;
2028
2029 memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
2030
2031 pthread_mutex_lock(&bdev->internal.mutex);
2032 if (bdev->internal.qos) {
2033 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
2034 if (bdev->internal.qos->rate_limits[i].limit !=
2035 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
2036 limits[i] = bdev->internal.qos->rate_limits[i].limit;
2037 if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) {
2038 /* Convert from bytes to megabytes, which is the user-visible unit.
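* For example, an internal limit of 10485760 bytes/sec is reported to the user as 10.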
*/ 2039 limits[i] = limits[i] / 1024 / 1024; 2040 } 2041 } 2042 } 2043 } 2044 pthread_mutex_unlock(&bdev->internal.mutex); 2045 } 2046 2047 size_t 2048 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 2049 { 2050 return 1 << bdev->required_alignment; 2051 } 2052 2053 uint32_t 2054 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 2055 { 2056 return bdev->optimal_io_boundary; 2057 } 2058 2059 bool 2060 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 2061 { 2062 return bdev->write_cache; 2063 } 2064 2065 const struct spdk_uuid * 2066 spdk_bdev_get_uuid(const struct spdk_bdev *bdev) 2067 { 2068 return &bdev->uuid; 2069 } 2070 2071 uint64_t 2072 spdk_bdev_get_qd(const struct spdk_bdev *bdev) 2073 { 2074 return bdev->internal.measured_queue_depth; 2075 } 2076 2077 uint64_t 2078 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev) 2079 { 2080 return bdev->internal.period; 2081 } 2082 2083 uint64_t 2084 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev) 2085 { 2086 return bdev->internal.weighted_io_time; 2087 } 2088 2089 uint64_t 2090 spdk_bdev_get_io_time(const struct spdk_bdev *bdev) 2091 { 2092 return bdev->internal.io_time; 2093 } 2094 2095 static void 2096 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status) 2097 { 2098 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2099 2100 bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth; 2101 2102 if (bdev->internal.measured_queue_depth) { 2103 bdev->internal.io_time += bdev->internal.period; 2104 bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth; 2105 } 2106 } 2107 2108 static void 2109 _calculate_measured_qd(struct spdk_io_channel_iter *i) 2110 { 2111 struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i); 2112 struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i); 2113 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch); 2114 2115 bdev->internal.temporary_queue_depth += ch->io_outstanding; 2116 spdk_for_each_channel_continue(i, 0); 2117 } 2118 2119 static int 2120 spdk_bdev_calculate_measured_queue_depth(void *ctx) 2121 { 2122 struct spdk_bdev *bdev = ctx; 2123 bdev->internal.temporary_queue_depth = 0; 2124 spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev, 2125 _calculate_measured_qd_cpl); 2126 return 0; 2127 } 2128 2129 void 2130 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period) 2131 { 2132 bdev->internal.period = period; 2133 2134 if (bdev->internal.qd_poller != NULL) { 2135 spdk_poller_unregister(&bdev->internal.qd_poller); 2136 bdev->internal.measured_queue_depth = UINT64_MAX; 2137 } 2138 2139 if (period != 0) { 2140 bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev, 2141 period); 2142 } 2143 } 2144 2145 int 2146 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 2147 { 2148 int ret; 2149 2150 pthread_mutex_lock(&bdev->internal.mutex); 2151 2152 /* bdev has open descriptors */ 2153 if (!TAILQ_EMPTY(&bdev->internal.open_descs) && 2154 bdev->blockcnt > size) { 2155 ret = -EBUSY; 2156 } else { 2157 bdev->blockcnt = size; 2158 ret = 0; 2159 } 2160 2161 pthread_mutex_unlock(&bdev->internal.mutex); 2162 2163 return ret; 2164 } 2165 2166 /* 2167 * Convert I/O offset and length from bytes to blocks. 2168 * 2169 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
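*
* For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 1024 yield
* offset_blocks = 8, num_blocks = 2 and a return value of 0, while offset_bytes = 100 would
* make the return value non-zero.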
2170 */ 2171 static uint64_t 2172 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 2173 uint64_t num_bytes, uint64_t *num_blocks) 2174 { 2175 uint32_t block_size = bdev->blocklen; 2176 2177 *offset_blocks = offset_bytes / block_size; 2178 *num_blocks = num_bytes / block_size; 2179 2180 return (offset_bytes % block_size) | (num_bytes % block_size); 2181 } 2182 2183 static bool 2184 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 2185 { 2186 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 2187 * has been an overflow and hence the offset has been wrapped around */ 2188 if (offset_blocks + num_blocks < offset_blocks) { 2189 return false; 2190 } 2191 2192 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 2193 if (offset_blocks + num_blocks > bdev->blockcnt) { 2194 return false; 2195 } 2196 2197 return true; 2198 } 2199 2200 int 2201 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2202 void *buf, uint64_t offset, uint64_t nbytes, 2203 spdk_bdev_io_completion_cb cb, void *cb_arg) 2204 { 2205 uint64_t offset_blocks, num_blocks; 2206 2207 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2208 return -EINVAL; 2209 } 2210 2211 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2212 } 2213 2214 int 2215 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2216 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2217 spdk_bdev_io_completion_cb cb, void *cb_arg) 2218 { 2219 struct spdk_bdev *bdev = desc->bdev; 2220 struct spdk_bdev_io *bdev_io; 2221 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2222 2223 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2224 return -EINVAL; 2225 } 2226 2227 bdev_io = spdk_bdev_get_io(channel); 2228 if (!bdev_io) { 2229 return -ENOMEM; 2230 } 2231 2232 bdev_io->internal.ch = channel; 2233 bdev_io->internal.desc = desc; 2234 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2235 bdev_io->u.bdev.iovs = &bdev_io->iov; 2236 bdev_io->u.bdev.iovs[0].iov_base = buf; 2237 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2238 bdev_io->u.bdev.iovcnt = 1; 2239 bdev_io->u.bdev.num_blocks = num_blocks; 2240 bdev_io->u.bdev.offset_blocks = offset_blocks; 2241 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2242 2243 spdk_bdev_io_submit(bdev_io); 2244 return 0; 2245 } 2246 2247 int 2248 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2249 struct iovec *iov, int iovcnt, 2250 uint64_t offset, uint64_t nbytes, 2251 spdk_bdev_io_completion_cb cb, void *cb_arg) 2252 { 2253 uint64_t offset_blocks, num_blocks; 2254 2255 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2256 return -EINVAL; 2257 } 2258 2259 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2260 } 2261 2262 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2263 struct iovec *iov, int iovcnt, 2264 uint64_t offset_blocks, uint64_t num_blocks, 2265 spdk_bdev_io_completion_cb cb, void *cb_arg) 2266 { 2267 struct spdk_bdev *bdev = desc->bdev; 2268 struct spdk_bdev_io *bdev_io; 2269 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2270 2271 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2272 return -EINVAL; 2273 } 2274 
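/* Allocate a bdev_io for this request. If none are available right now, spdk_bdev_get_io()
 * returns NULL and the caller sees -ENOMEM, which it can handle by retrying later,
 * e.g. via spdk_bdev_queue_io_wait() defined further down in this file. */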
2275 bdev_io = spdk_bdev_get_io(channel); 2276 if (!bdev_io) { 2277 return -ENOMEM; 2278 } 2279 2280 bdev_io->internal.ch = channel; 2281 bdev_io->internal.desc = desc; 2282 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 2283 bdev_io->u.bdev.iovs = iov; 2284 bdev_io->u.bdev.iovcnt = iovcnt; 2285 bdev_io->u.bdev.num_blocks = num_blocks; 2286 bdev_io->u.bdev.offset_blocks = offset_blocks; 2287 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2288 2289 spdk_bdev_io_submit(bdev_io); 2290 return 0; 2291 } 2292 2293 int 2294 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2295 void *buf, uint64_t offset, uint64_t nbytes, 2296 spdk_bdev_io_completion_cb cb, void *cb_arg) 2297 { 2298 uint64_t offset_blocks, num_blocks; 2299 2300 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2301 return -EINVAL; 2302 } 2303 2304 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 2305 } 2306 2307 int 2308 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2309 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 2310 spdk_bdev_io_completion_cb cb, void *cb_arg) 2311 { 2312 struct spdk_bdev *bdev = desc->bdev; 2313 struct spdk_bdev_io *bdev_io; 2314 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2315 2316 if (!desc->write) { 2317 return -EBADF; 2318 } 2319 2320 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2321 return -EINVAL; 2322 } 2323 2324 bdev_io = spdk_bdev_get_io(channel); 2325 if (!bdev_io) { 2326 return -ENOMEM; 2327 } 2328 2329 bdev_io->internal.ch = channel; 2330 bdev_io->internal.desc = desc; 2331 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2332 bdev_io->u.bdev.iovs = &bdev_io->iov; 2333 bdev_io->u.bdev.iovs[0].iov_base = buf; 2334 bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen; 2335 bdev_io->u.bdev.iovcnt = 1; 2336 bdev_io->u.bdev.num_blocks = num_blocks; 2337 bdev_io->u.bdev.offset_blocks = offset_blocks; 2338 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2339 2340 spdk_bdev_io_submit(bdev_io); 2341 return 0; 2342 } 2343 2344 int 2345 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2346 struct iovec *iov, int iovcnt, 2347 uint64_t offset, uint64_t len, 2348 spdk_bdev_io_completion_cb cb, void *cb_arg) 2349 { 2350 uint64_t offset_blocks, num_blocks; 2351 2352 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2353 return -EINVAL; 2354 } 2355 2356 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 2357 } 2358 2359 int 2360 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2361 struct iovec *iov, int iovcnt, 2362 uint64_t offset_blocks, uint64_t num_blocks, 2363 spdk_bdev_io_completion_cb cb, void *cb_arg) 2364 { 2365 struct spdk_bdev *bdev = desc->bdev; 2366 struct spdk_bdev_io *bdev_io; 2367 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2368 2369 if (!desc->write) { 2370 return -EBADF; 2371 } 2372 2373 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2374 return -EINVAL; 2375 } 2376 2377 bdev_io = spdk_bdev_get_io(channel); 2378 if (!bdev_io) { 2379 return -ENOMEM; 2380 } 2381 2382 bdev_io->internal.ch = channel; 2383 bdev_io->internal.desc = desc; 2384 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 2385 bdev_io->u.bdev.iovs = iov; 2386 bdev_io->u.bdev.iovcnt = iovcnt; 2387 bdev_io->u.bdev.num_blocks = num_blocks; 2388 bdev_io->u.bdev.offset_blocks = 
offset_blocks; 2389 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2390 2391 spdk_bdev_io_submit(bdev_io); 2392 return 0; 2393 } 2394 2395 int 2396 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2397 uint64_t offset, uint64_t len, 2398 spdk_bdev_io_completion_cb cb, void *cb_arg) 2399 { 2400 uint64_t offset_blocks, num_blocks; 2401 2402 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 2403 return -EINVAL; 2404 } 2405 2406 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2407 } 2408 2409 int 2410 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2411 uint64_t offset_blocks, uint64_t num_blocks, 2412 spdk_bdev_io_completion_cb cb, void *cb_arg) 2413 { 2414 struct spdk_bdev *bdev = desc->bdev; 2415 struct spdk_bdev_io *bdev_io; 2416 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2417 2418 if (!desc->write) { 2419 return -EBADF; 2420 } 2421 2422 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2423 return -EINVAL; 2424 } 2425 2426 bdev_io = spdk_bdev_get_io(channel); 2427 2428 if (!bdev_io) { 2429 return -ENOMEM; 2430 } 2431 2432 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 2433 bdev_io->internal.ch = channel; 2434 bdev_io->internal.desc = desc; 2435 bdev_io->u.bdev.offset_blocks = offset_blocks; 2436 bdev_io->u.bdev.num_blocks = num_blocks; 2437 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2438 2439 if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 2440 spdk_bdev_io_submit(bdev_io); 2441 return 0; 2442 } else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) { 2443 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 2444 bdev_io->u.bdev.split_remaining_num_blocks = num_blocks; 2445 bdev_io->u.bdev.split_current_offset_blocks = offset_blocks; 2446 _spdk_bdev_write_zero_buffer_next(bdev_io); 2447 return 0; 2448 } else { 2449 spdk_bdev_free_io(bdev_io); 2450 return -ENOTSUP; 2451 } 2452 } 2453 2454 int 2455 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2456 uint64_t offset, uint64_t nbytes, 2457 spdk_bdev_io_completion_cb cb, void *cb_arg) 2458 { 2459 uint64_t offset_blocks, num_blocks; 2460 2461 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 2462 return -EINVAL; 2463 } 2464 2465 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2466 } 2467 2468 int 2469 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2470 uint64_t offset_blocks, uint64_t num_blocks, 2471 spdk_bdev_io_completion_cb cb, void *cb_arg) 2472 { 2473 struct spdk_bdev *bdev = desc->bdev; 2474 struct spdk_bdev_io *bdev_io; 2475 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2476 2477 if (!desc->write) { 2478 return -EBADF; 2479 } 2480 2481 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2482 return -EINVAL; 2483 } 2484 2485 if (num_blocks == 0) { 2486 SPDK_ERRLOG("Can't unmap 0 bytes\n"); 2487 return -EINVAL; 2488 } 2489 2490 bdev_io = spdk_bdev_get_io(channel); 2491 if (!bdev_io) { 2492 return -ENOMEM; 2493 } 2494 2495 bdev_io->internal.ch = channel; 2496 bdev_io->internal.desc = desc; 2497 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 2498 2499 bdev_io->u.bdev.iovs = &bdev_io->iov; 2500 bdev_io->u.bdev.iovs[0].iov_base = NULL; 2501 bdev_io->u.bdev.iovs[0].iov_len = 0; 2502 bdev_io->u.bdev.iovcnt = 1; 2503 2504 
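/* Unmap carries no data payload; the zero-length iovec filled in above is presumably just
 * there so that backends which inspect iovs always see a valid vector. */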
bdev_io->u.bdev.offset_blocks = offset_blocks; 2505 bdev_io->u.bdev.num_blocks = num_blocks; 2506 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2507 2508 spdk_bdev_io_submit(bdev_io); 2509 return 0; 2510 } 2511 2512 int 2513 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2514 uint64_t offset, uint64_t length, 2515 spdk_bdev_io_completion_cb cb, void *cb_arg) 2516 { 2517 uint64_t offset_blocks, num_blocks; 2518 2519 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 2520 return -EINVAL; 2521 } 2522 2523 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 2524 } 2525 2526 int 2527 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2528 uint64_t offset_blocks, uint64_t num_blocks, 2529 spdk_bdev_io_completion_cb cb, void *cb_arg) 2530 { 2531 struct spdk_bdev *bdev = desc->bdev; 2532 struct spdk_bdev_io *bdev_io; 2533 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2534 2535 if (!desc->write) { 2536 return -EBADF; 2537 } 2538 2539 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 2540 return -EINVAL; 2541 } 2542 2543 bdev_io = spdk_bdev_get_io(channel); 2544 if (!bdev_io) { 2545 return -ENOMEM; 2546 } 2547 2548 bdev_io->internal.ch = channel; 2549 bdev_io->internal.desc = desc; 2550 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 2551 bdev_io->u.bdev.iovs = NULL; 2552 bdev_io->u.bdev.iovcnt = 0; 2553 bdev_io->u.bdev.offset_blocks = offset_blocks; 2554 bdev_io->u.bdev.num_blocks = num_blocks; 2555 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2556 2557 spdk_bdev_io_submit(bdev_io); 2558 return 0; 2559 } 2560 2561 static void 2562 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 2563 { 2564 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 2565 struct spdk_bdev_io *bdev_io; 2566 2567 bdev_io = TAILQ_FIRST(&ch->queued_resets); 2568 TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link); 2569 spdk_bdev_io_submit_reset(bdev_io); 2570 } 2571 2572 static void 2573 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 2574 { 2575 struct spdk_io_channel *ch; 2576 struct spdk_bdev_channel *channel; 2577 struct spdk_bdev_mgmt_channel *mgmt_channel; 2578 struct spdk_bdev_shared_resource *shared_resource; 2579 bdev_io_tailq_t tmp_queued; 2580 2581 TAILQ_INIT(&tmp_queued); 2582 2583 ch = spdk_io_channel_iter_get_channel(i); 2584 channel = spdk_io_channel_get_ctx(ch); 2585 shared_resource = channel->shared_resource; 2586 mgmt_channel = shared_resource->mgmt_ch; 2587 2588 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 2589 2590 if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) { 2591 /* The QoS object is always valid and readable while 2592 * the channel flag is set, so the lock here should not 2593 * be necessary. We're not in the fast path though, so 2594 * just take it anyway. 
*/ 2595 pthread_mutex_lock(&channel->bdev->internal.mutex); 2596 if (channel->bdev->internal.qos->ch == channel) { 2597 TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link); 2598 } 2599 pthread_mutex_unlock(&channel->bdev->internal.mutex); 2600 } 2601 2602 _spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel); 2603 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 2604 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 2605 _spdk_bdev_abort_queued_io(&tmp_queued, channel); 2606 2607 spdk_for_each_channel_continue(i, 0); 2608 } 2609 2610 static void 2611 _spdk_bdev_start_reset(void *ctx) 2612 { 2613 struct spdk_bdev_channel *ch = ctx; 2614 2615 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 2616 ch, _spdk_bdev_reset_dev); 2617 } 2618 2619 static void 2620 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 2621 { 2622 struct spdk_bdev *bdev = ch->bdev; 2623 2624 assert(!TAILQ_EMPTY(&ch->queued_resets)); 2625 2626 pthread_mutex_lock(&bdev->internal.mutex); 2627 if (bdev->internal.reset_in_progress == NULL) { 2628 bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 2629 /* 2630 * Take a channel reference for the target bdev for the life of this 2631 * reset. This guards against the channel getting destroyed while 2632 * spdk_for_each_channel() calls related to this reset IO are in 2633 * progress. We will release the reference when this reset is 2634 * completed. 2635 */ 2636 bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 2637 _spdk_bdev_start_reset(ch); 2638 } 2639 pthread_mutex_unlock(&bdev->internal.mutex); 2640 } 2641 2642 int 2643 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2644 spdk_bdev_io_completion_cb cb, void *cb_arg) 2645 { 2646 struct spdk_bdev *bdev = desc->bdev; 2647 struct spdk_bdev_io *bdev_io; 2648 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2649 2650 bdev_io = spdk_bdev_get_io(channel); 2651 if (!bdev_io) { 2652 return -ENOMEM; 2653 } 2654 2655 bdev_io->internal.ch = channel; 2656 bdev_io->internal.desc = desc; 2657 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 2658 bdev_io->u.reset.ch_ref = NULL; 2659 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2660 2661 pthread_mutex_lock(&bdev->internal.mutex); 2662 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link); 2663 pthread_mutex_unlock(&bdev->internal.mutex); 2664 2665 _spdk_bdev_channel_start_reset(channel); 2666 2667 return 0; 2668 } 2669 2670 void 2671 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2672 struct spdk_bdev_io_stat *stat) 2673 { 2674 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2675 2676 *stat = channel->stat; 2677 } 2678 2679 static void 2680 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status) 2681 { 2682 void *io_device = spdk_io_channel_iter_get_io_device(i); 2683 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2684 2685 bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat, 2686 bdev_iostat_ctx->cb_arg, 0); 2687 free(bdev_iostat_ctx); 2688 } 2689 2690 static void 2691 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i) 2692 { 2693 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i); 2694 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 2695 struct spdk_bdev_channel *channel = 
spdk_io_channel_get_ctx(ch); 2696 2697 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat); 2698 spdk_for_each_channel_continue(i, 0); 2699 } 2700 2701 void 2702 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat, 2703 spdk_bdev_get_device_stat_cb cb, void *cb_arg) 2704 { 2705 struct spdk_bdev_iostat_ctx *bdev_iostat_ctx; 2706 2707 assert(bdev != NULL); 2708 assert(stat != NULL); 2709 assert(cb != NULL); 2710 2711 bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx)); 2712 if (bdev_iostat_ctx == NULL) { 2713 SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n"); 2714 cb(bdev, stat, cb_arg, -ENOMEM); 2715 return; 2716 } 2717 2718 bdev_iostat_ctx->stat = stat; 2719 bdev_iostat_ctx->cb = cb; 2720 bdev_iostat_ctx->cb_arg = cb_arg; 2721 2722 /* Start with the statistics from previously deleted channels. */ 2723 pthread_mutex_lock(&bdev->internal.mutex); 2724 _spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat); 2725 pthread_mutex_unlock(&bdev->internal.mutex); 2726 2727 /* Then iterate and add the statistics from each existing channel. */ 2728 spdk_for_each_channel(__bdev_to_io_dev(bdev), 2729 _spdk_bdev_get_each_channel_stat, 2730 bdev_iostat_ctx, 2731 _spdk_bdev_get_device_stat_done); 2732 } 2733 2734 int 2735 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2736 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2737 spdk_bdev_io_completion_cb cb, void *cb_arg) 2738 { 2739 struct spdk_bdev *bdev = desc->bdev; 2740 struct spdk_bdev_io *bdev_io; 2741 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2742 2743 if (!desc->write) { 2744 return -EBADF; 2745 } 2746 2747 bdev_io = spdk_bdev_get_io(channel); 2748 if (!bdev_io) { 2749 return -ENOMEM; 2750 } 2751 2752 bdev_io->internal.ch = channel; 2753 bdev_io->internal.desc = desc; 2754 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 2755 bdev_io->u.nvme_passthru.cmd = *cmd; 2756 bdev_io->u.nvme_passthru.buf = buf; 2757 bdev_io->u.nvme_passthru.nbytes = nbytes; 2758 bdev_io->u.nvme_passthru.md_buf = NULL; 2759 bdev_io->u.nvme_passthru.md_len = 0; 2760 2761 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2762 2763 spdk_bdev_io_submit(bdev_io); 2764 return 0; 2765 } 2766 2767 int 2768 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2769 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 2770 spdk_bdev_io_completion_cb cb, void *cb_arg) 2771 { 2772 struct spdk_bdev *bdev = desc->bdev; 2773 struct spdk_bdev_io *bdev_io; 2774 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2775 2776 if (!desc->write) { 2777 /* 2778 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2779 * to easily determine if the command is a read or write, but for now just 2780 * do not allow io_passthru with a read-only descriptor. 
2781 */ 2782 return -EBADF; 2783 } 2784 2785 bdev_io = spdk_bdev_get_io(channel); 2786 if (!bdev_io) { 2787 return -ENOMEM; 2788 } 2789 2790 bdev_io->internal.ch = channel; 2791 bdev_io->internal.desc = desc; 2792 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 2793 bdev_io->u.nvme_passthru.cmd = *cmd; 2794 bdev_io->u.nvme_passthru.buf = buf; 2795 bdev_io->u.nvme_passthru.nbytes = nbytes; 2796 bdev_io->u.nvme_passthru.md_buf = NULL; 2797 bdev_io->u.nvme_passthru.md_len = 0; 2798 2799 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2800 2801 spdk_bdev_io_submit(bdev_io); 2802 return 0; 2803 } 2804 2805 int 2806 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 2807 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 2808 spdk_bdev_io_completion_cb cb, void *cb_arg) 2809 { 2810 struct spdk_bdev *bdev = desc->bdev; 2811 struct spdk_bdev_io *bdev_io; 2812 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2813 2814 if (!desc->write) { 2815 /* 2816 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 2817 * to easily determine if the command is a read or write, but for now just 2818 * do not allow io_passthru with a read-only descriptor. 2819 */ 2820 return -EBADF; 2821 } 2822 2823 bdev_io = spdk_bdev_get_io(channel); 2824 if (!bdev_io) { 2825 return -ENOMEM; 2826 } 2827 2828 bdev_io->internal.ch = channel; 2829 bdev_io->internal.desc = desc; 2830 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 2831 bdev_io->u.nvme_passthru.cmd = *cmd; 2832 bdev_io->u.nvme_passthru.buf = buf; 2833 bdev_io->u.nvme_passthru.nbytes = nbytes; 2834 bdev_io->u.nvme_passthru.md_buf = md_buf; 2835 bdev_io->u.nvme_passthru.md_len = md_len; 2836 2837 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 2838 2839 spdk_bdev_io_submit(bdev_io); 2840 return 0; 2841 } 2842 2843 int 2844 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 2845 struct spdk_bdev_io_wait_entry *entry) 2846 { 2847 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 2848 struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch; 2849 2850 if (bdev != entry->bdev) { 2851 SPDK_ERRLOG("bdevs do not match\n"); 2852 return -EINVAL; 2853 } 2854 2855 if (mgmt_ch->per_thread_cache_count > 0) { 2856 SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n"); 2857 return -EINVAL; 2858 } 2859 2860 TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link); 2861 return 0; 2862 } 2863 2864 static void 2865 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 2866 { 2867 struct spdk_bdev *bdev = bdev_ch->bdev; 2868 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 2869 struct spdk_bdev_io *bdev_io; 2870 2871 if (shared_resource->io_outstanding > shared_resource->nomem_threshold) { 2872 /* 2873 * Allow some more I/O to complete before retrying the nomem_io queue. 2874 * Some drivers (such as nvme) cannot immediately take a new I/O in 2875 * the context of a completion, because the resources for the I/O are 2876 * not released until control returns to the bdev poller. Also, we 2877 * may require several small I/O to complete before a larger I/O 2878 * (that requires splitting) can be submitted. 
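*
* The threshold checked above is recomputed in spdk_bdev_io_complete() each time an I/O
* completes with NOMEM status: it is the larger of half the currently outstanding I/O
* and the outstanding count minus NOMEM_THRESHOLD_COUNT.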
2879 */ 2880 return; 2881 } 2882 2883 while (!TAILQ_EMPTY(&shared_resource->nomem_io)) { 2884 bdev_io = TAILQ_FIRST(&shared_resource->nomem_io); 2885 TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link); 2886 bdev_io->internal.ch->io_outstanding++; 2887 shared_resource->io_outstanding++; 2888 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING; 2889 bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io); 2890 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) { 2891 break; 2892 } 2893 } 2894 } 2895 2896 static inline void 2897 _spdk_bdev_io_complete(void *ctx) 2898 { 2899 struct spdk_bdev_io *bdev_io = ctx; 2900 uint64_t tsc; 2901 2902 if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) { 2903 /* 2904 * Send the completion to the thread that originally submitted the I/O, 2905 * which may not be the current thread in the case of QoS. 2906 */ 2907 if (bdev_io->internal.io_submit_ch) { 2908 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 2909 bdev_io->internal.io_submit_ch = NULL; 2910 } 2911 2912 /* 2913 * Defer completion to avoid potential infinite recursion if the 2914 * user's completion callback issues a new I/O. 2915 */ 2916 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 2917 _spdk_bdev_io_complete, bdev_io); 2918 return; 2919 } 2920 2921 tsc = spdk_get_ticks(); 2922 spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0); 2923 2924 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 2925 switch (bdev_io->type) { 2926 case SPDK_BDEV_IO_TYPE_READ: 2927 bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2928 bdev_io->internal.ch->stat.num_read_ops++; 2929 bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2930 break; 2931 case SPDK_BDEV_IO_TYPE_WRITE: 2932 bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen; 2933 bdev_io->internal.ch->stat.num_write_ops++; 2934 bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc); 2935 break; 2936 default: 2937 break; 2938 } 2939 } 2940 2941 #ifdef SPDK_CONFIG_VTUNE 2942 uint64_t now_tsc = spdk_get_ticks(); 2943 if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) { 2944 uint64_t data[5]; 2945 2946 data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops; 2947 data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read; 2948 data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops; 2949 data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written; 2950 data[4] = bdev_io->bdev->fn_table->get_spin_time ? 
2951 bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0; 2952 2953 __itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle, 2954 __itt_metadata_u64, 5, data); 2955 2956 bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat; 2957 bdev_io->internal.ch->start_tsc = now_tsc; 2958 } 2959 #endif 2960 2961 assert(bdev_io->internal.cb != NULL); 2962 assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel)); 2963 2964 bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS, 2965 bdev_io->internal.caller_ctx); 2966 } 2967 2968 static void 2969 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 2970 { 2971 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 2972 2973 if (bdev_io->u.reset.ch_ref != NULL) { 2974 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 2975 bdev_io->u.reset.ch_ref = NULL; 2976 } 2977 2978 _spdk_bdev_io_complete(bdev_io); 2979 } 2980 2981 static void 2982 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 2983 { 2984 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 2985 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 2986 2987 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 2988 if (!TAILQ_EMPTY(&ch->queued_resets)) { 2989 _spdk_bdev_channel_start_reset(ch); 2990 } 2991 2992 spdk_for_each_channel_continue(i, 0); 2993 } 2994 2995 void 2996 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 2997 { 2998 struct spdk_bdev *bdev = bdev_io->bdev; 2999 struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch; 3000 struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource; 3001 3002 bdev_io->internal.status = status; 3003 3004 if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) { 3005 bool unlock_channels = false; 3006 3007 if (status == SPDK_BDEV_IO_STATUS_NOMEM) { 3008 SPDK_ERRLOG("NOMEM returned for reset\n"); 3009 } 3010 pthread_mutex_lock(&bdev->internal.mutex); 3011 if (bdev_io == bdev->internal.reset_in_progress) { 3012 bdev->internal.reset_in_progress = NULL; 3013 unlock_channels = true; 3014 } 3015 pthread_mutex_unlock(&bdev->internal.mutex); 3016 3017 if (unlock_channels) { 3018 spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel, 3019 bdev_io, _spdk_bdev_reset_complete); 3020 return; 3021 } 3022 } else { 3023 assert(bdev_ch->io_outstanding > 0); 3024 assert(shared_resource->io_outstanding > 0); 3025 bdev_ch->io_outstanding--; 3026 shared_resource->io_outstanding--; 3027 3028 if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) { 3029 TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link); 3030 /* 3031 * Wait for some of the outstanding I/O to complete before we 3032 * retry any of the nomem_io. Normally we will wait for 3033 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue 3034 * depth channels we will instead wait for half to complete. 
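* (For example, with only four I/O outstanding the threshold computed below is two,
* so retries begin once half of them have completed.)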
3035 */ 3036 shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2, 3037 (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT); 3038 return; 3039 } 3040 3041 if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) { 3042 _spdk_bdev_ch_retry_io(bdev_ch); 3043 } 3044 } 3045 3046 _spdk_bdev_io_complete(bdev_io); 3047 } 3048 3049 void 3050 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc, 3051 enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq) 3052 { 3053 if (sc == SPDK_SCSI_STATUS_GOOD) { 3054 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3055 } else { 3056 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR; 3057 bdev_io->internal.error.scsi.sc = sc; 3058 bdev_io->internal.error.scsi.sk = sk; 3059 bdev_io->internal.error.scsi.asc = asc; 3060 bdev_io->internal.error.scsi.ascq = ascq; 3061 } 3062 3063 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3064 } 3065 3066 void 3067 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io, 3068 int *sc, int *sk, int *asc, int *ascq) 3069 { 3070 assert(sc != NULL); 3071 assert(sk != NULL); 3072 assert(asc != NULL); 3073 assert(ascq != NULL); 3074 3075 switch (bdev_io->internal.status) { 3076 case SPDK_BDEV_IO_STATUS_SUCCESS: 3077 *sc = SPDK_SCSI_STATUS_GOOD; 3078 *sk = SPDK_SCSI_SENSE_NO_SENSE; 3079 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3080 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3081 break; 3082 case SPDK_BDEV_IO_STATUS_NVME_ERROR: 3083 spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq); 3084 break; 3085 case SPDK_BDEV_IO_STATUS_SCSI_ERROR: 3086 *sc = bdev_io->internal.error.scsi.sc; 3087 *sk = bdev_io->internal.error.scsi.sk; 3088 *asc = bdev_io->internal.error.scsi.asc; 3089 *ascq = bdev_io->internal.error.scsi.ascq; 3090 break; 3091 default: 3092 *sc = SPDK_SCSI_STATUS_CHECK_CONDITION; 3093 *sk = SPDK_SCSI_SENSE_ABORTED_COMMAND; 3094 *asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE; 3095 *ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE; 3096 break; 3097 } 3098 } 3099 3100 void 3101 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc) 3102 { 3103 if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) { 3104 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3105 } else { 3106 bdev_io->internal.error.nvme.sct = sct; 3107 bdev_io->internal.error.nvme.sc = sc; 3108 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR; 3109 } 3110 3111 spdk_bdev_io_complete(bdev_io, bdev_io->internal.status); 3112 } 3113 3114 void 3115 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc) 3116 { 3117 assert(sct != NULL); 3118 assert(sc != NULL); 3119 3120 if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) { 3121 *sct = bdev_io->internal.error.nvme.sct; 3122 *sc = bdev_io->internal.error.nvme.sc; 3123 } else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) { 3124 *sct = SPDK_NVME_SCT_GENERIC; 3125 *sc = SPDK_NVME_SC_SUCCESS; 3126 } else { 3127 *sct = SPDK_NVME_SCT_GENERIC; 3128 *sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR; 3129 } 3130 } 3131 3132 struct spdk_thread * 3133 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io) 3134 { 3135 return spdk_io_channel_get_thread(bdev_io->internal.ch->channel); 3136 } 3137 3138 static void 3139 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits) 3140 { 3141 uint64_t min_qos_set; 3142 int i; 3143 3144 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3145 if (limits[i] != 
SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3146 break;
3147 }
3148 }
3149
3150 if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3151 SPDK_ERRLOG("Invalid rate limits set.\n");
3152 return;
3153 }
3154
3155 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3156 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3157 continue;
3158 }
3159
3160 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
3161 min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
3162 } else {
3163 min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
3164 }
3165
3166 if (limits[i] == 0 || limits[i] % min_qos_set) {
3167 SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n",
3168 limits[i], bdev->name, min_qos_set);
3169 SPDK_ERRLOG("Failed to enable QoS for bdev %s\n", bdev->name);
3170 return;
3171 }
3172 }
3173
3174 if (!bdev->internal.qos) {
3175 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3176 if (!bdev->internal.qos) {
3177 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3178 return;
3179 }
3180 }
3181
3182 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3183 bdev->internal.qos->rate_limits[i].limit = limits[i];
3184 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
3185 bdev->name, i, limits[i]);
3186 }
3187
3188 return;
3189 }
3190
3191 static void
3192 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
3193 {
3194 struct spdk_conf_section *sp = NULL;
3195 const char *val = NULL;
3196 int i = 0, j = 0;
3197 uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
3198 bool config_qos = false;
3199
3200 sp = spdk_conf_find_section(NULL, "QoS");
3201 if (!sp) {
3202 return;
3203 }
3204
3205 while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3206 limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
3207
3208 i = 0;
3209 while (true) {
3210 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0);
3211 if (!val) {
3212 break;
3213 }
3214
3215 if (strcmp(bdev->name, val) != 0) {
3216 i++;
3217 continue;
3218 }
3219
3220 val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1);
3221 if (val) {
3222 if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) {
3223 limits[j] = strtoull(val, NULL, 10);
3224 } else {
3225 limits[j] = strtoull(val, NULL, 10) * 1024 * 1024;
3226 }
3227 config_qos = true;
3228 }
3229
3230 break;
3231 }
3232
3233 j++;
3234 }
3235
3236 if (config_qos == true) {
3237 _spdk_bdev_qos_config_limit(bdev, limits);
3238 }
3239
3240 return;
3241 }
3242
3243 static int
3244 spdk_bdev_init(struct spdk_bdev *bdev)
3245 {
3246 char *bdev_name;
3247
3248 assert(bdev->module != NULL);
3249
3250 if (!bdev->name) {
3251 SPDK_ERRLOG("Bdev name is NULL\n");
3252 return -EINVAL;
3253 }
3254
3255 if (spdk_bdev_get_by_name(bdev->name)) {
3256 SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
3257 return -EEXIST;
3258 }
3259
3260 /* Users often register their own I/O devices using the bdev name. In
3261 * order to avoid conflicts, prepend bdev_.
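* For example, a bdev named "Malloc0" (an illustrative name) would be registered as the I/O device "bdev_Malloc0".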
*/ 3262 bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name); 3263 if (!bdev_name) { 3264 SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n"); 3265 return -ENOMEM; 3266 } 3267 3268 bdev->internal.status = SPDK_BDEV_STATUS_READY; 3269 bdev->internal.measured_queue_depth = UINT64_MAX; 3270 bdev->internal.claim_module = NULL; 3271 bdev->internal.qd_poller = NULL; 3272 bdev->internal.qos = NULL; 3273 3274 if (spdk_bdev_get_buf_align(bdev) > 1) { 3275 if (bdev->split_on_optimal_io_boundary) { 3276 bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary, 3277 SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen); 3278 } else { 3279 bdev->split_on_optimal_io_boundary = true; 3280 bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen; 3281 } 3282 } 3283 3284 TAILQ_INIT(&bdev->internal.open_descs); 3285 3286 TAILQ_INIT(&bdev->aliases); 3287 3288 bdev->internal.reset_in_progress = NULL; 3289 3290 _spdk_bdev_qos_config(bdev); 3291 3292 spdk_io_device_register(__bdev_to_io_dev(bdev), 3293 spdk_bdev_channel_create, spdk_bdev_channel_destroy, 3294 sizeof(struct spdk_bdev_channel), 3295 bdev_name); 3296 3297 free(bdev_name); 3298 3299 pthread_mutex_init(&bdev->internal.mutex, NULL); 3300 return 0; 3301 } 3302 3303 static void 3304 spdk_bdev_destroy_cb(void *io_device) 3305 { 3306 int rc; 3307 struct spdk_bdev *bdev; 3308 spdk_bdev_unregister_cb cb_fn; 3309 void *cb_arg; 3310 3311 bdev = __bdev_from_io_dev(io_device); 3312 cb_fn = bdev->internal.unregister_cb; 3313 cb_arg = bdev->internal.unregister_ctx; 3314 3315 rc = bdev->fn_table->destruct(bdev->ctxt); 3316 if (rc < 0) { 3317 SPDK_ERRLOG("destruct failed\n"); 3318 } 3319 if (rc <= 0 && cb_fn != NULL) { 3320 cb_fn(cb_arg, rc); 3321 } 3322 } 3323 3324 3325 static void 3326 spdk_bdev_fini(struct spdk_bdev *bdev) 3327 { 3328 pthread_mutex_destroy(&bdev->internal.mutex); 3329 3330 free(bdev->internal.qos); 3331 3332 spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb); 3333 } 3334 3335 static void 3336 spdk_bdev_start(struct spdk_bdev *bdev) 3337 { 3338 struct spdk_bdev_module *module; 3339 uint32_t action; 3340 3341 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name); 3342 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link); 3343 3344 /* Examine configuration before initializing I/O */ 3345 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3346 if (module->examine_config) { 3347 action = module->internal.action_in_progress; 3348 module->internal.action_in_progress++; 3349 module->examine_config(bdev); 3350 if (action != module->internal.action_in_progress) { 3351 SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n", 3352 module->name); 3353 } 3354 } 3355 } 3356 3357 if (bdev->internal.claim_module) { 3358 return; 3359 } 3360 3361 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3362 if (module->examine_disk) { 3363 module->internal.action_in_progress++; 3364 module->examine_disk(bdev); 3365 } 3366 } 3367 } 3368 3369 int 3370 spdk_bdev_register(struct spdk_bdev *bdev) 3371 { 3372 int rc = spdk_bdev_init(bdev); 3373 3374 if (rc == 0) { 3375 spdk_bdev_start(bdev); 3376 } 3377 3378 return rc; 3379 } 3380 3381 int 3382 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count) 3383 { 3384 int rc; 3385 3386 rc = spdk_bdev_init(vbdev); 3387 if (rc) { 3388 return rc; 3389 } 3390 3391 spdk_bdev_start(vbdev); 3392 return 0; 3393 } 3394 3395 void 3396 
spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno) 3397 { 3398 if (bdev->internal.unregister_cb != NULL) { 3399 bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno); 3400 } 3401 } 3402 3403 static void 3404 _remove_notify(void *arg) 3405 { 3406 struct spdk_bdev_desc *desc = arg; 3407 3408 desc->remove_scheduled = false; 3409 3410 if (desc->closed) { 3411 free(desc); 3412 } else { 3413 desc->remove_cb(desc->remove_ctx); 3414 } 3415 } 3416 3417 void 3418 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg) 3419 { 3420 struct spdk_bdev_desc *desc, *tmp; 3421 bool do_destruct = true; 3422 struct spdk_thread *thread; 3423 3424 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name); 3425 3426 thread = spdk_get_thread(); 3427 if (!thread) { 3428 /* The user called this from a non-SPDK thread. */ 3429 if (cb_fn != NULL) { 3430 cb_fn(cb_arg, -ENOTSUP); 3431 } 3432 return; 3433 } 3434 3435 pthread_mutex_lock(&bdev->internal.mutex); 3436 3437 bdev->internal.status = SPDK_BDEV_STATUS_REMOVING; 3438 bdev->internal.unregister_cb = cb_fn; 3439 bdev->internal.unregister_ctx = cb_arg; 3440 3441 TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) { 3442 if (desc->remove_cb) { 3443 do_destruct = false; 3444 /* 3445 * Defer invocation of the remove_cb to a separate message that will 3446 * run later on its thread. This ensures this context unwinds and 3447 * we don't recursively unregister this bdev again if the remove_cb 3448 * immediately closes its descriptor. 3449 */ 3450 if (!desc->remove_scheduled) { 3451 /* Avoid scheduling removal of the same descriptor multiple times. */ 3452 desc->remove_scheduled = true; 3453 spdk_thread_send_msg(desc->thread, _remove_notify, desc); 3454 } 3455 } 3456 } 3457 3458 if (!do_destruct) { 3459 pthread_mutex_unlock(&bdev->internal.mutex); 3460 return; 3461 } 3462 3463 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link); 3464 pthread_mutex_unlock(&bdev->internal.mutex); 3465 3466 spdk_bdev_fini(bdev); 3467 } 3468 3469 int 3470 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb, 3471 void *remove_ctx, struct spdk_bdev_desc **_desc) 3472 { 3473 struct spdk_bdev_desc *desc; 3474 struct spdk_thread *thread; 3475 3476 thread = spdk_get_thread(); 3477 if (!thread) { 3478 SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n"); 3479 return -ENOTSUP; 3480 } 3481 3482 desc = calloc(1, sizeof(*desc)); 3483 if (desc == NULL) { 3484 SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n"); 3485 return -ENOMEM; 3486 } 3487 3488 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3489 spdk_get_thread()); 3490 3491 pthread_mutex_lock(&bdev->internal.mutex); 3492 3493 if (write && bdev->internal.claim_module) { 3494 SPDK_ERRLOG("Could not open %s - %s module already claimed it\n", 3495 bdev->name, bdev->internal.claim_module->name); 3496 free(desc); 3497 pthread_mutex_unlock(&bdev->internal.mutex); 3498 return -EPERM; 3499 } 3500 3501 TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link); 3502 3503 desc->bdev = bdev; 3504 desc->thread = thread; 3505 desc->remove_cb = remove_cb; 3506 desc->remove_ctx = remove_ctx; 3507 desc->write = write; 3508 *_desc = desc; 3509 3510 pthread_mutex_unlock(&bdev->internal.mutex); 3511 3512 return 0; 3513 } 3514 3515 void 3516 spdk_bdev_close(struct spdk_bdev_desc *desc) 3517 { 3518 struct spdk_bdev *bdev = desc->bdev; 3519 bool do_unregister = false; 3520 3521 
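/* A descriptor is bound to the SPDK thread that opened it; the assert below verifies that
 * spdk_bdev_close() is called from that same thread. */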
SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name, 3522 spdk_get_thread()); 3523 3524 assert(desc->thread == spdk_get_thread()); 3525 3526 pthread_mutex_lock(&bdev->internal.mutex); 3527 3528 TAILQ_REMOVE(&bdev->internal.open_descs, desc, link); 3529 3530 desc->closed = true; 3531 3532 if (!desc->remove_scheduled) { 3533 free(desc); 3534 } 3535 3536 /* If no more descriptors, kill QoS channel */ 3537 if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3538 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n", 3539 bdev->name, spdk_get_thread()); 3540 3541 if (spdk_bdev_qos_destroy(bdev)) { 3542 /* There isn't anything we can do to recover here. Just let the 3543 * old QoS poller keep running. The QoS handling won't change 3544 * cores when the user allocates a new channel, but it won't break. */ 3545 SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n"); 3546 } 3547 } 3548 3549 spdk_bdev_set_qd_sampling_period(bdev, 0); 3550 3551 if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) { 3552 do_unregister = true; 3553 } 3554 pthread_mutex_unlock(&bdev->internal.mutex); 3555 3556 if (do_unregister == true) { 3557 spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx); 3558 } 3559 } 3560 3561 int 3562 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc, 3563 struct spdk_bdev_module *module) 3564 { 3565 if (bdev->internal.claim_module != NULL) { 3566 SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name, 3567 bdev->internal.claim_module->name); 3568 return -EPERM; 3569 } 3570 3571 if (desc && !desc->write) { 3572 desc->write = true; 3573 } 3574 3575 bdev->internal.claim_module = module; 3576 return 0; 3577 } 3578 3579 void 3580 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev) 3581 { 3582 assert(bdev->internal.claim_module != NULL); 3583 bdev->internal.claim_module = NULL; 3584 } 3585 3586 struct spdk_bdev * 3587 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc) 3588 { 3589 return desc->bdev; 3590 } 3591 3592 void 3593 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp) 3594 { 3595 struct iovec *iovs; 3596 int iovcnt; 3597 3598 if (bdev_io == NULL) { 3599 return; 3600 } 3601 3602 switch (bdev_io->type) { 3603 case SPDK_BDEV_IO_TYPE_READ: 3604 iovs = bdev_io->u.bdev.iovs; 3605 iovcnt = bdev_io->u.bdev.iovcnt; 3606 break; 3607 case SPDK_BDEV_IO_TYPE_WRITE: 3608 iovs = bdev_io->u.bdev.iovs; 3609 iovcnt = bdev_io->u.bdev.iovcnt; 3610 break; 3611 default: 3612 iovs = NULL; 3613 iovcnt = 0; 3614 break; 3615 } 3616 3617 if (iovp) { 3618 *iovp = iovs; 3619 } 3620 if (iovcntp) { 3621 *iovcntp = iovcnt; 3622 } 3623 } 3624 3625 void 3626 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module) 3627 { 3628 3629 if (spdk_bdev_module_list_find(bdev_module->name)) { 3630 SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name); 3631 assert(false); 3632 } 3633 3634 if (bdev_module->async_init) { 3635 bdev_module->internal.action_in_progress = 1; 3636 } 3637 3638 /* 3639 * Modules with examine callbacks must be initialized first, so they are 3640 * ready to handle examine callbacks from later modules that will 3641 * register physical bdevs. 
3642 */ 3643 if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) { 3644 TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3645 } else { 3646 TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq); 3647 } 3648 } 3649 3650 struct spdk_bdev_module * 3651 spdk_bdev_module_list_find(const char *name) 3652 { 3653 struct spdk_bdev_module *bdev_module; 3654 3655 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) { 3656 if (strcmp(name, bdev_module->name) == 0) { 3657 break; 3658 } 3659 } 3660 3661 return bdev_module; 3662 } 3663 3664 static void 3665 _spdk_bdev_write_zero_buffer_next(void *_bdev_io) 3666 { 3667 struct spdk_bdev_io *bdev_io = _bdev_io; 3668 uint64_t num_bytes, num_blocks; 3669 int rc; 3670 3671 num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * 3672 bdev_io->u.bdev.split_remaining_num_blocks, 3673 ZERO_BUFFER_SIZE); 3674 num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev); 3675 3676 rc = spdk_bdev_write_blocks(bdev_io->internal.desc, 3677 spdk_io_channel_from_ctx(bdev_io->internal.ch), 3678 g_bdev_mgr.zero_buffer, 3679 bdev_io->u.bdev.split_current_offset_blocks, num_blocks, 3680 _spdk_bdev_write_zero_buffer_done, bdev_io); 3681 if (rc == 0) { 3682 bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks; 3683 bdev_io->u.bdev.split_current_offset_blocks += num_blocks; 3684 } else if (rc == -ENOMEM) { 3685 _spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next); 3686 } else { 3687 bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3688 bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx); 3689 } 3690 } 3691 3692 static void 3693 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 3694 { 3695 struct spdk_bdev_io *parent_io = cb_arg; 3696 3697 spdk_bdev_free_io(bdev_io); 3698 3699 if (!success) { 3700 parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED; 3701 parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx); 3702 return; 3703 } 3704 3705 if (parent_io->u.bdev.split_remaining_num_blocks == 0) { 3706 parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS; 3707 parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx); 3708 return; 3709 } 3710 3711 _spdk_bdev_write_zero_buffer_next(parent_io); 3712 } 3713 3714 struct set_qos_limit_ctx { 3715 void (*cb_fn)(void *cb_arg, int status); 3716 void *cb_arg; 3717 struct spdk_bdev *bdev; 3718 }; 3719 3720 static void 3721 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status) 3722 { 3723 pthread_mutex_lock(&ctx->bdev->internal.mutex); 3724 ctx->bdev->internal.qos_mod_in_progress = false; 3725 pthread_mutex_unlock(&ctx->bdev->internal.mutex); 3726 3727 ctx->cb_fn(ctx->cb_arg, status); 3728 free(ctx); 3729 } 3730 3731 static void 3732 _spdk_bdev_disable_qos_done(void *cb_arg) 3733 { 3734 struct set_qos_limit_ctx *ctx = cb_arg; 3735 struct spdk_bdev *bdev = ctx->bdev; 3736 struct spdk_bdev_io *bdev_io; 3737 struct spdk_bdev_qos *qos; 3738 3739 pthread_mutex_lock(&bdev->internal.mutex); 3740 qos = bdev->internal.qos; 3741 bdev->internal.qos = NULL; 3742 pthread_mutex_unlock(&bdev->internal.mutex); 3743 3744 while (!TAILQ_EMPTY(&qos->queued)) { 3745 /* Send queued I/O back to their original thread for resubmission. 
*/ 3746 bdev_io = TAILQ_FIRST(&qos->queued); 3747 TAILQ_REMOVE(&qos->queued, bdev_io, internal.link); 3748 3749 if (bdev_io->internal.io_submit_ch) { 3750 /* 3751 * Channel was changed when sending it to the QoS thread - change it back 3752 * before sending it back to the original thread. 3753 */ 3754 bdev_io->internal.ch = bdev_io->internal.io_submit_ch; 3755 bdev_io->internal.io_submit_ch = NULL; 3756 } 3757 3758 spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel), 3759 _spdk_bdev_io_submit, bdev_io); 3760 } 3761 3762 spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch)); 3763 spdk_poller_unregister(&qos->poller); 3764 3765 free(qos); 3766 3767 _spdk_bdev_set_qos_limit_done(ctx, 0); 3768 } 3769 3770 static void 3771 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status) 3772 { 3773 void *io_device = spdk_io_channel_iter_get_io_device(i); 3774 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3775 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3776 struct spdk_thread *thread; 3777 3778 pthread_mutex_lock(&bdev->internal.mutex); 3779 thread = bdev->internal.qos->thread; 3780 pthread_mutex_unlock(&bdev->internal.mutex); 3781 3782 spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx); 3783 } 3784 3785 static void 3786 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i) 3787 { 3788 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3789 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3790 3791 bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED; 3792 3793 spdk_for_each_channel_continue(i, 0); 3794 } 3795 3796 static void 3797 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg) 3798 { 3799 struct set_qos_limit_ctx *ctx = cb_arg; 3800 struct spdk_bdev *bdev = ctx->bdev; 3801 3802 pthread_mutex_lock(&bdev->internal.mutex); 3803 spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos); 3804 pthread_mutex_unlock(&bdev->internal.mutex); 3805 3806 _spdk_bdev_set_qos_limit_done(ctx, 0); 3807 } 3808 3809 static void 3810 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i) 3811 { 3812 void *io_device = spdk_io_channel_iter_get_io_device(i); 3813 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 3814 struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i); 3815 struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch); 3816 3817 pthread_mutex_lock(&bdev->internal.mutex); 3818 _spdk_bdev_enable_qos(bdev, bdev_ch); 3819 pthread_mutex_unlock(&bdev->internal.mutex); 3820 spdk_for_each_channel_continue(i, 0); 3821 } 3822 3823 static void 3824 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status) 3825 { 3826 struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 3827 3828 _spdk_bdev_set_qos_limit_done(ctx, status); 3829 } 3830 3831 static void 3832 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits) 3833 { 3834 int i; 3835 3836 assert(bdev->internal.qos != NULL); 3837 3838 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3839 if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3840 bdev->internal.qos->rate_limits[i].limit = limits[i]; 3841 3842 if (limits[i] == 0) { 3843 bdev->internal.qos->rate_limits[i].limit = 3844 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED; 3845 } 3846 } 3847 } 3848 } 3849 3850 void 3851 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits, 3852 void (*cb_fn)(void *cb_arg, int status), void *cb_arg) 3853 { 3854 struct set_qos_limit_ctx *ctx; 3855 uint32_t limit_set_complement; 3856 
uint64_t min_limit_per_sec; 3857 int i; 3858 bool disable_rate_limit = true; 3859 3860 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3861 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) { 3862 continue; 3863 } 3864 3865 if (limits[i] > 0) { 3866 disable_rate_limit = false; 3867 } 3868 3869 if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) { 3870 min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC; 3871 } else { 3872 /* Change from megabyte to byte rate limit */ 3873 limits[i] = limits[i] * 1024 * 1024; 3874 min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC; 3875 } 3876 3877 limit_set_complement = limits[i] % min_limit_per_sec; 3878 if (limit_set_complement) { 3879 SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n", 3880 limits[i], min_limit_per_sec); 3881 limits[i] += min_limit_per_sec - limit_set_complement; 3882 SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]); 3883 } 3884 } 3885 3886 ctx = calloc(1, sizeof(*ctx)); 3887 if (ctx == NULL) { 3888 cb_fn(cb_arg, -ENOMEM); 3889 return; 3890 } 3891 3892 ctx->cb_fn = cb_fn; 3893 ctx->cb_arg = cb_arg; 3894 ctx->bdev = bdev; 3895 3896 pthread_mutex_lock(&bdev->internal.mutex); 3897 if (bdev->internal.qos_mod_in_progress) { 3898 pthread_mutex_unlock(&bdev->internal.mutex); 3899 free(ctx); 3900 cb_fn(cb_arg, -EAGAIN); 3901 return; 3902 } 3903 bdev->internal.qos_mod_in_progress = true; 3904 3905 if (disable_rate_limit == true && bdev->internal.qos) { 3906 for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) { 3907 if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED && 3908 (bdev->internal.qos->rate_limits[i].limit > 0 && 3909 bdev->internal.qos->rate_limits[i].limit != 3910 SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) { 3911 disable_rate_limit = false; 3912 break; 3913 } 3914 } 3915 } 3916 3917 if (disable_rate_limit == false) { 3918 if (bdev->internal.qos == NULL) { 3919 /* Enabling */ 3920 bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos)); 3921 if (!bdev->internal.qos) { 3922 pthread_mutex_unlock(&bdev->internal.mutex); 3923 SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n"); 3924 free(ctx); 3925 cb_fn(cb_arg, -ENOMEM); 3926 return; 3927 } 3928 3929 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3930 3931 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3932 _spdk_bdev_enable_qos_msg, ctx, 3933 _spdk_bdev_enable_qos_done); 3934 } else { 3935 /* Updating */ 3936 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3937 3938 spdk_thread_send_msg(bdev->internal.qos->thread, 3939 _spdk_bdev_update_qos_rate_limit_msg, ctx); 3940 } 3941 } else { 3942 if (bdev->internal.qos != NULL) { 3943 _spdk_bdev_set_qos_rate_limits(bdev, limits); 3944 3945 /* Disabling */ 3946 spdk_for_each_channel(__bdev_to_io_dev(bdev), 3947 _spdk_bdev_disable_qos_msg, ctx, 3948 _spdk_bdev_disable_qos_msg_done); 3949 } else { 3950 pthread_mutex_unlock(&bdev->internal.mutex); 3951 _spdk_bdev_set_qos_limit_done(ctx, 0); 3952 return; 3953 } 3954 } 3955 3956 pthread_mutex_unlock(&bdev->internal.mutex); 3957 } 3958 3959 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV) 3960 3961 SPDK_TRACE_REGISTER_FN(bdev_trace) 3962 { 3963 spdk_trace_register_owner(OWNER_BDEV, 'b'); 3964 spdk_trace_register_object(OBJECT_BDEV_IO, 'i'); 3965 spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV, 3966 OBJECT_BDEV_IO, 1, 0, "type: "); 3967 spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV, 3968 OBJECT_BDEV_IO, 0, 0, ""); 3969 } 3970
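/*
 * Illustrative usage sketch (a hedged example, not compiled as part of this file): a minimal
 * read through the descriptor/channel API defined above. Names such as read_done(), my_bdev
 * and my_buf are hypothetical placeholders; my_buf must satisfy spdk_bdev_get_buf_align().
 *
 *   static void
 *   read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *   {
 *           spdk_bdev_free_io(bdev_io);
 *   }
 *
 *   struct spdk_bdev_desc *desc;
 *   struct spdk_io_channel *io_ch;
 *   int rc;
 *
 *   rc = spdk_bdev_open(my_bdev, false, NULL, NULL, &desc);
 *   if (rc == 0) {
 *           io_ch = spdk_bdev_get_io_channel(desc);
 *           rc = spdk_bdev_read_blocks(desc, io_ch, my_buf, 0, 1, read_done, NULL);
 *           if (rc == -ENOMEM) {
 *                   handle the out-of-bdev_io case, e.g. with spdk_bdev_queue_io_wait()
 *           }
 *           ...
 *           spdk_put_io_channel(io_ch);
 *           spdk_bdev_close(desc);
 *   }
 */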