1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>. 5 * Copyright (c) Intel Corporation. 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * * Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * * Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * * Neither the name of Intel Corporation nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 25 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 26 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 33 */ 34 35 #include "spdk/stdinc.h" 36 37 #include "spdk/bdev.h" 38 39 #include "spdk/env.h" 40 #include "spdk/event.h" 41 #include "spdk/io_channel.h" 42 #include "spdk/likely.h" 43 #include "spdk/queue.h" 44 #include "spdk/nvme_spec.h" 45 #include "spdk/scsi_spec.h" 46 #include "spdk/util.h" 47 48 #include "spdk_internal/bdev.h" 49 #include "spdk_internal/log.h" 50 #include "spdk/string.h" 51 52 #ifdef SPDK_CONFIG_VTUNE 53 #include "ittnotify.h" 54 #include "ittnotify_types.h" 55 int __itt_init_ittlib(const char *, __itt_group_id); 56 #endif 57 58 #define SPDK_BDEV_IO_POOL_SIZE (64 * 1024) 59 #define SPDK_BDEV_IO_CACHE_SIZE 256 60 #define BUF_SMALL_POOL_SIZE 8192 61 #define BUF_LARGE_POOL_SIZE 1024 62 #define NOMEM_THRESHOLD_COUNT 8 63 #define ZERO_BUFFER_SIZE 0x100000 64 #define SPDK_BDEV_QOS_TIMESLICE_IN_US 1000 65 66 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t; 67 typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t; 68 69 struct spdk_bdev_mgr { 70 struct spdk_mempool *bdev_io_pool; 71 72 struct spdk_mempool *buf_small_pool; 73 struct spdk_mempool *buf_large_pool; 74 75 void *zero_buffer; 76 77 TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules; 78 79 TAILQ_HEAD(, spdk_bdev) bdevs; 80 81 bool init_complete; 82 bool module_init_complete; 83 84 #ifdef SPDK_CONFIG_VTUNE 85 __itt_domain *domain; 86 #endif 87 }; 88 89 static struct spdk_bdev_mgr g_bdev_mgr = { 90 .bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules), 91 .bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs), 92 .init_complete = false, 93 .module_init_complete = false, 94 }; 95 96 static spdk_bdev_init_cb g_init_cb_fn = NULL; 97 static void *g_init_cb_arg = NULL; 98 99 static spdk_bdev_fini_cb g_fini_cb_fn = NULL; 100 static void *g_fini_cb_arg = NULL; 101 static struct spdk_thread *g_fini_thread = NULL; 102 103 104 struct spdk_bdev_mgmt_channel { 105 
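/* I/O that could not get a data buffer from the small/large mempools wait on these queues until a buffer is returned (see spdk_bdev_io_get_buf()/spdk_bdev_io_put_buf()). */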
bdev_io_stailq_t need_buf_small; 106 bdev_io_stailq_t need_buf_large; 107 108 /* 109 * Each thread keeps a cache of bdev_io - this allows 110 * bdev threads which are *not* DPDK threads to still 111 * benefit from a per-thread bdev_io cache. Without 112 * this, non-DPDK threads fetching from the mempool 113 * incur a cmpxchg on get and put. 114 */ 115 bdev_io_stailq_t per_thread_cache; 116 uint32_t per_thread_cache_count; 117 118 TAILQ_HEAD(, spdk_bdev_module_channel) module_channels; 119 }; 120 121 struct spdk_bdev_desc { 122 struct spdk_bdev *bdev; 123 spdk_bdev_remove_cb_t remove_cb; 124 void *remove_ctx; 125 bool write; 126 TAILQ_ENTRY(spdk_bdev_desc) link; 127 }; 128 129 #define BDEV_CH_RESET_IN_PROGRESS (1 << 0) 130 131 struct spdk_bdev_channel { 132 struct spdk_bdev *bdev; 133 134 /* The channel for the underlying device */ 135 struct spdk_io_channel *channel; 136 137 /* Channel for the bdev manager */ 138 struct spdk_io_channel *mgmt_channel; 139 140 struct spdk_bdev_io_stat stat; 141 142 bdev_io_tailq_t queued_resets; 143 144 uint32_t flags; 145 146 /* 147 * Rate limiting on this channel. 148 * Queue of IO awaiting issue because of a QoS rate limiting happened 149 * on this channel. 150 */ 151 bdev_io_tailq_t qos_io; 152 153 /* 154 * Rate limiting on this channel. 155 * Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and 156 * only valid for the master channel which manages the outstanding IOs. 157 */ 158 uint64_t qos_max_ios_per_timeslice; 159 160 /* 161 * Rate limiting on this channel. 162 * Submitted IO in one timeslice (e.g., 1ms) 163 */ 164 uint64_t io_submitted_this_timeslice; 165 166 /* 167 * Rate limiting on this channel. 168 * Periodic running QoS poller in millisecond. 169 */ 170 struct spdk_poller *qos_poller; 171 172 /* Per-device channel */ 173 struct spdk_bdev_module_channel *module_ch; 174 175 #ifdef SPDK_CONFIG_VTUNE 176 uint64_t start_tsc; 177 uint64_t interval_tsc; 178 __itt_string_handle *handle; 179 #endif 180 181 }; 182 183 #define __bdev_to_io_dev(bdev) (((char *)bdev) + 1) 184 #define __bdev_from_io_dev(io_dev) ((struct spdk_bdev *)(((char *)io_dev) - 1)) 185 186 /* 187 * Per-module (or per-io_device) channel. Multiple bdevs built on the same io_device 188 * will queue here their IO that awaits retry. It makes it posible to retry sending 189 * IO to one bdev after IO from other bdev completes. 190 */ 191 struct spdk_bdev_module_channel { 192 /* 193 * Count of I/O submitted to bdev module and waiting for completion. 194 * Incremented before submit_request() is called on an spdk_bdev_io. 195 */ 196 uint64_t io_outstanding; 197 198 /* 199 * Queue of IO awaiting retry because of a previous NOMEM status returned 200 * on this channel. 201 */ 202 bdev_io_tailq_t nomem_io; 203 204 /* 205 * Threshold which io_outstanding must drop to before retrying nomem_io. 
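 * Recomputed on every NOMEM completion as max(io_outstanding / 2, io_outstanding - NOMEM_THRESHOLD_COUNT); see spdk_bdev_io_complete().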
206 */ 207 uint64_t nomem_threshold; 208 209 /* I/O channel allocated by a bdev module */ 210 struct spdk_io_channel *module_ch; 211 212 uint32_t ref; 213 214 TAILQ_ENTRY(spdk_bdev_module_channel) link; 215 }; 216 217 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg); 218 219 struct spdk_bdev * 220 spdk_bdev_first(void) 221 { 222 struct spdk_bdev *bdev; 223 224 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 225 if (bdev) { 226 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 227 } 228 229 return bdev; 230 } 231 232 struct spdk_bdev * 233 spdk_bdev_next(struct spdk_bdev *prev) 234 { 235 struct spdk_bdev *bdev; 236 237 bdev = TAILQ_NEXT(prev, link); 238 if (bdev) { 239 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 240 } 241 242 return bdev; 243 } 244 245 static struct spdk_bdev * 246 _bdev_next_leaf(struct spdk_bdev *bdev) 247 { 248 while (bdev != NULL) { 249 if (TAILQ_EMPTY(&bdev->vbdevs)) { 250 return bdev; 251 } else { 252 bdev = TAILQ_NEXT(bdev, link); 253 } 254 } 255 256 return bdev; 257 } 258 259 struct spdk_bdev * 260 spdk_bdev_first_leaf(void) 261 { 262 struct spdk_bdev *bdev; 263 264 bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs)); 265 266 if (bdev) { 267 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name); 268 } 269 270 return bdev; 271 } 272 273 struct spdk_bdev * 274 spdk_bdev_next_leaf(struct spdk_bdev *prev) 275 { 276 struct spdk_bdev *bdev; 277 278 bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link)); 279 280 if (bdev) { 281 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name); 282 } 283 284 return bdev; 285 } 286 287 struct spdk_bdev * 288 spdk_bdev_get_by_name(const char *bdev_name) 289 { 290 struct spdk_bdev_alias *tmp; 291 struct spdk_bdev *bdev = spdk_bdev_first(); 292 293 while (bdev != NULL) { 294 if (strcmp(bdev_name, bdev->name) == 0) { 295 return bdev; 296 } 297 298 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 299 if (strcmp(bdev_name, tmp->alias) == 0) { 300 return bdev; 301 } 302 } 303 304 bdev = spdk_bdev_next(bdev); 305 } 306 307 return NULL; 308 } 309 310 static void 311 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf) 312 { 313 assert(bdev_io->get_buf_cb != NULL); 314 assert(buf != NULL); 315 assert(bdev_io->u.bdev.iovs != NULL); 316 317 bdev_io->buf = buf; 318 bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL); 319 bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len; 320 bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io); 321 } 322 323 static void 324 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io) 325 { 326 struct spdk_mempool *pool; 327 struct spdk_bdev_io *tmp; 328 void *buf; 329 bdev_io_stailq_t *stailq; 330 struct spdk_bdev_mgmt_channel *ch; 331 332 assert(bdev_io->u.bdev.iovcnt == 1); 333 334 buf = bdev_io->buf; 335 ch = bdev_io->mgmt_ch; 336 337 if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 338 pool = g_bdev_mgr.buf_small_pool; 339 stailq = &ch->need_buf_small; 340 } else { 341 pool = g_bdev_mgr.buf_large_pool; 342 stailq = &ch->need_buf_large; 343 } 344 345 if (STAILQ_EMPTY(stailq)) { 346 spdk_mempool_put(pool, buf); 347 } else { 348 tmp = STAILQ_FIRST(stailq); 349 STAILQ_REMOVE_HEAD(stailq, buf_link); 350 spdk_bdev_io_set_buf(tmp, buf); 351 } 352 } 353 354 void 355 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len) 356 { 357 struct spdk_mempool *pool; 358 bdev_io_stailq_t *stailq; 359 void *buf = 
NULL; 360 struct spdk_bdev_mgmt_channel *ch; 361 362 assert(cb != NULL); 363 assert(bdev_io->u.bdev.iovs != NULL); 364 365 if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) { 366 /* Buffer already present */ 367 cb(bdev_io->ch->channel, bdev_io); 368 return; 369 } 370 371 assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE); 372 ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel); 373 374 bdev_io->buf_len = len; 375 bdev_io->get_buf_cb = cb; 376 if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) { 377 pool = g_bdev_mgr.buf_small_pool; 378 stailq = &ch->need_buf_small; 379 } else { 380 pool = g_bdev_mgr.buf_large_pool; 381 stailq = &ch->need_buf_large; 382 } 383 384 buf = spdk_mempool_get(pool); 385 386 if (!buf) { 387 STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link); 388 } else { 389 spdk_bdev_io_set_buf(bdev_io, buf); 390 } 391 } 392 393 static int 394 spdk_bdev_module_get_max_ctx_size(void) 395 { 396 struct spdk_bdev_module_if *bdev_module; 397 int max_bdev_module_size = 0; 398 399 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) { 400 if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) { 401 max_bdev_module_size = bdev_module->get_ctx_size(); 402 } 403 } 404 405 return max_bdev_module_size; 406 } 407 408 void 409 spdk_bdev_config_text(FILE *fp) 410 { 411 struct spdk_bdev_module_if *bdev_module; 412 413 TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) { 414 if (bdev_module->config_text) { 415 bdev_module->config_text(fp); 416 } 417 } 418 } 419 420 static int 421 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf) 422 { 423 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 424 425 STAILQ_INIT(&ch->need_buf_small); 426 STAILQ_INIT(&ch->need_buf_large); 427 428 STAILQ_INIT(&ch->per_thread_cache); 429 ch->per_thread_cache_count = 0; 430 431 TAILQ_INIT(&ch->module_channels); 432 433 return 0; 434 } 435 436 static void 437 spdk_bdev_mgmt_channel_free_resources(struct spdk_bdev_mgmt_channel *ch) 438 { 439 struct spdk_bdev_io *bdev_io; 440 441 if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) { 442 SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n"); 443 } 444 445 while (!STAILQ_EMPTY(&ch->per_thread_cache)) { 446 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 447 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link); 448 ch->per_thread_cache_count--; 449 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 450 } 451 452 assert(ch->per_thread_cache_count == 0); 453 } 454 455 static void 456 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf) 457 { 458 struct spdk_bdev_mgmt_channel *ch = ctx_buf; 459 460 spdk_bdev_mgmt_channel_free_resources(ch); 461 } 462 463 static void 464 spdk_bdev_init_complete(int rc) 465 { 466 spdk_bdev_init_cb cb_fn = g_init_cb_fn; 467 void *cb_arg = g_init_cb_arg; 468 469 g_bdev_mgr.init_complete = true; 470 g_init_cb_fn = NULL; 471 g_init_cb_arg = NULL; 472 473 cb_fn(cb_arg, rc); 474 } 475 476 static void 477 spdk_bdev_module_action_complete(void) 478 { 479 struct spdk_bdev_module_if *m; 480 481 /* 482 * Don't finish bdev subsystem initialization if 483 * module pre-initialization is still in progress, or 484 * the subsystem been already initialized. 485 */ 486 if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) { 487 return; 488 } 489 490 /* 491 * Check all bdev modules for inits/examinations in progress. If any 492 * exist, return immediately since we cannot finish bdev subsystem 493 * initialization until all are completed. 
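 * Each pending action is tracked in the module's action_in_progress count, which spdk_bdev_module_init_done() and spdk_bdev_module_examine_done() decrement.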
494 */ 495 TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) { 496 if (m->action_in_progress > 0) { 497 return; 498 } 499 } 500 501 /* 502 * Modules already finished initialization - now that all 503 * the bdev modules have finished their asynchronous I/O 504 * processing, the entire bdev layer can be marked as complete. 505 */ 506 spdk_bdev_init_complete(0); 507 } 508 509 static void 510 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module) 511 { 512 assert(module->action_in_progress > 0); 513 module->action_in_progress--; 514 spdk_bdev_module_action_complete(); 515 } 516 517 void 518 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module) 519 { 520 spdk_bdev_module_action_done(module); 521 } 522 523 void 524 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module) 525 { 526 spdk_bdev_module_action_done(module); 527 } 528 529 static int 530 spdk_bdev_modules_init(void) 531 { 532 struct spdk_bdev_module_if *module; 533 int rc = 0; 534 535 TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) { 536 rc = module->module_init(); 537 if (rc != 0) { 538 break; 539 } 540 } 541 542 g_bdev_mgr.module_init_complete = true; 543 return rc; 544 } 545 void 546 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg) 547 { 548 int cache_size; 549 int rc = 0; 550 char mempool_name[32]; 551 552 assert(cb_fn != NULL); 553 554 g_init_cb_fn = cb_fn; 555 g_init_cb_arg = cb_arg; 556 557 snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid()); 558 559 g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name, 560 SPDK_BDEV_IO_POOL_SIZE, 561 sizeof(struct spdk_bdev_io) + 562 spdk_bdev_module_get_max_ctx_size(), 563 0, 564 SPDK_ENV_SOCKET_ID_ANY); 565 566 if (g_bdev_mgr.bdev_io_pool == NULL) { 567 SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n"); 568 spdk_bdev_init_complete(-1); 569 return; 570 } 571 572 /** 573 * Ensure no more than half of the total buffers end up local caches, by 574 * using spdk_env_get_core_count() to determine how many local caches we need 575 * to account for. 
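 * For example, with BUF_SMALL_POOL_SIZE of 8192 and a 4-core application, each per-core cache is capped at 8192 / (2 * 4) = 1024 buffers.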
576 */ 577 cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count()); 578 snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid()); 579 580 g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name, 581 BUF_SMALL_POOL_SIZE, 582 SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512, 583 cache_size, 584 SPDK_ENV_SOCKET_ID_ANY); 585 if (!g_bdev_mgr.buf_small_pool) { 586 SPDK_ERRLOG("create rbuf small pool failed\n"); 587 spdk_bdev_init_complete(-1); 588 return; 589 } 590 591 cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count()); 592 snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid()); 593 594 g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name, 595 BUF_LARGE_POOL_SIZE, 596 SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512, 597 cache_size, 598 SPDK_ENV_SOCKET_ID_ANY); 599 if (!g_bdev_mgr.buf_large_pool) { 600 SPDK_ERRLOG("create rbuf large pool failed\n"); 601 spdk_bdev_init_complete(-1); 602 return; 603 } 604 605 g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE, 606 NULL); 607 if (!g_bdev_mgr.zero_buffer) { 608 SPDK_ERRLOG("create bdev zero buffer failed\n"); 609 spdk_bdev_init_complete(-1); 610 return; 611 } 612 613 #ifdef SPDK_CONFIG_VTUNE 614 g_bdev_mgr.domain = __itt_domain_create("spdk_bdev"); 615 #endif 616 617 spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create, 618 spdk_bdev_mgmt_channel_destroy, 619 sizeof(struct spdk_bdev_mgmt_channel)); 620 621 rc = spdk_bdev_modules_init(); 622 if (rc != 0) { 623 SPDK_ERRLOG("bdev modules init failed\n"); 624 spdk_bdev_init_complete(-1); 625 return; 626 } 627 628 spdk_bdev_module_action_complete(); 629 } 630 631 static void 632 spdk_bdev_module_finish_cb(void *io_device) 633 { 634 spdk_bdev_fini_cb cb_fn = g_fini_cb_fn; 635 636 cb_fn(g_fini_cb_arg); 637 g_fini_cb_fn = NULL; 638 g_fini_cb_arg = NULL; 639 } 640 641 static void 642 spdk_bdev_module_finish_complete(struct spdk_io_channel_iter *i, int status) 643 { 644 if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) { 645 SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n", 646 spdk_mempool_count(g_bdev_mgr.bdev_io_pool), 647 SPDK_BDEV_IO_POOL_SIZE); 648 } 649 650 if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) { 651 SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n", 652 spdk_mempool_count(g_bdev_mgr.buf_small_pool), 653 BUF_SMALL_POOL_SIZE); 654 assert(false); 655 } 656 657 if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) { 658 SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n", 659 spdk_mempool_count(g_bdev_mgr.buf_large_pool), 660 BUF_LARGE_POOL_SIZE); 661 assert(false); 662 } 663 664 spdk_mempool_free(g_bdev_mgr.bdev_io_pool); 665 spdk_mempool_free(g_bdev_mgr.buf_small_pool); 666 spdk_mempool_free(g_bdev_mgr.buf_large_pool); 667 spdk_dma_free(g_bdev_mgr.zero_buffer); 668 669 spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb); 670 } 671 672 static void 673 mgmt_channel_free_resources(struct spdk_io_channel_iter *i) 674 { 675 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 676 struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch); 677 678 spdk_bdev_mgmt_channel_free_resources(ch); 679 spdk_for_each_channel_continue(i, 0); 680 } 681 682 static void 683 spdk_bdev_module_finish_iter(void *arg) 684 { 685 /* Notice that this variable is static. It is saved between calls to 686 * this function. 
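 * This lets a module with async_fini set resume the iteration from the next module when it later calls spdk_bdev_module_finish_done().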
*/ 687 static struct spdk_bdev_module_if *resume_bdev_module = NULL; 688 struct spdk_bdev_module_if *bdev_module; 689 690 /* Start iterating from the last touched module */ 691 if (!resume_bdev_module) { 692 bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules); 693 } else { 694 bdev_module = TAILQ_NEXT(resume_bdev_module, tailq); 695 } 696 697 while (bdev_module) { 698 if (bdev_module->async_fini) { 699 /* Save our place so we can resume later. We must 700 * save the variable here, before calling module_fini() 701 * below, because in some cases the module may immediately 702 * call spdk_bdev_module_finish_done() and re-enter 703 * this function to continue iterating. */ 704 resume_bdev_module = bdev_module; 705 } 706 707 if (bdev_module->module_fini) { 708 bdev_module->module_fini(); 709 } 710 711 if (bdev_module->async_fini) { 712 return; 713 } 714 715 bdev_module = TAILQ_NEXT(bdev_module, tailq); 716 } 717 718 resume_bdev_module = NULL; 719 spdk_for_each_channel(&g_bdev_mgr, mgmt_channel_free_resources, NULL, 720 spdk_bdev_module_finish_complete); 721 } 722 723 void 724 spdk_bdev_module_finish_done(void) 725 { 726 if (spdk_get_thread() != g_fini_thread) { 727 spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL); 728 } else { 729 spdk_bdev_module_finish_iter(NULL); 730 } 731 } 732 733 static void 734 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno) 735 { 736 struct spdk_bdev *bdev = cb_arg; 737 738 if (bdeverrno && bdev) { 739 SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n", 740 bdev->name); 741 742 /* 743 * Since the call to spdk_bdev_unregister() failed, we have no way to free this 744 * bdev; try to continue by manually removing this bdev from the list and continue 745 * with the next bdev in the list. 746 */ 747 TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link); 748 } 749 750 if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) { 751 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n"); 752 spdk_bdev_module_finish_iter(NULL); 753 return; 754 } 755 756 /* 757 * Unregister the first bdev in the list. 758 * 759 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by 760 * calling the remove_cb of the descriptors first. 761 * 762 * Once this bdev and all of its open descriptors have been cleaned up, this function 763 * will be called again via the unregister completion callback to continue the cleanup 764 * process with the next bdev. 
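 * A non-zero bdeverrno only produces a warning; the failed bdev is dropped from the list so shutdown can still make progress.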
765 */ 766 bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs); 767 SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name); 768 spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev); 769 } 770 771 static void 772 _spdk_bdev_finish_unregister_bdevs(void) 773 { 774 _spdk_bdev_finish_unregister_bdevs_iter(NULL, 0); 775 } 776 777 void 778 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg) 779 { 780 assert(cb_fn != NULL); 781 782 g_fini_thread = spdk_get_thread(); 783 784 g_fini_cb_fn = cb_fn; 785 g_fini_cb_arg = cb_arg; 786 787 _spdk_bdev_finish_unregister_bdevs(); 788 } 789 790 static struct spdk_bdev_io * 791 spdk_bdev_get_io(struct spdk_io_channel *_ch) 792 { 793 struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch); 794 struct spdk_bdev_io *bdev_io; 795 796 if (ch->per_thread_cache_count > 0) { 797 bdev_io = STAILQ_FIRST(&ch->per_thread_cache); 798 STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link); 799 ch->per_thread_cache_count--; 800 } else { 801 bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool); 802 if (!bdev_io) { 803 SPDK_ERRLOG("Unable to get spdk_bdev_io\n"); 804 abort(); 805 } 806 } 807 808 bdev_io->mgmt_ch = ch; 809 810 return bdev_io; 811 } 812 813 static void 814 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io) 815 { 816 struct spdk_bdev_mgmt_channel *ch = bdev_io->mgmt_ch; 817 818 if (bdev_io->buf != NULL) { 819 spdk_bdev_io_put_buf(bdev_io); 820 } 821 822 if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) { 823 ch->per_thread_cache_count++; 824 STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link); 825 } else { 826 spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io); 827 } 828 } 829 830 static void 831 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) 832 { 833 struct spdk_bdev *bdev = bdev_io->bdev; 834 struct spdk_bdev_channel *bdev_ch = bdev_io->ch; 835 struct spdk_io_channel *ch = bdev_ch->channel; 836 struct spdk_bdev_module_channel *shared_ch = bdev_ch->module_ch; 837 838 assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING); 839 840 bdev_io->submit_tsc = spdk_get_ticks(); 841 shared_ch->io_outstanding++; 842 bdev_io->in_submit_request = true; 843 if (spdk_likely(bdev_ch->flags == 0)) { 844 if (spdk_likely(TAILQ_EMPTY(&shared_ch->nomem_io))) { 845 bdev->fn_table->submit_request(ch, bdev_io); 846 } else { 847 shared_ch->io_outstanding--; 848 TAILQ_INSERT_TAIL(&shared_ch->nomem_io, bdev_io, link); 849 } 850 } else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) { 851 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 852 } else { 853 SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags); 854 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 855 } 856 bdev_io->in_submit_request = false; 857 } 858 859 static void 860 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io) 861 { 862 struct spdk_bdev *bdev = bdev_io->bdev; 863 struct spdk_bdev_channel *bdev_ch = bdev_io->ch; 864 struct spdk_io_channel *ch = bdev_ch->channel; 865 866 assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING); 867 868 bdev_io->in_submit_request = true; 869 bdev->fn_table->submit_request(ch, bdev_io); 870 bdev_io->in_submit_request = false; 871 } 872 873 static void 874 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io, 875 struct spdk_bdev *bdev, void *cb_arg, 876 spdk_bdev_io_completion_cb cb) 877 { 878 bdev_io->bdev = bdev; 879 bdev_io->caller_ctx = cb_arg; 880 bdev_io->cb = cb; 881 bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING; 882 bdev_io->in_submit_request = false; 883 bdev_io->buf = NULL; 884 } 885 886 bool 
887 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type) 888 { 889 return bdev->fn_table->io_type_supported(bdev->ctxt, io_type); 890 } 891 892 int 893 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w) 894 { 895 if (bdev->fn_table->dump_info_json) { 896 return bdev->fn_table->dump_info_json(bdev->ctxt, w); 897 } 898 899 return 0; 900 } 901 902 static int 903 _spdk_bdev_channel_create(struct spdk_bdev_channel *ch, void *io_device) 904 { 905 struct spdk_bdev *bdev = __bdev_from_io_dev(io_device); 906 struct spdk_bdev_mgmt_channel *mgmt_ch; 907 struct spdk_bdev_module_channel *shared_ch; 908 909 ch->bdev = bdev; 910 ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt); 911 if (!ch->channel) { 912 return -1; 913 } 914 915 ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr); 916 if (!ch->mgmt_channel) { 917 return -1; 918 } 919 920 mgmt_ch = spdk_io_channel_get_ctx(ch->mgmt_channel); 921 TAILQ_FOREACH(shared_ch, &mgmt_ch->module_channels, link) { 922 if (shared_ch->module_ch == ch->channel) { 923 shared_ch->ref++; 924 break; 925 } 926 } 927 928 if (shared_ch == NULL) { 929 shared_ch = calloc(1, sizeof(*shared_ch)); 930 if (!shared_ch) { 931 return -1; 932 } 933 934 shared_ch->io_outstanding = 0; 935 TAILQ_INIT(&shared_ch->nomem_io); 936 shared_ch->nomem_threshold = 0; 937 shared_ch->module_ch = ch->channel; 938 shared_ch->ref = 1; 939 TAILQ_INSERT_TAIL(&mgmt_ch->module_channels, shared_ch, link); 940 } 941 942 memset(&ch->stat, 0, sizeof(ch->stat)); 943 TAILQ_INIT(&ch->queued_resets); 944 TAILQ_INIT(&ch->qos_io); 945 ch->qos_max_ios_per_timeslice = 0; 946 ch->io_submitted_this_timeslice = 0; 947 ch->qos_poller = NULL; 948 ch->flags = 0; 949 ch->module_ch = shared_ch; 950 951 return 0; 952 } 953 954 static void 955 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch) 956 { 957 struct spdk_bdev_mgmt_channel *mgmt_channel; 958 struct spdk_bdev_module_channel *shared_ch = ch->module_ch; 959 960 if (ch->channel) { 961 spdk_put_io_channel(ch->channel); 962 } 963 964 if (ch->mgmt_channel) { 965 if (shared_ch) { 966 assert(shared_ch->ref > 0); 967 shared_ch->ref--; 968 if (shared_ch->ref == 0) { 969 mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel); 970 assert(shared_ch->io_outstanding == 0); 971 TAILQ_REMOVE(&mgmt_channel->module_channels, shared_ch, link); 972 free(shared_ch); 973 } 974 } 975 spdk_put_io_channel(ch->mgmt_channel); 976 } 977 } 978 979 static int 980 spdk_bdev_channel_create(void *io_device, void *ctx_buf) 981 { 982 struct spdk_bdev_channel *ch = ctx_buf; 983 984 if (_spdk_bdev_channel_create(ch, io_device) != 0) { 985 _spdk_bdev_channel_destroy_resource(ch); 986 return -1; 987 } 988 989 #ifdef SPDK_CONFIG_VTUNE 990 { 991 char *name; 992 __itt_init_ittlib(NULL, 0); 993 name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch); 994 if (!name) { 995 _spdk_bdev_channel_destroy_resource(ch); 996 return -1; 997 } 998 ch->handle = __itt_string_handle_create(name); 999 free(name); 1000 ch->start_tsc = spdk_get_ticks(); 1001 ch->interval_tsc = spdk_get_ticks_hz() / 100; 1002 } 1003 #endif 1004 1005 return 0; 1006 } 1007 1008 /* 1009 * Abort I/O that are waiting on a data buffer. These types of I/O are 1010 * linked using the spdk_bdev_io buf_link TAILQ_ENTRY. 
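 * I/O belonging to other channels are moved onto a temporary list and swapped back into the queue, so only this channel's I/O are failed.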
1011 */ 1012 static void 1013 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch) 1014 { 1015 bdev_io_stailq_t tmp; 1016 struct spdk_bdev_io *bdev_io; 1017 1018 STAILQ_INIT(&tmp); 1019 1020 while (!STAILQ_EMPTY(queue)) { 1021 bdev_io = STAILQ_FIRST(queue); 1022 STAILQ_REMOVE_HEAD(queue, buf_link); 1023 if (bdev_io->ch == ch) { 1024 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1025 } else { 1026 STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link); 1027 } 1028 } 1029 1030 STAILQ_SWAP(&tmp, queue, spdk_bdev_io); 1031 } 1032 1033 /* 1034 * Abort I/O that are queued waiting for submission. These types of I/O are 1035 * linked using the spdk_bdev_io link TAILQ_ENTRY. 1036 */ 1037 static void 1038 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch) 1039 { 1040 struct spdk_bdev_io *bdev_io, *tmp; 1041 1042 TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) { 1043 if (bdev_io->ch == ch) { 1044 TAILQ_REMOVE(queue, bdev_io, link); 1045 /* 1046 * spdk_bdev_io_complete() assumes that the completed I/O had 1047 * been submitted to the bdev module. Since in this case it 1048 * hadn't, bump io_outstanding to account for the decrement 1049 * that spdk_bdev_io_complete() will do. 1050 */ 1051 if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) { 1052 ch->module_ch->io_outstanding++; 1053 } 1054 spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); 1055 } 1056 } 1057 } 1058 1059 static void 1060 _spdk_bdev_channel_destroy(struct spdk_bdev_channel *ch) 1061 { 1062 struct spdk_bdev_mgmt_channel *mgmt_channel; 1063 struct spdk_bdev_module_channel *shared_ch = ch->module_ch; 1064 1065 mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel); 1066 1067 _spdk_bdev_abort_queued_io(&ch->queued_resets, ch); 1068 _spdk_bdev_abort_queued_io(&ch->qos_io, ch); 1069 _spdk_bdev_abort_queued_io(&shared_ch->nomem_io, ch); 1070 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch); 1071 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch); 1072 1073 _spdk_bdev_channel_destroy_resource(ch); 1074 } 1075 1076 static void 1077 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf) 1078 { 1079 struct spdk_bdev_channel *ch = ctx_buf; 1080 1081 _spdk_bdev_channel_destroy(ch); 1082 } 1083 1084 int 1085 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias) 1086 { 1087 struct spdk_bdev_alias *tmp; 1088 1089 if (alias == NULL) { 1090 SPDK_ERRLOG("Empty alias passed\n"); 1091 return -EINVAL; 1092 } 1093 1094 if (spdk_bdev_get_by_name(alias)) { 1095 SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias); 1096 return -EEXIST; 1097 } 1098 1099 tmp = calloc(1, sizeof(*tmp)); 1100 if (tmp == NULL) { 1101 SPDK_ERRLOG("Unable to allocate alias\n"); 1102 return -ENOMEM; 1103 } 1104 1105 tmp->alias = strdup(alias); 1106 if (tmp->alias == NULL) { 1107 free(tmp); 1108 SPDK_ERRLOG("Unable to allocate alias\n"); 1109 return -ENOMEM; 1110 } 1111 1112 TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq); 1113 1114 return 0; 1115 } 1116 1117 int 1118 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias) 1119 { 1120 struct spdk_bdev_alias *tmp; 1121 1122 TAILQ_FOREACH(tmp, &bdev->aliases, tailq) { 1123 if (strcmp(alias, tmp->alias) == 0) { 1124 TAILQ_REMOVE(&bdev->aliases, tmp, tailq); 1125 free(tmp->alias); 1126 free(tmp); 1127 return 0; 1128 } 1129 } 1130 1131 SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias); 1132 1133 return -ENOENT; 1134 } 1135 1136 struct spdk_io_channel * 1137 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc) 1138 { 1139 
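/* Each bdev is registered as an io_device keyed by __bdev_to_io_dev(bdev), so this returns (creating on first use) the calling thread's spdk_bdev_channel for the device. Typical usage (sketch): open a descriptor with spdk_bdev_open(), call this on the thread that will submit I/O, then pass the channel to spdk_bdev_read()/spdk_bdev_write(). */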
return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev)); 1140 } 1141 1142 const char * 1143 spdk_bdev_get_name(const struct spdk_bdev *bdev) 1144 { 1145 return bdev->name; 1146 } 1147 1148 const char * 1149 spdk_bdev_get_product_name(const struct spdk_bdev *bdev) 1150 { 1151 return bdev->product_name; 1152 } 1153 1154 const struct spdk_bdev_aliases_list * 1155 spdk_bdev_get_aliases(const struct spdk_bdev *bdev) 1156 { 1157 return &bdev->aliases; 1158 } 1159 1160 uint32_t 1161 spdk_bdev_get_block_size(const struct spdk_bdev *bdev) 1162 { 1163 return bdev->blocklen; 1164 } 1165 1166 uint64_t 1167 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev) 1168 { 1169 return bdev->blockcnt; 1170 } 1171 1172 size_t 1173 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev) 1174 { 1175 /* TODO: push this logic down to the bdev modules */ 1176 if (bdev->need_aligned_buffer) { 1177 return bdev->blocklen; 1178 } 1179 1180 return 1; 1181 } 1182 1183 uint32_t 1184 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev) 1185 { 1186 return bdev->optimal_io_boundary; 1187 } 1188 1189 bool 1190 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev) 1191 { 1192 return bdev->write_cache; 1193 } 1194 1195 int 1196 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size) 1197 { 1198 int ret; 1199 1200 pthread_mutex_lock(&bdev->mutex); 1201 1202 /* bdev has open descriptors */ 1203 if (!TAILQ_EMPTY(&bdev->open_descs) && 1204 bdev->blockcnt > size) { 1205 ret = -EBUSY; 1206 } else { 1207 bdev->blockcnt = size; 1208 ret = 0; 1209 } 1210 1211 pthread_mutex_unlock(&bdev->mutex); 1212 1213 return ret; 1214 } 1215 1216 /* 1217 * Convert I/O offset and length from bytes to blocks. 1218 * 1219 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size. 
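 * Example: with a 512-byte block size, offset_bytes = 4096 and num_bytes = 8192 give offset_blocks = 8 and num_blocks = 16; an offset of 4097 would make the return value non-zero.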
1220 */ 1221 static uint64_t 1222 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks, 1223 uint64_t num_bytes, uint64_t *num_blocks) 1224 { 1225 uint32_t block_size = bdev->blocklen; 1226 1227 *offset_blocks = offset_bytes / block_size; 1228 *num_blocks = num_bytes / block_size; 1229 1230 return (offset_bytes % block_size) | (num_bytes % block_size); 1231 } 1232 1233 static bool 1234 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks) 1235 { 1236 /* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there 1237 * has been an overflow and hence the offset has been wrapped around */ 1238 if (offset_blocks + num_blocks < offset_blocks) { 1239 return false; 1240 } 1241 1242 /* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */ 1243 if (offset_blocks + num_blocks > bdev->blockcnt) { 1244 return false; 1245 } 1246 1247 return true; 1248 } 1249 1250 int 1251 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1252 void *buf, uint64_t offset, uint64_t nbytes, 1253 spdk_bdev_io_completion_cb cb, void *cb_arg) 1254 { 1255 uint64_t offset_blocks, num_blocks; 1256 1257 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1258 return -EINVAL; 1259 } 1260 1261 return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 1262 } 1263 1264 int 1265 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1266 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 1267 spdk_bdev_io_completion_cb cb, void *cb_arg) 1268 { 1269 struct spdk_bdev *bdev = desc->bdev; 1270 struct spdk_bdev_io *bdev_io; 1271 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1272 1273 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1274 return -EINVAL; 1275 } 1276 1277 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1278 if (!bdev_io) { 1279 SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n"); 1280 return -ENOMEM; 1281 } 1282 1283 bdev_io->ch = channel; 1284 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 1285 bdev_io->u.bdev.iov.iov_base = buf; 1286 bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen; 1287 bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov; 1288 bdev_io->u.bdev.iovcnt = 1; 1289 bdev_io->u.bdev.num_blocks = num_blocks; 1290 bdev_io->u.bdev.offset_blocks = offset_blocks; 1291 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1292 1293 spdk_bdev_io_submit(bdev_io); 1294 return 0; 1295 } 1296 1297 int 1298 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1299 struct iovec *iov, int iovcnt, 1300 uint64_t offset, uint64_t nbytes, 1301 spdk_bdev_io_completion_cb cb, void *cb_arg) 1302 { 1303 uint64_t offset_blocks, num_blocks; 1304 1305 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1306 return -EINVAL; 1307 } 1308 1309 return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 1310 } 1311 1312 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1313 struct iovec *iov, int iovcnt, 1314 uint64_t offset_blocks, uint64_t num_blocks, 1315 spdk_bdev_io_completion_cb cb, void *cb_arg) 1316 { 1317 struct spdk_bdev *bdev = desc->bdev; 1318 struct spdk_bdev_io *bdev_io; 1319 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1320 1321 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, 
num_blocks)) { 1322 return -EINVAL; 1323 } 1324 1325 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1326 if (!bdev_io) { 1327 SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n"); 1328 return -ENOMEM; 1329 } 1330 1331 bdev_io->ch = channel; 1332 bdev_io->type = SPDK_BDEV_IO_TYPE_READ; 1333 bdev_io->u.bdev.iovs = iov; 1334 bdev_io->u.bdev.iovcnt = iovcnt; 1335 bdev_io->u.bdev.num_blocks = num_blocks; 1336 bdev_io->u.bdev.offset_blocks = offset_blocks; 1337 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1338 1339 spdk_bdev_io_submit(bdev_io); 1340 return 0; 1341 } 1342 1343 int 1344 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1345 void *buf, uint64_t offset, uint64_t nbytes, 1346 spdk_bdev_io_completion_cb cb, void *cb_arg) 1347 { 1348 uint64_t offset_blocks, num_blocks; 1349 1350 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1351 return -EINVAL; 1352 } 1353 1354 return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg); 1355 } 1356 1357 int 1358 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1359 void *buf, uint64_t offset_blocks, uint64_t num_blocks, 1360 spdk_bdev_io_completion_cb cb, void *cb_arg) 1361 { 1362 struct spdk_bdev *bdev = desc->bdev; 1363 struct spdk_bdev_io *bdev_io; 1364 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1365 1366 if (!desc->write) { 1367 return -EBADF; 1368 } 1369 1370 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1371 return -EINVAL; 1372 } 1373 1374 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1375 if (!bdev_io) { 1376 SPDK_ERRLOG("bdev_io memory allocation failed duing write\n"); 1377 return -ENOMEM; 1378 } 1379 1380 bdev_io->ch = channel; 1381 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 1382 bdev_io->u.bdev.iov.iov_base = buf; 1383 bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen; 1384 bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov; 1385 bdev_io->u.bdev.iovcnt = 1; 1386 bdev_io->u.bdev.num_blocks = num_blocks; 1387 bdev_io->u.bdev.offset_blocks = offset_blocks; 1388 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1389 1390 spdk_bdev_io_submit(bdev_io); 1391 return 0; 1392 } 1393 1394 int 1395 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1396 struct iovec *iov, int iovcnt, 1397 uint64_t offset, uint64_t len, 1398 spdk_bdev_io_completion_cb cb, void *cb_arg) 1399 { 1400 uint64_t offset_blocks, num_blocks; 1401 1402 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 1403 return -EINVAL; 1404 } 1405 1406 return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg); 1407 } 1408 1409 int 1410 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1411 struct iovec *iov, int iovcnt, 1412 uint64_t offset_blocks, uint64_t num_blocks, 1413 spdk_bdev_io_completion_cb cb, void *cb_arg) 1414 { 1415 struct spdk_bdev *bdev = desc->bdev; 1416 struct spdk_bdev_io *bdev_io; 1417 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1418 1419 if (!desc->write) { 1420 return -EBADF; 1421 } 1422 1423 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1424 return -EINVAL; 1425 } 1426 1427 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1428 if (!bdev_io) { 1429 SPDK_ERRLOG("bdev_io memory allocation failed duing writev\n"); 1430 return -ENOMEM; 1431 } 1432 1433 bdev_io->ch = channel; 1434 bdev_io->type = 
SPDK_BDEV_IO_TYPE_WRITE; 1435 bdev_io->u.bdev.iovs = iov; 1436 bdev_io->u.bdev.iovcnt = iovcnt; 1437 bdev_io->u.bdev.num_blocks = num_blocks; 1438 bdev_io->u.bdev.offset_blocks = offset_blocks; 1439 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1440 1441 spdk_bdev_io_submit(bdev_io); 1442 return 0; 1443 } 1444 1445 int 1446 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1447 uint64_t offset, uint64_t len, 1448 spdk_bdev_io_completion_cb cb, void *cb_arg) 1449 { 1450 uint64_t offset_blocks, num_blocks; 1451 1452 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) { 1453 return -EINVAL; 1454 } 1455 1456 return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 1457 } 1458 1459 int 1460 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1461 uint64_t offset_blocks, uint64_t num_blocks, 1462 spdk_bdev_io_completion_cb cb, void *cb_arg) 1463 { 1464 struct spdk_bdev *bdev = desc->bdev; 1465 struct spdk_bdev_io *bdev_io; 1466 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1467 uint64_t len; 1468 bool split_request = false; 1469 1470 if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) { 1471 SPDK_ERRLOG("length argument out of range in write_zeroes\n"); 1472 return -ERANGE; 1473 } 1474 1475 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1476 return -EINVAL; 1477 } 1478 1479 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1480 1481 if (!bdev_io) { 1482 SPDK_ERRLOG("bdev_io memory allocation failed duing write_zeroes\n"); 1483 return -ENOMEM; 1484 } 1485 1486 bdev_io->ch = channel; 1487 bdev_io->u.bdev.offset_blocks = offset_blocks; 1488 1489 if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 1490 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES; 1491 bdev_io->u.bdev.num_blocks = num_blocks; 1492 bdev_io->u.bdev.iovs = NULL; 1493 bdev_io->u.bdev.iovcnt = 0; 1494 1495 } else { 1496 assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE); 1497 1498 len = spdk_bdev_get_block_size(bdev) * num_blocks; 1499 1500 if (len > ZERO_BUFFER_SIZE) { 1501 split_request = true; 1502 len = ZERO_BUFFER_SIZE; 1503 } 1504 1505 bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE; 1506 bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer; 1507 bdev_io->u.bdev.iov.iov_len = len; 1508 bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov; 1509 bdev_io->u.bdev.iovcnt = 1; 1510 bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev); 1511 bdev_io->split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks; 1512 bdev_io->split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks; 1513 } 1514 1515 if (split_request) { 1516 bdev_io->stored_user_cb = cb; 1517 spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split); 1518 } else { 1519 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1520 } 1521 spdk_bdev_io_submit(bdev_io); 1522 return 0; 1523 } 1524 1525 int 1526 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1527 uint64_t offset, uint64_t nbytes, 1528 spdk_bdev_io_completion_cb cb, void *cb_arg) 1529 { 1530 uint64_t offset_blocks, num_blocks; 1531 1532 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) { 1533 return -EINVAL; 1534 } 1535 1536 return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 1537 } 1538 1539 int 1540 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1541 
uint64_t offset_blocks, uint64_t num_blocks, 1542 spdk_bdev_io_completion_cb cb, void *cb_arg) 1543 { 1544 struct spdk_bdev *bdev = desc->bdev; 1545 struct spdk_bdev_io *bdev_io; 1546 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1547 1548 if (!desc->write) { 1549 return -EBADF; 1550 } 1551 1552 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1553 return -EINVAL; 1554 } 1555 1556 if (num_blocks == 0) { 1557 SPDK_ERRLOG("Can't unmap 0 blocks\n"); 1558 return -EINVAL; 1559 } 1560 1561 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1562 if (!bdev_io) { 1563 SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n"); 1564 return -ENOMEM; 1565 } 1566 1567 bdev_io->ch = channel; 1568 bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP; 1569 bdev_io->u.bdev.iov.iov_base = NULL; 1570 bdev_io->u.bdev.iov.iov_len = 0; 1571 bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov; 1572 bdev_io->u.bdev.iovcnt = 1; 1573 bdev_io->u.bdev.offset_blocks = offset_blocks; 1574 bdev_io->u.bdev.num_blocks = num_blocks; 1575 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1576 1577 spdk_bdev_io_submit(bdev_io); 1578 return 0; 1579 } 1580 1581 int 1582 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1583 uint64_t offset, uint64_t length, 1584 spdk_bdev_io_completion_cb cb, void *cb_arg) 1585 { 1586 uint64_t offset_blocks, num_blocks; 1587 1588 if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) { 1589 return -EINVAL; 1590 } 1591 1592 return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg); 1593 } 1594 1595 int 1596 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1597 uint64_t offset_blocks, uint64_t num_blocks, 1598 spdk_bdev_io_completion_cb cb, void *cb_arg) 1599 { 1600 struct spdk_bdev *bdev = desc->bdev; 1601 struct spdk_bdev_io *bdev_io; 1602 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1603 1604 if (!desc->write) { 1605 return -EBADF; 1606 } 1607 1608 if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) { 1609 return -EINVAL; 1610 } 1611 1612 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1613 if (!bdev_io) { 1614 SPDK_ERRLOG("bdev_io memory allocation failed during flush\n"); 1615 return -ENOMEM; 1616 } 1617 1618 bdev_io->ch = channel; 1619 bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH; 1620 bdev_io->u.bdev.iovs = NULL; 1621 bdev_io->u.bdev.iovcnt = 0; 1622 bdev_io->u.bdev.offset_blocks = offset_blocks; 1623 bdev_io->u.bdev.num_blocks = num_blocks; 1624 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1625 1626 spdk_bdev_io_submit(bdev_io); 1627 return 0; 1628 } 1629 1630 static void 1631 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status) 1632 { 1633 struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i); 1634 struct spdk_bdev_io *bdev_io; 1635 1636 bdev_io = TAILQ_FIRST(&ch->queued_resets); 1637 TAILQ_REMOVE(&ch->queued_resets, bdev_io, link); 1638 spdk_bdev_io_submit_reset(bdev_io); 1639 } 1640 1641 static void 1642 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i) 1643 { 1644 struct spdk_io_channel *ch; 1645 struct spdk_bdev_channel *channel; 1646 struct spdk_bdev_mgmt_channel *mgmt_channel; 1647 struct spdk_bdev_module_channel *shared_ch; 1648 1649 ch = spdk_io_channel_iter_get_channel(i); 1650 channel = spdk_io_channel_get_ctx(ch); 1651 mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel); 1652 shared_ch = channel->module_ch; 1653 1654 channel->flags |= BDEV_CH_RESET_IN_PROGRESS; 1655 1656
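/* With the reset flag set, spdk_bdev_io_submit() fails any new I/O on this channel, so everything still queued for this channel can be failed below without racing against new submissions. */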
_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, channel); 1657 _spdk_bdev_abort_queued_io(&channel->qos_io, channel); 1658 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel); 1659 _spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel); 1660 1661 spdk_for_each_channel_continue(i, 0); 1662 } 1663 1664 static void 1665 _spdk_bdev_start_reset(void *ctx) 1666 { 1667 struct spdk_bdev_channel *ch = ctx; 1668 1669 spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel, 1670 ch, _spdk_bdev_reset_dev); 1671 } 1672 1673 static void 1674 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch) 1675 { 1676 struct spdk_bdev *bdev = ch->bdev; 1677 1678 assert(!TAILQ_EMPTY(&ch->queued_resets)); 1679 1680 pthread_mutex_lock(&bdev->mutex); 1681 if (bdev->reset_in_progress == NULL) { 1682 bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets); 1683 /* 1684 * Take a channel reference for the target bdev for the life of this 1685 * reset. This guards against the channel getting destroyed while 1686 * spdk_for_each_channel() calls related to this reset IO are in 1687 * progress. We will release the reference when this reset is 1688 * completed. 1689 */ 1690 bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev)); 1691 _spdk_bdev_start_reset(ch); 1692 } 1693 pthread_mutex_unlock(&bdev->mutex); 1694 } 1695 1696 int 1697 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1698 spdk_bdev_io_completion_cb cb, void *cb_arg) 1699 { 1700 struct spdk_bdev *bdev = desc->bdev; 1701 struct spdk_bdev_io *bdev_io; 1702 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1703 1704 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1705 if (!bdev_io) { 1706 SPDK_ERRLOG("bdev_io memory allocation failed duing reset\n"); 1707 return -ENOMEM; 1708 } 1709 1710 bdev_io->ch = channel; 1711 bdev_io->type = SPDK_BDEV_IO_TYPE_RESET; 1712 bdev_io->u.reset.ch_ref = NULL; 1713 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1714 1715 pthread_mutex_lock(&bdev->mutex); 1716 TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link); 1717 pthread_mutex_unlock(&bdev->mutex); 1718 1719 _spdk_bdev_channel_start_reset(channel); 1720 1721 return 0; 1722 } 1723 1724 void 1725 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch, 1726 struct spdk_bdev_io_stat *stat) 1727 { 1728 #ifdef SPDK_CONFIG_VTUNE 1729 SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n"); 1730 memset(stat, 0, sizeof(*stat)); 1731 return; 1732 #endif 1733 1734 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1735 1736 channel->stat.ticks_rate = spdk_get_ticks_hz(); 1737 *stat = channel->stat; 1738 memset(&channel->stat, 0, sizeof(channel->stat)); 1739 } 1740 1741 int 1742 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1743 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 1744 spdk_bdev_io_completion_cb cb, void *cb_arg) 1745 { 1746 struct spdk_bdev *bdev = desc->bdev; 1747 struct spdk_bdev_io *bdev_io; 1748 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1749 1750 if (!desc->write) { 1751 return -EBADF; 1752 } 1753 1754 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1755 if (!bdev_io) { 1756 SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n"); 1757 return -ENOMEM; 1758 } 1759 1760 bdev_io->ch = channel; 1761 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN; 1762 bdev_io->u.nvme_passthru.cmd = 
*cmd; 1763 bdev_io->u.nvme_passthru.buf = buf; 1764 bdev_io->u.nvme_passthru.nbytes = nbytes; 1765 bdev_io->u.nvme_passthru.md_buf = NULL; 1766 bdev_io->u.nvme_passthru.md_len = 0; 1767 1768 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1769 1770 spdk_bdev_io_submit(bdev_io); 1771 return 0; 1772 } 1773 1774 int 1775 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1776 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, 1777 spdk_bdev_io_completion_cb cb, void *cb_arg) 1778 { 1779 struct spdk_bdev *bdev = desc->bdev; 1780 struct spdk_bdev_io *bdev_io; 1781 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1782 1783 if (!desc->write) { 1784 /* 1785 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 1786 * to easily determine if the command is a read or write, but for now just 1787 * do not allow io_passthru with a read-only descriptor. 1788 */ 1789 return -EBADF; 1790 } 1791 1792 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1793 if (!bdev_io) { 1794 SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n"); 1795 return -ENOMEM; 1796 } 1797 1798 bdev_io->ch = channel; 1799 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO; 1800 bdev_io->u.nvme_passthru.cmd = *cmd; 1801 bdev_io->u.nvme_passthru.buf = buf; 1802 bdev_io->u.nvme_passthru.nbytes = nbytes; 1803 bdev_io->u.nvme_passthru.md_buf = NULL; 1804 bdev_io->u.nvme_passthru.md_len = 0; 1805 1806 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1807 1808 spdk_bdev_io_submit(bdev_io); 1809 return 0; 1810 } 1811 1812 int 1813 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, 1814 const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len, 1815 spdk_bdev_io_completion_cb cb, void *cb_arg) 1816 { 1817 struct spdk_bdev *bdev = desc->bdev; 1818 struct spdk_bdev_io *bdev_io; 1819 struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch); 1820 1821 if (!desc->write) { 1822 /* 1823 * Do not try to parse the NVMe command - we could maybe use bits in the opcode 1824 * to easily determine if the command is a read or write, but for now just 1825 * do not allow io_passthru with a read-only descriptor.
1826 */ 1827 return -EBADF; 1828 } 1829 1830 bdev_io = spdk_bdev_get_io(channel->mgmt_channel); 1831 if (!bdev_io) { 1832 SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n"); 1833 return -ENOMEM; 1834 } 1835 1836 bdev_io->ch = channel; 1837 bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD; 1838 bdev_io->u.nvme_passthru.cmd = *cmd; 1839 bdev_io->u.nvme_passthru.buf = buf; 1840 bdev_io->u.nvme_passthru.nbytes = nbytes; 1841 bdev_io->u.nvme_passthru.md_buf = md_buf; 1842 bdev_io->u.nvme_passthru.md_len = md_len; 1843 1844 spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb); 1845 1846 spdk_bdev_io_submit(bdev_io); 1847 return 0; 1848 } 1849 1850 int 1851 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io) 1852 { 1853 if (!bdev_io) { 1854 SPDK_ERRLOG("bdev_io is NULL\n"); 1855 return -1; 1856 } 1857 1858 if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) { 1859 SPDK_ERRLOG("bdev_io is in pending state\n"); 1860 assert(false); 1861 return -1; 1862 } 1863 1864 spdk_bdev_put_io(bdev_io); 1865 1866 return 0; 1867 } 1868 1869 static void 1870 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch) 1871 { 1872 struct spdk_bdev *bdev = bdev_ch->bdev; 1873 struct spdk_bdev_module_channel *shared_ch = bdev_ch->module_ch; 1874 struct spdk_bdev_io *bdev_io; 1875 1876 if (shared_ch->io_outstanding > shared_ch->nomem_threshold) { 1877 /* 1878 * Allow some more I/O to complete before retrying the nomem_io queue. 1879 * Some drivers (such as nvme) cannot immediately take a new I/O in 1880 * the context of a completion, because the resources for the I/O are 1881 * not released until control returns to the bdev poller. Also, we 1882 * may require several small I/O to complete before a larger I/O 1883 * (that requires splitting) can be submitted. 1884 */ 1885 return; 1886 } 1887 1888 while (!TAILQ_EMPTY(&shared_ch->nomem_io)) { 1889 bdev_io = TAILQ_FIRST(&shared_ch->nomem_io); 1890 TAILQ_REMOVE(&shared_ch->nomem_io, bdev_io, link); 1891 shared_ch->io_outstanding++; 1892 bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING; 1893 bdev->fn_table->submit_request(bdev_io->ch->channel, bdev_io); 1894 if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) { 1895 break; 1896 } 1897 } 1898 } 1899 1900 static void 1901 _spdk_bdev_io_complete(void *ctx) 1902 { 1903 struct spdk_bdev_io *bdev_io = ctx; 1904 1905 assert(bdev_io->cb != NULL); 1906 bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx); 1907 } 1908 1909 static void 1910 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status) 1911 { 1912 struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i); 1913 1914 if (bdev_io->u.reset.ch_ref != NULL) { 1915 spdk_put_io_channel(bdev_io->u.reset.ch_ref); 1916 bdev_io->u.reset.ch_ref = NULL; 1917 } 1918 1919 _spdk_bdev_io_complete(bdev_io); 1920 } 1921 1922 static void 1923 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i) 1924 { 1925 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 1926 struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch); 1927 1928 ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS; 1929 if (!TAILQ_EMPTY(&ch->queued_resets)) { 1930 _spdk_bdev_channel_start_reset(ch); 1931 } 1932 1933 spdk_for_each_channel_continue(i, 0); 1934 } 1935 1936 void 1937 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status) 1938 { 1939 struct spdk_bdev *bdev = bdev_io->bdev; 1940 struct spdk_bdev_channel *bdev_ch = bdev_io->ch; 1941 struct spdk_bdev_module_channel *shared_ch = bdev_ch->module_ch; 1942 1943 
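/* Record the final status first; the reset and NOMEM paths below may defer the completion or requeue the I/O instead of finishing it immediately. */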
void
spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
	struct spdk_bdev_module_channel *shared_ch = bdev_ch->module_ch;

	bdev_io->status = status;

	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
		bool unlock_channels = false;

		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
			SPDK_ERRLOG("NOMEM returned for reset\n");
		}
		pthread_mutex_lock(&bdev->mutex);
		if (bdev_io == bdev->reset_in_progress) {
			bdev->reset_in_progress = NULL;
			unlock_channels = true;
		}
		pthread_mutex_unlock(&bdev->mutex);

		if (unlock_channels) {
			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
					      bdev_io, _spdk_bdev_reset_complete);
			return;
		}
	} else {
		assert(shared_ch->io_outstanding > 0);
		shared_ch->io_outstanding--;
		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
			if (spdk_unlikely(!TAILQ_EMPTY(&shared_ch->nomem_io))) {
				_spdk_bdev_ch_retry_io(bdev_ch);
			}
		} else {
			TAILQ_INSERT_HEAD(&shared_ch->nomem_io, bdev_io, link);
			/*
			 * Wait for some of the outstanding I/O to complete before we
			 * retry any of the nomem_io.  Normally we will wait for
			 * NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
			 * depth channels we will instead wait for half to complete.
			 */
			shared_ch->nomem_threshold = spdk_max((int64_t)shared_ch->io_outstanding / 2,
							      (int64_t)shared_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
			return;
		}
	}

	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		switch (bdev_io->type) {
		case SPDK_BDEV_IO_TYPE_READ:
			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
			bdev_ch->stat.num_read_ops++;
			bdev_ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
			break;
		case SPDK_BDEV_IO_TYPE_WRITE:
			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
			bdev_ch->stat.num_write_ops++;
			bdev_ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
			break;
		default:
			break;
		}
	}

#ifdef SPDK_CONFIG_VTUNE
	uint64_t now_tsc = spdk_get_ticks();
	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
		uint64_t data[5];

		data[0] = bdev_ch->stat.num_read_ops;
		data[1] = bdev_ch->stat.bytes_read;
		data[2] = bdev_ch->stat.num_write_ops;
		data[3] = bdev_ch->stat.bytes_written;
		data[4] = bdev->fn_table->get_spin_time ?
			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;

		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
				   __itt_metadata_u64, 5, data);

		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
		bdev_ch->start_tsc = now_tsc;
	}
#endif

	if (bdev_io->in_submit_request) {
		/*
		 * Defer completion to avoid potential infinite recursion if the
		 * user's completion callback issues a new I/O.
		 */
		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
				     _spdk_bdev_io_complete, bdev_io);
	} else {
		_spdk_bdev_io_complete(bdev_io);
	}
}

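/*
 * Sketch of how a caller that needs to report a SCSI-level error might use the
 * helper below (illustrative only; bdev_io stands for an I/O the caller owns,
 * and the constants are assumed to come from spdk/scsi_spec.h):
 *
 *	spdk_bdev_io_complete_scsi_status(bdev_io, SPDK_SCSI_STATUS_CHECK_CONDITION,
 *					  SPDK_SCSI_SENSE_ILLEGAL_REQUEST,
 *					  SPDK_SCSI_ASC_INVALID_FIELD_IN_CDB,
 *					  SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE);
 */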
void
spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
{
	if (sc == SPDK_SCSI_STATUS_GOOD) {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
	} else {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
		bdev_io->error.scsi.sc = sc;
		bdev_io->error.scsi.sk = sk;
		bdev_io->error.scsi.asc = asc;
		bdev_io->error.scsi.ascq = ascq;
	}

	spdk_bdev_io_complete(bdev_io, bdev_io->status);
}

void
spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
			     int *sc, int *sk, int *asc, int *ascq)
{
	assert(sc != NULL);
	assert(sk != NULL);
	assert(asc != NULL);
	assert(ascq != NULL);

	switch (bdev_io->status) {
	case SPDK_BDEV_IO_STATUS_SUCCESS:
		*sc = SPDK_SCSI_STATUS_GOOD;
		*sk = SPDK_SCSI_SENSE_NO_SENSE;
		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
		break;
	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
		break;
	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
		*sc = bdev_io->error.scsi.sc;
		*sk = bdev_io->error.scsi.sk;
		*asc = bdev_io->error.scsi.asc;
		*ascq = bdev_io->error.scsi.ascq;
		break;
	default:
		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
		break;
	}
}

void
spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
{
	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
	} else {
		bdev_io->error.nvme.sct = sct;
		bdev_io->error.nvme.sc = sc;
		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
	}

	spdk_bdev_io_complete(bdev_io, bdev_io->status);
}

void
spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
{
	assert(sct != NULL);
	assert(sc != NULL);

	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
		*sct = bdev_io->error.nvme.sct;
		*sc = bdev_io->error.nvme.sc;
	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_SUCCESS;
	} else {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
	}
}

struct spdk_thread *
spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
{
	return spdk_io_channel_get_thread(bdev_io->ch->channel);
}

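/*
 * Registration path. A backend module typically fills in its struct spdk_bdev
 * and hands it to spdk_bdev_register(), roughly as sketched below (the my_*
 * identifiers are placeholders, not symbols defined in this file):
 *
 *	my_bdev->bdev.name = "MyBdev0";
 *	my_bdev->bdev.blocklen = 512;
 *	my_bdev->bdev.blockcnt = num_blocks;
 *	my_bdev->bdev.fn_table = &my_fn_table;
 *	my_bdev->bdev.module = &my_bdev_module_if;
 *	rc = spdk_bdev_register(&my_bdev->bdev);
 */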
static int
_spdk_bdev_register(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module_if *module;

	assert(bdev->module != NULL);

	if (!bdev->name) {
		SPDK_ERRLOG("Bdev name is NULL\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(bdev->name)) {
		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
		return -EEXIST;
	}

	bdev->status = SPDK_BDEV_STATUS_READY;

	TAILQ_INIT(&bdev->open_descs);

	TAILQ_INIT(&bdev->vbdevs);
	TAILQ_INIT(&bdev->base_bdevs);

	TAILQ_INIT(&bdev->aliases);

	bdev->reset_in_progress = NULL;

	spdk_io_device_register(__bdev_to_io_dev(bdev),
				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
				sizeof(struct spdk_bdev_channel));

	pthread_mutex_init(&bdev->mutex, NULL);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
		if (module->examine) {
			module->action_in_progress++;
			module->examine(bdev);
		}
	}

	return 0;
}

int
spdk_bdev_register(struct spdk_bdev *bdev)
{
	return _spdk_bdev_register(bdev);
}

int
spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
{
	int i, rc;

	rc = _spdk_bdev_register(vbdev);
	if (rc) {
		return rc;
	}

	for (i = 0; i < base_bdev_count; i++) {
		assert(base_bdevs[i] != NULL);
		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
	}

	return 0;
}

void
spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
{
	if (bdev->unregister_cb != NULL) {
		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
	}
}

static void
_remove_notify(void *arg)
{
	struct spdk_bdev_desc *desc = arg;

	desc->remove_cb(desc->remove_ctx);
}

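/*
 * Hot-remove path. Open descriptors that registered a remove_cb are notified,
 * and destruction is deferred until those descriptors are closed. A minimal
 * consumer callback could look like this sketch (how remove_ctx is set up is
 * entirely up to the caller; here it is assumed to be the descriptor itself):
 *
 *	static void
 *	example_bdev_removed(void *remove_ctx)
 *	{
 *		struct spdk_bdev_desc *desc = remove_ctx;
 *
 *		spdk_bdev_close(desc);
 *	}
 */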
void
spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_desc *desc, *tmp;
	int rc;
	bool do_destruct = true;
	struct spdk_bdev *base_bdev;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);

	pthread_mutex_lock(&bdev->mutex);

	if (!TAILQ_EMPTY(&bdev->base_bdevs)) {
		TAILQ_FOREACH(base_bdev, &bdev->base_bdevs, base_bdev_link) {
			TAILQ_REMOVE(&base_bdev->vbdevs, bdev, vbdev_link);
		}
	}

	bdev->status = SPDK_BDEV_STATUS_REMOVING;
	bdev->unregister_cb = cb_fn;
	bdev->unregister_ctx = cb_arg;

	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
		if (desc->remove_cb) {
			do_destruct = false;
			/*
			 * Defer invocation of the remove_cb to a separate message that will
			 * run later on this thread.  This ensures this context unwinds and
			 * we don't recursively unregister this bdev again if the remove_cb
			 * immediately closes its descriptor.
			 */
			spdk_thread_send_msg(spdk_get_thread(), _remove_notify, desc);
		}
	}

	if (!do_destruct) {
		pthread_mutex_unlock(&bdev->mutex);
		return;
	}

	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
	pthread_mutex_unlock(&bdev->mutex);

	pthread_mutex_destroy(&bdev->mutex);

	spdk_io_device_unregister(__bdev_to_io_dev(bdev), NULL);

	rc = bdev->fn_table->destruct(bdev->ctxt);
	if (rc < 0) {
		SPDK_ERRLOG("destruct failed\n");
	}
	if (rc <= 0 && cb_fn != NULL) {
		cb_fn(cb_arg, rc);
	}
}

int
spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
	       void *remove_ctx, struct spdk_bdev_desc **_desc)
{
	struct spdk_bdev_desc *desc;

	desc = calloc(1, sizeof(*desc));
	if (desc == NULL) {
		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
		return -ENOMEM;
	}

	pthread_mutex_lock(&bdev->mutex);

	if (write && bdev->claim_module) {
		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
		free(desc);
		pthread_mutex_unlock(&bdev->mutex);
		return -EPERM;
	}

	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);

	desc->bdev = bdev;
	desc->remove_cb = remove_cb;
	desc->remove_ctx = remove_ctx;
	desc->write = write;
	*_desc = desc;

	pthread_mutex_unlock(&bdev->mutex);

	return 0;
}

void
spdk_bdev_close(struct spdk_bdev_desc *desc)
{
	struct spdk_bdev *bdev = desc->bdev;
	bool do_unregister = false;

	pthread_mutex_lock(&bdev->mutex);

	TAILQ_REMOVE(&bdev->open_descs, desc, link);
	free(desc);

	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
		do_unregister = true;
	}
	pthread_mutex_unlock(&bdev->mutex);

	if (do_unregister == true) {
		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
	}
}

int
spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
			    struct spdk_bdev_module_if *module)
{
	if (bdev->claim_module != NULL) {
		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
			    bdev->claim_module->name);
		return -EPERM;
	}

	if (desc && !desc->write) {
		desc->write = true;
	}

	bdev->claim_module = module;
	return 0;
}

void
spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
{
	assert(bdev->claim_module != NULL);
	bdev->claim_module = NULL;
}

struct spdk_bdev *
spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
{
	return desc->bdev;
}

void
spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
{
	struct iovec *iovs;
	int iovcnt;

	if (bdev_io == NULL) {
		return;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		iovs = bdev_io->u.bdev.iovs;
		iovcnt = bdev_io->u.bdev.iovcnt;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		iovs = bdev_io->u.bdev.iovs;
		iovcnt = bdev_io->u.bdev.iovcnt;
		break;
	default:
		iovs = NULL;
		iovcnt = 0;
		break;
	}

	if (iovp) {
		*iovp = iovs;
	}
	if (iovcntp) {
		*iovcntp = iovcnt;
	}
}

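/*
 * Bdev modules are normally added to this list at program startup via the
 * SPDK_BDEV_MODULE_REGISTER() macro in spdk_internal/bdev.h (assumption based
 * on the usual SPDK module pattern), which arranges for
 * spdk_bdev_module_list_add() to run before bdev subsystem initialization.
 */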
void
spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
{
	/*
	 * Modules with examine callbacks must be initialized first, so they are
	 * ready to handle examine callbacks from later modules that will
	 * register physical bdevs.
	 */
	if (bdev_module->examine != NULL) {
		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
	} else {
		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
	}
}

static void
spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	uint64_t len;

	if (!success) {
		bdev_io->cb = bdev_io->stored_user_cb;
		_spdk_bdev_io_complete(bdev_io);
		return;
	}

	/*
	 * No need to repeat the error checking from write_zeroes_blocks -
	 * this request already passed those checks.
	 */
	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->split_remaining_num_blocks,
		       ZERO_BUFFER_SIZE);

	bdev_io->u.bdev.offset_blocks = bdev_io->split_current_offset_blocks;
	bdev_io->u.bdev.iov.iov_len = len;
	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
	bdev_io->split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
	bdev_io->split_current_offset_blocks += bdev_io->u.bdev.num_blocks;

	/* If this round completes the I/O, restore the original user callback. */
	if (bdev_io->split_remaining_num_blocks == 0) {
		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->stored_user_cb);
	} else {
		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
	}
	spdk_bdev_io_submit(bdev_io);
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)