xref: /spdk/lib/bdev/bdev.c (revision 9aed854be08215eb96edeff897e8a25a2858dc78)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/bdev.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
58 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
59 #define SPDK_BDEV_IO_CACHE_SIZE	256
60 #define BUF_SMALL_POOL_SIZE	8192
61 #define BUF_LARGE_POOL_SIZE	1024
62 #define NOMEM_THRESHOLD_COUNT	8
63 #define ZERO_BUFFER_SIZE	0x100000
64 
65 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
66 typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t;
67 
68 struct spdk_bdev_mgr {
69 	struct spdk_mempool *bdev_io_pool;
70 
71 	struct spdk_mempool *buf_small_pool;
72 	struct spdk_mempool *buf_large_pool;
73 
74 	void *zero_buffer;
75 
76 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
77 
78 	TAILQ_HEAD(, spdk_bdev) bdevs;
79 
80 	bool init_complete;
81 	bool module_init_complete;
82 
83 #ifdef SPDK_CONFIG_VTUNE
84 	__itt_domain	*domain;
85 #endif
86 };
87 
88 static struct spdk_bdev_mgr g_bdev_mgr = {
89 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
90 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
91 	.init_complete = false,
92 	.module_init_complete = false,
93 };
94 
95 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
96 static void			*g_init_cb_arg = NULL;
97 
98 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
99 static void			*g_fini_cb_arg = NULL;
100 static struct spdk_thread	*g_fini_thread = NULL;
101 
102 
103 struct spdk_bdev_mgmt_channel {
104 	bdev_io_stailq_t need_buf_small;
105 	bdev_io_stailq_t need_buf_large;
106 
107 	/*
108 	 * Each thread keeps a cache of bdev_io - this allows
109 	 *  bdev threads which are *not* DPDK threads to still
110 	 *  benefit from a per-thread bdev_io cache.  Without
111 	 *  this, non-DPDK threads fetching from the mempool
112 	 *  incur a cmpxchg on get and put.
113 	 */
114 	bdev_io_stailq_t per_thread_cache;
115 	uint32_t	per_thread_cache_count;
116 
117 	TAILQ_HEAD(, spdk_bdev_module_channel) module_channels;
118 };
119 
120 struct spdk_bdev_desc {
121 	struct spdk_bdev		*bdev;
122 	spdk_bdev_remove_cb_t		remove_cb;
123 	void				*remove_ctx;
124 	bool				write;
125 	TAILQ_ENTRY(spdk_bdev_desc)	link;
126 };
127 
128 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
129 
130 struct spdk_bdev_channel {
131 	struct spdk_bdev	*bdev;
132 
133 	/* The channel for the underlying device */
134 	struct spdk_io_channel	*channel;
135 
136 	/* Channel for the bdev manager */
137 	struct spdk_io_channel *mgmt_channel;
138 
139 	struct spdk_bdev_io_stat stat;
140 
141 	bdev_io_tailq_t		queued_resets;
142 
143 	uint32_t		flags;
144 
145 	/* Per-device channel */
146 	struct spdk_bdev_module_channel *module_ch;
147 
148 #ifdef SPDK_CONFIG_VTUNE
149 	uint64_t		start_tsc;
150 	uint64_t		interval_tsc;
151 	__itt_string_handle	*handle;
152 #endif
153 
154 };
155 
156 /*
157  * Per-module (or per-io_device) channel. Multiple bdevs built on the same io_device
158  * will queue their IO that awaits retry here. This makes it possible to retry sending
159  * IO to one bdev after IO from another bdev completes.
160  */
161 struct spdk_bdev_module_channel {
162 	/*
163 	 * Count of I/O submitted to bdev module and waiting for completion.
164 	 * Incremented before submit_request() is called on an spdk_bdev_io.
165 	 */
166 	uint64_t		io_outstanding;
167 
168 	/*
169 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
170 	 *  on this channel.
171 	 */
172 	bdev_io_tailq_t		nomem_io;
173 
174 	/*
175 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
176 	 */
177 	uint64_t		nomem_threshold;
178 
179 	/* I/O channel allocated by a bdev module */
180 	struct spdk_io_channel	*module_ch;
181 
182 	uint32_t		ref;
183 
184 	TAILQ_ENTRY(spdk_bdev_module_channel) link;
185 };
186 
187 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
188 
189 struct spdk_bdev *
190 spdk_bdev_first(void)
191 {
192 	struct spdk_bdev *bdev;
193 
194 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
195 	if (bdev) {
196 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
197 	}
198 
199 	return bdev;
200 }
201 
202 struct spdk_bdev *
203 spdk_bdev_next(struct spdk_bdev *prev)
204 {
205 	struct spdk_bdev *bdev;
206 
207 	bdev = TAILQ_NEXT(prev, link);
208 	if (bdev) {
209 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
210 	}
211 
212 	return bdev;
213 }
214 
215 static struct spdk_bdev *
216 _bdev_next_leaf(struct spdk_bdev *bdev)
217 {
218 	while (bdev != NULL) {
219 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
220 			return bdev;
221 		} else {
222 			bdev = TAILQ_NEXT(bdev, link);
223 		}
224 	}
225 
226 	return bdev;
227 }
228 
229 struct spdk_bdev *
230 spdk_bdev_first_leaf(void)
231 {
232 	struct spdk_bdev *bdev;
233 
234 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
235 
236 	if (bdev) {
237 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
238 	}
239 
240 	return bdev;
241 }
242 
243 struct spdk_bdev *
244 spdk_bdev_next_leaf(struct spdk_bdev *prev)
245 {
246 	struct spdk_bdev *bdev;
247 
248 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
249 
250 	if (bdev) {
251 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
252 	}
253 
254 	return bdev;
255 }
256 
257 struct spdk_bdev *
258 spdk_bdev_get_by_name(const char *bdev_name)
259 {
260 	struct spdk_bdev_alias *tmp;
261 	struct spdk_bdev *bdev = spdk_bdev_first();
262 
263 	while (bdev != NULL) {
264 		if (strcmp(bdev_name, bdev->name) == 0) {
265 			return bdev;
266 		}
267 
268 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
269 			if (strcmp(bdev_name, tmp->alias) == 0) {
270 				return bdev;
271 			}
272 		}
273 
274 		bdev = spdk_bdev_next(bdev);
275 	}
276 
277 	return NULL;
278 }
279 
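/*
 * Attach a buffer from one of the buffer pools to a bdev_io. The data pointer is
 *  rounded up to the next 512-byte boundary, so at most 512 of the extra bytes that
 *  the pools allocate per element are consumed. For example, if buf is 0x10000203,
 *  then buf + 512 is 0x10000403 and masking with ~511UL yields an iov_base of
 *  0x10000400.
 */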
280 static void
281 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
282 {
283 	assert(bdev_io->get_buf_cb != NULL);
284 	assert(buf != NULL);
285 	assert(bdev_io->u.bdev.iovs != NULL);
286 
287 	bdev_io->buf = buf;
288 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
289 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
290 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
291 }
292 
293 static void
294 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
295 {
296 	struct spdk_mempool *pool;
297 	struct spdk_bdev_io *tmp;
298 	void *buf;
299 	bdev_io_stailq_t *stailq;
300 	struct spdk_bdev_mgmt_channel *ch;
301 
302 	assert(bdev_io->u.bdev.iovcnt == 1);
303 
304 	buf = bdev_io->buf;
305 	ch = bdev_io->mgmt_ch;
306 
307 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
308 		pool = g_bdev_mgr.buf_small_pool;
309 		stailq = &ch->need_buf_small;
310 	} else {
311 		pool = g_bdev_mgr.buf_large_pool;
312 		stailq = &ch->need_buf_large;
313 	}
314 
315 	if (STAILQ_EMPTY(stailq)) {
316 		spdk_mempool_put(pool, buf);
317 	} else {
318 		tmp = STAILQ_FIRST(stailq);
319 		STAILQ_REMOVE_HEAD(stailq, buf_link);
320 		spdk_bdev_io_set_buf(tmp, buf);
321 	}
322 }
323 
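/*
 * Request a data buffer for a bdev_io whose iovec does not have a buffer attached
 *  yet. If a buffer of the requested length is available in the small or large pool,
 *  cb is invoked immediately; otherwise the bdev_io is queued until another I/O
 *  releases its buffer. A sketch of how a bdev module's read path might use this
 *  (the callback and helper names are hypothetical):
 *
 *      static void
 *      my_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *      {
 *              my_module_submit_read(ch, bdev_io);
 *      }
 *
 *      spdk_bdev_io_get_buf(bdev_io, my_read_get_buf_cb,
 *                           bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *
 *  The requested length must not exceed SPDK_BDEV_LARGE_BUF_MAX_SIZE.
 */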
324 void
325 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
326 {
327 	struct spdk_mempool *pool;
328 	bdev_io_stailq_t *stailq;
329 	void *buf = NULL;
330 	struct spdk_bdev_mgmt_channel *ch;
331 
332 	assert(cb != NULL);
333 	assert(bdev_io->u.bdev.iovs != NULL);
334 
335 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
336 		/* Buffer already present */
337 		cb(bdev_io->ch->channel, bdev_io);
338 		return;
339 	}
340 
341 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
342 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
343 
344 	bdev_io->buf_len = len;
345 	bdev_io->get_buf_cb = cb;
346 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
347 		pool = g_bdev_mgr.buf_small_pool;
348 		stailq = &ch->need_buf_small;
349 	} else {
350 		pool = g_bdev_mgr.buf_large_pool;
351 		stailq = &ch->need_buf_large;
352 	}
353 
354 	buf = spdk_mempool_get(pool);
355 
356 	if (!buf) {
357 		STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link);
358 	} else {
359 		spdk_bdev_io_set_buf(bdev_io, buf);
360 	}
361 }
362 
363 static int
364 spdk_bdev_module_get_max_ctx_size(void)
365 {
366 	struct spdk_bdev_module_if *bdev_module;
367 	int max_bdev_module_size = 0;
368 
369 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
370 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
371 			max_bdev_module_size = bdev_module->get_ctx_size();
372 		}
373 	}
374 
375 	return max_bdev_module_size;
376 }
377 
378 void
379 spdk_bdev_config_text(FILE *fp)
380 {
381 	struct spdk_bdev_module_if *bdev_module;
382 
383 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
384 		if (bdev_module->config_text) {
385 			bdev_module->config_text(fp);
386 		}
387 	}
388 }
389 
390 static int
391 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
392 {
393 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
394 
395 	STAILQ_INIT(&ch->need_buf_small);
396 	STAILQ_INIT(&ch->need_buf_large);
397 
398 	STAILQ_INIT(&ch->per_thread_cache);
399 	ch->per_thread_cache_count = 0;
400 
401 	TAILQ_INIT(&ch->module_channels);
402 
403 	return 0;
404 }
405 
406 static void
407 spdk_bdev_mgmt_channel_free_resources(struct spdk_bdev_mgmt_channel *ch)
408 {
409 	struct spdk_bdev_io *bdev_io;
410 
411 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
412 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
413 	}
414 
415 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
416 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
417 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
418 		ch->per_thread_cache_count--;
419 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
420 	}
421 
422 	assert(ch->per_thread_cache_count == 0);
423 }
424 
425 static void
426 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
427 {
428 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
429 
430 	spdk_bdev_mgmt_channel_free_resources(ch);
431 }
432 
433 static void
434 spdk_bdev_init_complete(int rc)
435 {
436 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
437 	void *cb_arg = g_init_cb_arg;
438 
439 	g_bdev_mgr.init_complete = true;
440 	g_init_cb_fn = NULL;
441 	g_init_cb_arg = NULL;
442 
443 	cb_fn(cb_arg, rc);
444 }
445 
446 static void
447 spdk_bdev_module_action_complete(void)
448 {
449 	struct spdk_bdev_module_if *m;
450 
451 	/*
452 	 * Don't finish bdev subsystem initialization if
453 	 * module pre-initialization is still in progress, or
454 	 * the subsystem has already been initialized.
455 	 */
456 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
457 		return;
458 	}
459 
460 	/*
461 	 * Check all bdev modules for inits/examinations in progress. If any
462 	 * exist, return immediately since we cannot finish bdev subsystem
463 	 * initialization until all are completed.
464 	 */
465 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
466 		if (m->action_in_progress > 0) {
467 			return;
468 		}
469 	}
470 
471 	/*
472 	 * Modules already finished initialization - now that all
473 	 * the bdev modules have finished their asynchronous I/O
474 	 * processing, the entire bdev layer can be marked as complete.
475 	 */
476 	spdk_bdev_init_complete(0);
477 }
478 
479 static void
480 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
481 {
482 	assert(module->action_in_progress > 0);
483 	module->action_in_progress--;
484 	spdk_bdev_module_action_complete();
485 }
486 
487 void
488 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
489 {
490 	spdk_bdev_module_action_done(module);
491 }
492 
493 void
494 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
495 {
496 	spdk_bdev_module_action_done(module);
497 }
498 
499 static int
500 spdk_bdev_modules_init(void)
501 {
502 	struct spdk_bdev_module_if *module;
503 	int rc = 0;
504 
505 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
506 		rc = module->module_init();
507 		if (rc != 0) {
508 			break;
509 		}
510 	}
511 
512 	g_bdev_mgr.module_init_complete = true;
513 	return rc;
514 }
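
/*
 * Initialize the bdev layer. Completion is asynchronous: cb_fn is invoked with the
 *  supplied cb_arg and an rc of 0 on success once all bdev modules have finished
 *  their init and examine work. A minimal sketch of a caller (the callback name is
 *  hypothetical):
 *
 *      static void
 *      bdev_init_done(void *cb_arg, int rc)
 *      {
 *              if (rc != 0) {
 *                      SPDK_ERRLOG("bdev initialization failed\n");
 *              }
 *      }
 *
 *      spdk_bdev_initialize(bdev_init_done, NULL);
 */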
515 void
516 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
517 {
518 	int cache_size;
519 	int rc = 0;
520 	char mempool_name[32];
521 
522 	assert(cb_fn != NULL);
523 
524 	g_init_cb_fn = cb_fn;
525 	g_init_cb_arg = cb_arg;
526 
527 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
528 
529 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
530 				  SPDK_BDEV_IO_POOL_SIZE,
531 				  sizeof(struct spdk_bdev_io) +
532 				  spdk_bdev_module_get_max_ctx_size(),
533 				  0,
534 				  SPDK_ENV_SOCKET_ID_ANY);
535 
536 	if (g_bdev_mgr.bdev_io_pool == NULL) {
537 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
538 		spdk_bdev_init_complete(-1);
539 		return;
540 	}
541 
542 	/**
543 	 * Ensure no more than half of the total buffers end up in local caches, by
544 	 *   using spdk_env_get_core_count() to determine how many local caches we need
545 	 *   to account for.
546 	 */
547 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
548 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
549 
550 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
551 				    BUF_SMALL_POOL_SIZE,
552 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
553 				    cache_size,
554 				    SPDK_ENV_SOCKET_ID_ANY);
555 	if (!g_bdev_mgr.buf_small_pool) {
556 		SPDK_ERRLOG("create buf small pool failed\n");
557 		spdk_bdev_init_complete(-1);
558 		return;
559 	}
560 
561 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
562 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
563 
564 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
565 				    BUF_LARGE_POOL_SIZE,
566 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
567 				    cache_size,
568 				    SPDK_ENV_SOCKET_ID_ANY);
569 	if (!g_bdev_mgr.buf_large_pool) {
570 		SPDK_ERRLOG("create buf large pool failed\n");
571 		spdk_bdev_init_complete(-1);
572 		return;
573 	}
574 
575 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
576 				 NULL);
577 	if (!g_bdev_mgr.zero_buffer) {
578 		SPDK_ERRLOG("create bdev zero buffer failed\n");
579 		spdk_bdev_init_complete(-1);
580 		return;
581 	}
582 
583 #ifdef SPDK_CONFIG_VTUNE
584 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
585 #endif
586 
587 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
588 				spdk_bdev_mgmt_channel_destroy,
589 				sizeof(struct spdk_bdev_mgmt_channel));
590 
591 	rc = spdk_bdev_modules_init();
592 	if (rc != 0) {
593 		SPDK_ERRLOG("bdev modules init failed\n");
594 		spdk_bdev_init_complete(-1);
595 		return;
596 	}
597 
598 	spdk_bdev_module_action_complete();
599 }
600 
601 static void
602 spdk_bdev_module_finish_cb(void *io_device)
603 {
604 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
605 
606 	cb_fn(g_fini_cb_arg);
607 	g_fini_cb_fn = NULL;
608 	g_fini_cb_arg = NULL;
609 }
610 
611 static void
612 spdk_bdev_module_finish_complete(struct spdk_io_channel_iter *i, int status)
613 {
614 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
615 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
616 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
617 			    SPDK_BDEV_IO_POOL_SIZE);
618 	}
619 
620 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
621 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
622 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
623 			    BUF_SMALL_POOL_SIZE);
624 		assert(false);
625 	}
626 
627 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
628 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
629 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
630 			    BUF_LARGE_POOL_SIZE);
631 		assert(false);
632 	}
633 
634 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
635 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
636 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
637 	spdk_dma_free(g_bdev_mgr.zero_buffer);
638 
639 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
640 }
641 
642 static void
643 mgmt_channel_free_resources(struct spdk_io_channel_iter *i)
644 {
645 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
646 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
647 
648 	spdk_bdev_mgmt_channel_free_resources(ch);
649 	spdk_for_each_channel_continue(i, 0);
650 }
651 
652 static void
653 spdk_bdev_module_finish_iter(void *arg)
654 {
655 	/* Notice that this variable is static. It is saved between calls to
656 	 * this function. */
657 	static struct spdk_bdev_module_if *resume_bdev_module = NULL;
658 	struct spdk_bdev_module_if *bdev_module;
659 
660 	/* Start iterating from the last touched module */
661 	if (!resume_bdev_module) {
662 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
663 	} else {
664 		bdev_module = TAILQ_NEXT(resume_bdev_module, tailq);
665 	}
666 
667 	while (bdev_module) {
668 		if (bdev_module->async_fini) {
669 			/* Save our place so we can resume later. We must
670 			 * save the variable here, before calling module_fini()
671 			 * below, because in some cases the module may immediately
672 			 * call spdk_bdev_module_finish_done() and re-enter
673 			 * this function to continue iterating. */
674 			resume_bdev_module = bdev_module;
675 		}
676 
677 		if (bdev_module->module_fini) {
678 			bdev_module->module_fini();
679 		}
680 
681 		if (bdev_module->async_fini) {
682 			return;
683 		}
684 
685 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
686 	}
687 
688 	resume_bdev_module = NULL;
689 	spdk_for_each_channel(&g_bdev_mgr, mgmt_channel_free_resources, NULL,
690 			      spdk_bdev_module_finish_complete);
691 }
692 
693 void
694 spdk_bdev_module_finish_done(void)
695 {
696 	if (spdk_get_thread() != g_fini_thread) {
697 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
698 	} else {
699 		spdk_bdev_module_finish_iter(NULL);
700 	}
701 }
702 
703 static void
704 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
705 {
706 	struct spdk_bdev *bdev = cb_arg;
707 
708 	if (bdeverrno && bdev) {
709 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
710 			     bdev->name);
711 
712 		/*
713 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
714 		 *  bdev; try to continue by manually removing this bdev from the list and moving
715 		 *  on to the next bdev in the list.
716 		 */
717 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
718 	}
719 
720 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
721 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
722 		spdk_bdev_module_finish_iter(NULL);
723 		return;
724 	}
725 
726 	/*
727 	 * Unregister the first bdev in the list.
728 	 *
729 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
730 	 *  calling the remove_cb of the descriptors first.
731 	 *
732 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
733 	 *  will be called again via the unregister completion callback to continue the cleanup
734 	 *  process with the next bdev.
735 	 */
736 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
737 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
738 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
739 }
740 
741 static void
742 _spdk_bdev_finish_unregister_bdevs(void)
743 {
744 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
745 }
746 
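/*
 * Tear the bdev layer back down. All registered bdevs are unregistered first, then
 *  each module's module_fini() runs, and finally cb_fn(cb_arg) is invoked. A minimal
 *  sketch of a caller (the callback name is hypothetical):
 *
 *      static void
 *      bdev_fini_done(void *cb_arg)
 *      {
 *              ...continue application shutdown...
 *      }
 *
 *      spdk_bdev_finish(bdev_fini_done, NULL);
 */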
747 void
748 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
749 {
750 	assert(cb_fn != NULL);
751 
752 	g_fini_thread = spdk_get_thread();
753 
754 	g_fini_cb_fn = cb_fn;
755 	g_fini_cb_arg = cb_arg;
756 
757 	_spdk_bdev_finish_unregister_bdevs();
758 }
759 
760 static struct spdk_bdev_io *
761 spdk_bdev_get_io(struct spdk_io_channel *_ch)
762 {
763 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
764 	struct spdk_bdev_io *bdev_io;
765 
766 	if (ch->per_thread_cache_count > 0) {
767 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
768 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
769 		ch->per_thread_cache_count--;
770 	} else {
771 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
772 		if (!bdev_io) {
773 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
774 			abort();
775 		}
776 	}
777 
778 	bdev_io->mgmt_ch = ch;
779 
780 	return bdev_io;
781 }
782 
783 static void
784 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
785 {
786 	struct spdk_bdev_mgmt_channel *ch = bdev_io->mgmt_ch;
787 
788 	if (bdev_io->buf != NULL) {
789 		spdk_bdev_io_put_buf(bdev_io);
790 	}
791 
792 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
793 		ch->per_thread_cache_count++;
794 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
795 	} else {
796 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
797 	}
798 }
799 
800 static void
801 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
802 {
803 	struct spdk_bdev *bdev = bdev_io->bdev;
804 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
805 	struct spdk_io_channel *ch = bdev_ch->channel;
806 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
807 
808 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
809 
810 	bdev_io->submit_tsc = spdk_get_ticks();
811 	shared_ch->io_outstanding++;
812 	bdev_io->in_submit_request = true;
813 	if (spdk_likely(bdev_ch->flags == 0)) {
814 		if (spdk_likely(TAILQ_EMPTY(&shared_ch->nomem_io))) {
815 			bdev->fn_table->submit_request(ch, bdev_io);
816 		} else {
817 			shared_ch->io_outstanding--;
818 			TAILQ_INSERT_TAIL(&shared_ch->nomem_io, bdev_io, link);
819 		}
820 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
821 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
822 	} else {
823 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
824 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
825 	}
826 	bdev_io->in_submit_request = false;
827 }
828 
829 static void
830 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
831 {
832 	struct spdk_bdev *bdev = bdev_io->bdev;
833 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
834 	struct spdk_io_channel *ch = bdev_ch->channel;
835 
836 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
837 
838 	bdev_io->in_submit_request = true;
839 	bdev->fn_table->submit_request(ch, bdev_io);
840 	bdev_io->in_submit_request = false;
841 }
842 
843 static void
844 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
845 		  struct spdk_bdev *bdev, void *cb_arg,
846 		  spdk_bdev_io_completion_cb cb)
847 {
848 	bdev_io->bdev = bdev;
849 	bdev_io->caller_ctx = cb_arg;
850 	bdev_io->cb = cb;
851 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
852 	bdev_io->in_submit_request = false;
853 	bdev_io->buf = NULL;
854 }
855 
856 bool
857 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
858 {
859 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
860 }
861 
862 int
863 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
864 {
865 	if (bdev->fn_table->dump_config_json) {
866 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
867 	}
868 
869 	return 0;
870 }
871 
872 static void
873 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
874 {
875 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
876 	struct spdk_bdev_module_channel	*shared_ch = ch->module_ch;
877 
878 	if (ch->channel) {
879 		spdk_put_io_channel(ch->channel);
880 	}
881 
882 	if (ch->mgmt_channel) {
883 		if (shared_ch) {
884 			assert(shared_ch->ref > 0);
885 			shared_ch->ref--;
886 			if (shared_ch->ref == 0) {
887 				mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
888 				assert(shared_ch->io_outstanding == 0);
889 				TAILQ_REMOVE(&mgmt_channel->module_channels, shared_ch, link);
890 				free(shared_ch);
891 			}
892 		}
893 		spdk_put_io_channel(ch->mgmt_channel);
894 	}
895 }
896 
897 static int
898 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
899 {
900 	struct spdk_bdev		*bdev = io_device;
901 	struct spdk_bdev_channel	*ch = ctx_buf;
902 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
903 	struct spdk_bdev_module_channel	*shared_ch;
904 
905 	ch->bdev = io_device;
906 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
907 	if (!ch->channel) {
908 		_spdk_bdev_channel_destroy_resource(ch);
909 		return -1;
910 	}
911 
912 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
913 	if (!ch->mgmt_channel) {
914 		_spdk_bdev_channel_destroy_resource(ch);
915 		return -1;
916 	}
917 
918 	mgmt_ch = spdk_io_channel_get_ctx(ch->mgmt_channel);
919 	TAILQ_FOREACH(shared_ch, &mgmt_ch->module_channels, link) {
920 		if (shared_ch->module_ch == ch->channel) {
921 			shared_ch->ref++;
922 			break;
923 		}
924 	}
925 
926 	if (shared_ch == NULL) {
927 		shared_ch = calloc(1, sizeof(*shared_ch));
928 		if (!shared_ch) {
929 			_spdk_bdev_channel_destroy_resource(ch);
930 			return -1;
931 		}
932 
933 		shared_ch->io_outstanding = 0;
934 		TAILQ_INIT(&shared_ch->nomem_io);
935 		shared_ch->nomem_threshold = 0;
936 		shared_ch->module_ch = ch->channel;
937 		shared_ch->ref = 1;
938 		TAILQ_INSERT_TAIL(&mgmt_ch->module_channels, shared_ch, link);
939 	}
940 
941 	memset(&ch->stat, 0, sizeof(ch->stat));
942 	TAILQ_INIT(&ch->queued_resets);
943 	ch->flags = 0;
944 	ch->module_ch = shared_ch;
945 
946 #ifdef SPDK_CONFIG_VTUNE
947 	{
948 		char *name;
949 		__itt_init_ittlib(NULL, 0);
950 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
951 		if (!name) {
952 			_spdk_bdev_channel_destroy_resource(ch);
953 			return -1;
954 		}
955 		ch->handle = __itt_string_handle_create(name);
956 		free(name);
957 		ch->start_tsc = spdk_get_ticks();
958 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
959 	}
960 #endif
961 
962 	return 0;
963 }
964 
965 /*
966  * Abort I/O that are waiting on a data buffer.  These types of I/O are
967  *  linked using the spdk_bdev_io buf_link STAILQ_ENTRY.
968  */
969 static void
970 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
971 {
972 	bdev_io_stailq_t tmp;
973 	struct spdk_bdev_io *bdev_io;
974 
975 	STAILQ_INIT(&tmp);
976 
977 	while (!STAILQ_EMPTY(queue)) {
978 		bdev_io = STAILQ_FIRST(queue);
979 		STAILQ_REMOVE_HEAD(queue, buf_link);
980 		if (bdev_io->ch == ch) {
981 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
982 		} else {
983 			STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link);
984 		}
985 	}
986 
987 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
988 }
989 
990 /*
991  * Abort I/O that are queued waiting for submission.  These types of I/O are
992  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
993  */
994 static void
995 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
996 {
997 	struct spdk_bdev_io *bdev_io, *tmp;
998 
999 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
1000 		if (bdev_io->ch == ch) {
1001 			TAILQ_REMOVE(queue, bdev_io, link);
1002 			/*
1003 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1004 			 *  been submitted to the bdev module.  Since in this case it
1005 			 *  hadn't, bump io_outstanding to account for the decrement
1006 			 *  that spdk_bdev_io_complete() will do.
1007 			 */
1008 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1009 				ch->module_ch->io_outstanding++;
1010 			}
1011 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1012 		}
1013 	}
1014 }
1015 
1016 static void
1017 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1018 {
1019 	struct spdk_bdev_channel	*ch = ctx_buf;
1020 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1021 	struct spdk_bdev_module_channel	*shared_ch = ch->module_ch;
1022 
1023 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1024 
1025 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1026 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, ch);
1027 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
1028 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
1029 
1030 	_spdk_bdev_channel_destroy_resource(ch);
1031 }
1032 
1033 int
1034 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1035 {
1036 	struct spdk_bdev_alias *tmp;
1037 
1038 	if (alias == NULL) {
1039 		SPDK_ERRLOG("Empty alias passed\n");
1040 		return -EINVAL;
1041 	}
1042 
1043 	if (spdk_bdev_get_by_name(alias)) {
1044 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1045 		return -EEXIST;
1046 	}
1047 
1048 	tmp = calloc(1, sizeof(*tmp));
1049 	if (tmp == NULL) {
1050 		SPDK_ERRLOG("Unable to allocate alias\n");
1051 		return -ENOMEM;
1052 	}
1053 
1054 	tmp->alias = strdup(alias);
1055 	if (tmp->alias == NULL) {
1056 		free(tmp);
1057 		SPDK_ERRLOG("Unable to allocate alias\n");
1058 		return -ENOMEM;
1059 	}
1060 
1061 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1062 
1063 	return 0;
1064 }
1065 
1066 int
1067 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1068 {
1069 	struct spdk_bdev_alias *tmp;
1070 
1071 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1072 		if (strcmp(alias, tmp->alias) == 0) {
1073 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1074 			free(tmp->alias);
1075 			free(tmp);
1076 			return 0;
1077 		}
1078 	}
1079 
1080 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1081 
1082 	return -ENOENT;
1083 }
1084 
1085 struct spdk_io_channel *
1086 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1087 {
1088 	return spdk_get_io_channel(desc->bdev);
1089 }
1090 
1091 const char *
1092 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1093 {
1094 	return bdev->name;
1095 }
1096 
1097 const char *
1098 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1099 {
1100 	return bdev->product_name;
1101 }
1102 
1103 const struct spdk_bdev_aliases_list *
1104 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1105 {
1106 	return &bdev->aliases;
1107 }
1108 
1109 uint32_t
1110 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1111 {
1112 	return bdev->blocklen;
1113 }
1114 
1115 uint64_t
1116 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1117 {
1118 	return bdev->blockcnt;
1119 }
1120 
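/*
 * Report the buffer alignment the bdev requires for I/O. For example, a caller
 *  could allocate a suitably aligned buffer with something like:
 *
 *      buf = spdk_dma_zmalloc(len, spdk_bdev_get_buf_align(bdev), NULL);
 */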
1121 size_t
1122 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1123 {
1124 	/* TODO: push this logic down to the bdev modules */
1125 	if (bdev->need_aligned_buffer) {
1126 		return bdev->blocklen;
1127 	}
1128 
1129 	return 1;
1130 }
1131 
1132 uint32_t
1133 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1134 {
1135 	return bdev->optimal_io_boundary;
1136 }
1137 
1138 bool
1139 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1140 {
1141 	return bdev->write_cache;
1142 }
1143 
1144 int
1145 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1146 {
1147 	int ret;
1148 
1149 	pthread_mutex_lock(&bdev->mutex);
1150 
1151 	/* bdev has open descriptors */
1152 	if (!TAILQ_EMPTY(&bdev->open_descs) &&
1153 	    bdev->blockcnt > size) {
1154 		ret = -EBUSY;
1155 	} else {
1156 		bdev->blockcnt = size;
1157 		ret = 0;
1158 	}
1159 
1160 	pthread_mutex_unlock(&bdev->mutex);
1161 
1162 	return ret;
1163 }
1164 
1165 /*
1166  * Convert I/O offset and length from bytes to blocks.
1167  *
1168  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
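 *
 * For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 8192
 *  yield *offset_blocks = 8 and *num_blocks = 16 with a return value of 0, whereas
 *  offset_bytes = 4100 produces a non-zero return value (4100 % 512 == 4) and the
 *  callers reject the request with -EINVAL.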
1169  */
1170 static uint64_t
1171 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1172 			  uint64_t num_bytes, uint64_t *num_blocks)
1173 {
1174 	uint32_t block_size = bdev->blocklen;
1175 
1176 	*offset_blocks = offset_bytes / block_size;
1177 	*num_blocks = num_bytes / block_size;
1178 
1179 	return (offset_bytes % block_size) | (num_bytes % block_size);
1180 }
1181 
1182 static bool
1183 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1184 {
1185 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
1186 	 * an overflow and hence the offset has wrapped around */
1187 	if (offset_blocks + num_blocks < offset_blocks) {
1188 		return false;
1189 	}
1190 
1191 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1192 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1193 		return false;
1194 	}
1195 
1196 	return true;
1197 }
1198 
1199 int
1200 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1201 	       void *buf, uint64_t offset, uint64_t nbytes,
1202 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1203 {
1204 	uint64_t offset_blocks, num_blocks;
1205 
1206 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1207 		return -EINVAL;
1208 	}
1209 
1210 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1211 }
1212 
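/*
 * Submit a read of num_blocks blocks starting at offset_blocks into a single
 *  contiguous buffer. A sketch of a typical caller, assuming desc and ch were
 *  obtained earlier (e.g. via spdk_bdev_open() and spdk_bdev_get_io_channel());
 *  the completion callback name is hypothetical:
 *
 *      static void
 *      read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *      {
 *              ...consume the data on success...
 *              spdk_bdev_free_io(bdev_io);
 *      }
 *
 *      rc = spdk_bdev_read_blocks(desc, ch, buf, 0, 8, read_done, cb_arg);
 */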
1213 int
1214 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1215 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1216 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1217 {
1218 	struct spdk_bdev *bdev = desc->bdev;
1219 	struct spdk_bdev_io *bdev_io;
1220 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1221 
1222 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1223 		return -EINVAL;
1224 	}
1225 
1226 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1227 	if (!bdev_io) {
1228 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1229 		return -ENOMEM;
1230 	}
1231 
1232 	bdev_io->ch = channel;
1233 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1234 	bdev_io->u.bdev.iov.iov_base = buf;
1235 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1236 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1237 	bdev_io->u.bdev.iovcnt = 1;
1238 	bdev_io->u.bdev.num_blocks = num_blocks;
1239 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1240 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1241 
1242 	spdk_bdev_io_submit(bdev_io);
1243 	return 0;
1244 }
1245 
1246 int
1247 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1248 		struct iovec *iov, int iovcnt,
1249 		uint64_t offset, uint64_t nbytes,
1250 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1251 {
1252 	uint64_t offset_blocks, num_blocks;
1253 
1254 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1255 		return -EINVAL;
1256 	}
1257 
1258 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1259 }
1260 
1261 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1262 			   struct iovec *iov, int iovcnt,
1263 			   uint64_t offset_blocks, uint64_t num_blocks,
1264 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1265 {
1266 	struct spdk_bdev *bdev = desc->bdev;
1267 	struct spdk_bdev_io *bdev_io;
1268 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1269 
1270 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1271 		return -EINVAL;
1272 	}
1273 
1274 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1275 	if (!bdev_io) {
1276 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
1277 		return -ENOMEM;
1278 	}
1279 
1280 	bdev_io->ch = channel;
1281 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1282 	bdev_io->u.bdev.iovs = iov;
1283 	bdev_io->u.bdev.iovcnt = iovcnt;
1284 	bdev_io->u.bdev.num_blocks = num_blocks;
1285 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1286 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1287 
1288 	spdk_bdev_io_submit(bdev_io);
1289 	return 0;
1290 }
1291 
1292 int
1293 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1294 		void *buf, uint64_t offset, uint64_t nbytes,
1295 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1296 {
1297 	uint64_t offset_blocks, num_blocks;
1298 
1299 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1300 		return -EINVAL;
1301 	}
1302 
1303 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1304 }
1305 
1306 int
1307 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1308 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1309 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1310 {
1311 	struct spdk_bdev *bdev = desc->bdev;
1312 	struct spdk_bdev_io *bdev_io;
1313 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1314 
1315 	if (!desc->write) {
1316 		return -EBADF;
1317 	}
1318 
1319 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1320 		return -EINVAL;
1321 	}
1322 
1323 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1324 	if (!bdev_io) {
1325 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1326 		return -ENOMEM;
1327 	}
1328 
1329 	bdev_io->ch = channel;
1330 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1331 	bdev_io->u.bdev.iov.iov_base = buf;
1332 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1333 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1334 	bdev_io->u.bdev.iovcnt = 1;
1335 	bdev_io->u.bdev.num_blocks = num_blocks;
1336 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1337 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1338 
1339 	spdk_bdev_io_submit(bdev_io);
1340 	return 0;
1341 }
1342 
1343 int
1344 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1345 		 struct iovec *iov, int iovcnt,
1346 		 uint64_t offset, uint64_t len,
1347 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1348 {
1349 	uint64_t offset_blocks, num_blocks;
1350 
1351 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1352 		return -EINVAL;
1353 	}
1354 
1355 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1356 }
1357 
1358 int
1359 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1360 			struct iovec *iov, int iovcnt,
1361 			uint64_t offset_blocks, uint64_t num_blocks,
1362 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1363 {
1364 	struct spdk_bdev *bdev = desc->bdev;
1365 	struct spdk_bdev_io *bdev_io;
1366 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1367 
1368 	if (!desc->write) {
1369 		return -EBADF;
1370 	}
1371 
1372 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1373 		return -EINVAL;
1374 	}
1375 
1376 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1377 	if (!bdev_io) {
1378 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1379 		return -ENOMEM;
1380 	}
1381 
1382 	bdev_io->ch = channel;
1383 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1384 	bdev_io->u.bdev.iovs = iov;
1385 	bdev_io->u.bdev.iovcnt = iovcnt;
1386 	bdev_io->u.bdev.num_blocks = num_blocks;
1387 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1388 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1389 
1390 	spdk_bdev_io_submit(bdev_io);
1391 	return 0;
1392 }
1393 
1394 int
1395 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1396 		       uint64_t offset, uint64_t len,
1397 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1398 {
1399 	uint64_t offset_blocks, num_blocks;
1400 
1401 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1402 		return -EINVAL;
1403 	}
1404 
1405 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1406 }
1407 
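/*
 * Submit a write-zeroes request. If the module does not support
 *  SPDK_BDEV_IO_TYPE_WRITE_ZEROES natively, the request is emulated with ordinary
 *  writes of the shared zero buffer and split into chunks of at most
 *  ZERO_BUFFER_SIZE bytes; the remaining blocks are tracked in
 *  split_remaining_num_blocks and resubmitted by spdk_bdev_write_zeroes_split() as
 *  each chunk completes. For example, with 512-byte blocks one chunk covers 2048
 *  blocks, so a 10000-block request is emulated as four 2048-block writes followed
 *  by a final 1808-block write.
 */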
1408 int
1409 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1410 			      uint64_t offset_blocks, uint64_t num_blocks,
1411 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1412 {
1413 	struct spdk_bdev *bdev = desc->bdev;
1414 	struct spdk_bdev_io *bdev_io;
1415 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1416 	uint64_t len;
1417 	bool split_request = false;
1418 
1419 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1420 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1421 		return -ERANGE;
1422 	}
1423 
1424 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1425 		return -EINVAL;
1426 	}
1427 
1428 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1429 
1430 	if (!bdev_io) {
1431 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1432 		return -ENOMEM;
1433 	}
1434 
1435 	bdev_io->ch = channel;
1436 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1437 
1438 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1439 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1440 		bdev_io->u.bdev.num_blocks = num_blocks;
1441 		bdev_io->u.bdev.iovs = NULL;
1442 		bdev_io->u.bdev.iovcnt = 0;
1443 
1444 	} else {
1445 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1446 
1447 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1448 
1449 		if (len > ZERO_BUFFER_SIZE) {
1450 			split_request = true;
1451 			len = ZERO_BUFFER_SIZE;
1452 		}
1453 
1454 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1455 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1456 		bdev_io->u.bdev.iov.iov_len = len;
1457 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1458 		bdev_io->u.bdev.iovcnt = 1;
1459 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1460 		bdev_io->split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1461 		bdev_io->split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1462 	}
1463 
1464 	if (split_request) {
1465 		bdev_io->stored_user_cb = cb;
1466 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1467 	} else {
1468 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1469 	}
1470 	spdk_bdev_io_submit(bdev_io);
1471 	return 0;
1472 }
1473 
1474 int
1475 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1476 		uint64_t offset, uint64_t nbytes,
1477 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1478 {
1479 	uint64_t offset_blocks, num_blocks;
1480 
1481 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1482 		return -EINVAL;
1483 	}
1484 
1485 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1486 }
1487 
1488 int
1489 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1490 		       uint64_t offset_blocks, uint64_t num_blocks,
1491 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1492 {
1493 	struct spdk_bdev *bdev = desc->bdev;
1494 	struct spdk_bdev_io *bdev_io;
1495 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1496 
1497 	if (!desc->write) {
1498 		return -EBADF;
1499 	}
1500 
1501 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1502 		return -EINVAL;
1503 	}
1504 
1505 	if (num_blocks == 0) {
1506 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1507 		return -EINVAL;
1508 	}
1509 
1510 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1511 	if (!bdev_io) {
1512 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1513 		return -ENOMEM;
1514 	}
1515 
1516 	bdev_io->ch = channel;
1517 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1518 	bdev_io->u.bdev.iov.iov_base = NULL;
1519 	bdev_io->u.bdev.iov.iov_len = 0;
1520 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1521 	bdev_io->u.bdev.iovcnt = 1;
1522 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1523 	bdev_io->u.bdev.num_blocks = num_blocks;
1524 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1525 
1526 	spdk_bdev_io_submit(bdev_io);
1527 	return 0;
1528 }
1529 
1530 int
1531 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1532 		uint64_t offset, uint64_t length,
1533 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1534 {
1535 	uint64_t offset_blocks, num_blocks;
1536 
1537 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1538 		return -EINVAL;
1539 	}
1540 
1541 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1542 }
1543 
1544 int
1545 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1546 		       uint64_t offset_blocks, uint64_t num_blocks,
1547 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1548 {
1549 	struct spdk_bdev *bdev = desc->bdev;
1550 	struct spdk_bdev_io *bdev_io;
1551 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1552 
1553 	if (!desc->write) {
1554 		return -EBADF;
1555 	}
1556 
1557 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1558 		return -EINVAL;
1559 	}
1560 
1561 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1562 	if (!bdev_io) {
1563 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1564 		return -ENOMEM;
1565 	}
1566 
1567 	bdev_io->ch = channel;
1568 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1569 	bdev_io->u.bdev.iovs = NULL;
1570 	bdev_io->u.bdev.iovcnt = 0;
1571 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1572 	bdev_io->u.bdev.num_blocks = num_blocks;
1573 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1574 
1575 	spdk_bdev_io_submit(bdev_io);
1576 	return 0;
1577 }
1578 
1579 static void
1580 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1581 {
1582 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1583 	struct spdk_bdev_io *bdev_io;
1584 
1585 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1586 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1587 	spdk_bdev_io_submit_reset(bdev_io);
1588 }
1589 
1590 static void
1591 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1592 {
1593 	struct spdk_io_channel 		*ch;
1594 	struct spdk_bdev_channel	*channel;
1595 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1596 	struct spdk_bdev_module_channel	*shared_ch;
1597 
1598 	ch = spdk_io_channel_iter_get_channel(i);
1599 	channel = spdk_io_channel_get_ctx(ch);
1600 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1601 	shared_ch = channel->module_ch;
1602 
1603 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1604 
1605 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, channel);
1606 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1607 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1608 
1609 	spdk_for_each_channel_continue(i, 0);
1610 }
1611 
1612 static void
1613 _spdk_bdev_start_reset(void *ctx)
1614 {
1615 	struct spdk_bdev_channel *ch = ctx;
1616 
1617 	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_freeze_channel,
1618 			      ch, _spdk_bdev_reset_dev);
1619 }
1620 
1621 static void
1622 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1623 {
1624 	struct spdk_bdev *bdev = ch->bdev;
1625 
1626 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1627 
1628 	pthread_mutex_lock(&bdev->mutex);
1629 	if (bdev->reset_in_progress == NULL) {
1630 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1631 		/*
1632 		 * Take a channel reference for the target bdev for the life of this
1633 		 *  reset.  This guards against the channel getting destroyed while
1634 		 *  spdk_for_each_channel() calls related to this reset IO are in
1635 		 *  progress.  We will release the reference when this reset is
1636 		 *  completed.
1637 		 */
1638 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(bdev);
1639 		_spdk_bdev_start_reset(ch);
1640 	}
1641 	pthread_mutex_unlock(&bdev->mutex);
1642 }
1643 
1644 int
1645 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1646 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1647 {
1648 	struct spdk_bdev *bdev = desc->bdev;
1649 	struct spdk_bdev_io *bdev_io;
1650 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1651 
1652 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1653 	if (!bdev_io) {
1654 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1655 		return -ENOMEM;
1656 	}
1657 
1658 	bdev_io->ch = channel;
1659 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1660 	bdev_io->u.reset.ch_ref = NULL;
1661 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1662 
1663 	pthread_mutex_lock(&bdev->mutex);
1664 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1665 	pthread_mutex_unlock(&bdev->mutex);
1666 
1667 	_spdk_bdev_channel_start_reset(channel);
1668 
1669 	return 0;
1670 }
1671 
1672 void
1673 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1674 		      struct spdk_bdev_io_stat *stat)
1675 {
1676 #ifdef SPDK_CONFIG_VTUNE
1677 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1678 	memset(stat, 0, sizeof(*stat));
1679 	return;
1680 #endif
1681 
1682 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1683 
1684 	channel->stat.ticks_rate = spdk_get_ticks_hz();
1685 	*stat = channel->stat;
1686 	memset(&channel->stat, 0, sizeof(channel->stat));
1687 }
1688 
1689 int
1690 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1691 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1692 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1693 {
1694 	struct spdk_bdev *bdev = desc->bdev;
1695 	struct spdk_bdev_io *bdev_io;
1696 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1697 
1698 	if (!desc->write) {
1699 		return -EBADF;
1700 	}
1701 
1702 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1703 	if (!bdev_io) {
1704 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1705 		return -ENOMEM;
1706 	}
1707 
1708 	bdev_io->ch = channel;
1709 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1710 	bdev_io->u.nvme_passthru.cmd = *cmd;
1711 	bdev_io->u.nvme_passthru.buf = buf;
1712 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1713 	bdev_io->u.nvme_passthru.md_buf = NULL;
1714 	bdev_io->u.nvme_passthru.md_len = 0;
1715 
1716 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1717 
1718 	spdk_bdev_io_submit(bdev_io);
1719 	return 0;
1720 }
1721 
1722 int
1723 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1724 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1725 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1726 {
1727 	struct spdk_bdev *bdev = desc->bdev;
1728 	struct spdk_bdev_io *bdev_io;
1729 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1730 
1731 	if (!desc->write) {
1732 		/*
1733 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1734 		 *  to easily determine if the command is a read or write, but for now just
1735 		 *  do not allow io_passthru with a read-only descriptor.
1736 		 */
1737 		return -EBADF;
1738 	}
1739 
1740 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1741 	if (!bdev_io) {
1742 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
1743 		return -ENOMEM;
1744 	}
1745 
1746 	bdev_io->ch = channel;
1747 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1748 	bdev_io->u.nvme_passthru.cmd = *cmd;
1749 	bdev_io->u.nvme_passthru.buf = buf;
1750 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1751 	bdev_io->u.nvme_passthru.md_buf = NULL;
1752 	bdev_io->u.nvme_passthru.md_len = 0;
1753 
1754 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1755 
1756 	spdk_bdev_io_submit(bdev_io);
1757 	return 0;
1758 }
1759 
1760 int
1761 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1762 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
1763 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1764 {
1765 	struct spdk_bdev *bdev = desc->bdev;
1766 	struct spdk_bdev_io *bdev_io;
1767 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1768 
1769 	if (!desc->write) {
1770 		/*
1771 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1772 		 *  to easily determine if the command is a read or write, but for now just
1773 		 *  do not allow io_passthru with a read-only descriptor.
1774 		 */
1775 		return -EBADF;
1776 	}
1777 
1778 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1779 	if (!bdev_io) {
1780 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
1781 		return -ENOMEM;
1782 	}
1783 
1784 	bdev_io->ch = channel;
1785 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
1786 	bdev_io->u.nvme_passthru.cmd = *cmd;
1787 	bdev_io->u.nvme_passthru.buf = buf;
1788 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1789 	bdev_io->u.nvme_passthru.md_buf = md_buf;
1790 	bdev_io->u.nvme_passthru.md_len = md_len;
1791 
1792 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1793 
1794 	spdk_bdev_io_submit(bdev_io);
1795 	return 0;
1796 }
1797 
1798 int
1799 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1800 {
1801 	if (!bdev_io) {
1802 		SPDK_ERRLOG("bdev_io is NULL\n");
1803 		return -1;
1804 	}
1805 
1806 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1807 		SPDK_ERRLOG("bdev_io is in pending state\n");
1808 		assert(false);
1809 		return -1;
1810 	}
1811 
1812 	spdk_bdev_put_io(bdev_io);
1813 
1814 	return 0;
1815 }
1816 
1817 static void
1818 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
1819 {
1820 	struct spdk_bdev *bdev = bdev_ch->bdev;
1821 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
1822 	struct spdk_bdev_io *bdev_io;
1823 
1824 	if (shared_ch->io_outstanding > shared_ch->nomem_threshold) {
1825 		/*
1826 		 * Allow some more I/O to complete before retrying the nomem_io queue.
1827 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
1828 		 *  the context of a completion, because the resources for the I/O are
1829 		 *  not released until control returns to the bdev poller.  Also, we
1830 		 *  may require several small I/O to complete before a larger I/O
1831 		 *  (that requires splitting) can be submitted.
1832 		 */
1833 		return;
1834 	}
1835 
1836 	while (!TAILQ_EMPTY(&shared_ch->nomem_io)) {
1837 		bdev_io = TAILQ_FIRST(&shared_ch->nomem_io);
1838 		TAILQ_REMOVE(&shared_ch->nomem_io, bdev_io, link);
1839 		shared_ch->io_outstanding++;
1840 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
1841 		bdev->fn_table->submit_request(bdev_io->ch->channel, bdev_io);
1842 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
1843 			break;
1844 		}
1845 	}
1846 }
1847 
1848 static void
1849 _spdk_bdev_io_complete(void *ctx)
1850 {
1851 	struct spdk_bdev_io *bdev_io = ctx;
1852 
1853 	assert(bdev_io->cb != NULL);
1854 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1855 }
1856 
1857 static void
1858 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
1859 {
1860 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
1861 
1862 	if (bdev_io->u.reset.ch_ref != NULL) {
1863 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1864 		bdev_io->u.reset.ch_ref = NULL;
1865 	}
1866 
1867 	_spdk_bdev_io_complete(bdev_io);
1868 }
1869 
1870 static void
1871 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
1872 {
1873 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
1874 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1875 
1876 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
1877 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1878 		_spdk_bdev_channel_start_reset(ch);
1879 	}
1880 
1881 	spdk_for_each_channel_continue(i, 0);
1882 }
1883 
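/*
 * Common completion path for every bdev_io.  Resets fan out an unfreeze
 *  message to every channel before completing; NOMEM completions are queued
 *  on the shared channel for a later retry instead of being reported to the
 *  caller; successful reads and writes update the per-channel statistics.
 *  The user callback is deferred through a thread message when we are still
 *  inside the module's submit path, to avoid unbounded recursion.
 */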
1884 void
1885 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1886 {
1887 	struct spdk_bdev *bdev = bdev_io->bdev;
1888 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
1889 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
1890 
1891 	bdev_io->status = status;
1892 
1893 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1894 		bool unlock_channels = false;
1895 
1896 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
1897 			SPDK_ERRLOG("NOMEM returned for reset\n");
1898 		}
1899 		pthread_mutex_lock(&bdev->mutex);
1900 		if (bdev_io == bdev->reset_in_progress) {
1901 			bdev->reset_in_progress = NULL;
1902 			unlock_channels = true;
1903 		}
1904 		pthread_mutex_unlock(&bdev->mutex);
1905 
1906 		if (unlock_channels) {
1907 			spdk_for_each_channel(bdev, _spdk_bdev_unfreeze_channel, bdev_io,
1908 					      _spdk_bdev_reset_complete);
1909 			return;
1910 		}
1911 	} else {
1912 		assert(shared_ch->io_outstanding > 0);
1913 		shared_ch->io_outstanding--;
1914 		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
1915 			if (spdk_unlikely(!TAILQ_EMPTY(&shared_ch->nomem_io))) {
1916 				_spdk_bdev_ch_retry_io(bdev_ch);
1917 			}
1918 		} else {
1919 			TAILQ_INSERT_HEAD(&shared_ch->nomem_io, bdev_io, link);
1920 			/*
1921 			 * Wait for some of the outstanding I/O to complete before we
1922 			 *  retry any of the nomem_io.  Normally we will wait for
1923 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
1924 			 *  depth channels we will instead wait for half to complete.
1925 			 */
1926 			shared_ch->nomem_threshold = spdk_max((int64_t)shared_ch->io_outstanding / 2,
1927 							      (int64_t)shared_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
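			/*
			 * Illustrative arithmetic: with NOMEM_THRESHOLD_COUNT of 8, a channel
			 *  with 100 I/O still outstanding gets a threshold of 92 (wait for 8
			 *  completions), while one with only 4 outstanding gets max(2, -4) = 2
			 *  (wait for half of them).
			 */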
1928 			return;
1929 		}
1930 	}
1931 
1932 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1933 		switch (bdev_io->type) {
1934 		case SPDK_BDEV_IO_TYPE_READ:
1935 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1936 			bdev_ch->stat.num_read_ops++;
1937 			bdev_ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
1938 			break;
1939 		case SPDK_BDEV_IO_TYPE_WRITE:
1940 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1941 			bdev_ch->stat.num_write_ops++;
1942 			bdev_ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
1943 			break;
1944 		default:
1945 			break;
1946 		}
1947 	}
1948 
1949 #ifdef SPDK_CONFIG_VTUNE
1950 	uint64_t now_tsc = spdk_get_ticks();
1951 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
1952 		uint64_t data[5];
1953 
1954 		data[0] = bdev_ch->stat.num_read_ops;
1955 		data[1] = bdev_ch->stat.bytes_read;
1956 		data[2] = bdev_ch->stat.num_write_ops;
1957 		data[3] = bdev_ch->stat.bytes_written;
1958 		data[4] = bdev->fn_table->get_spin_time ?
1959 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
1960 
1961 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
1962 				   __itt_metadata_u64, 5, data);
1963 
1964 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
1965 		bdev_ch->start_tsc = now_tsc;
1966 	}
1967 #endif
1968 
1969 	if (bdev_io->in_submit_request) {
1970 		/*
1971 		 * Defer completion to avoid potential infinite recursion if the
1972 		 * user's completion callback issues a new I/O.
1973 		 */
1974 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
1975 				     _spdk_bdev_io_complete, bdev_io);
1976 	} else {
1977 		_spdk_bdev_io_complete(bdev_io);
1978 	}
1979 }
1980 
1981 void
1982 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1983 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
1984 {
1985 	if (sc == SPDK_SCSI_STATUS_GOOD) {
1986 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1987 	} else {
1988 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
1989 		bdev_io->error.scsi.sc = sc;
1990 		bdev_io->error.scsi.sk = sk;
1991 		bdev_io->error.scsi.asc = asc;
1992 		bdev_io->error.scsi.ascq = ascq;
1993 	}
1994 
1995 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1996 }
1997 
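/*
 * Illustrative use from a bdev completion callback (the callback and variable
 *  names here are hypothetical, not part of the API):
 *
 *	static void my_complete_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		int sc, sk, asc, ascq;
 *
 *		spdk_bdev_io_get_scsi_status(bdev_io, &sc, &sk, &asc, &ascq);
 *		// translate sc/sk/asc/ascq into sense data for the SCSI initiator
 *		spdk_bdev_free_io(bdev_io);
 *	}
 */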
1998 void
1999 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2000 			     int *sc, int *sk, int *asc, int *ascq)
2001 {
2002 	assert(sc != NULL);
2003 	assert(sk != NULL);
2004 	assert(asc != NULL);
2005 	assert(ascq != NULL);
2006 
2007 	switch (bdev_io->status) {
2008 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2009 		*sc = SPDK_SCSI_STATUS_GOOD;
2010 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2011 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2012 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2013 		break;
2014 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2015 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2016 		break;
2017 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2018 		*sc = bdev_io->error.scsi.sc;
2019 		*sk = bdev_io->error.scsi.sk;
2020 		*asc = bdev_io->error.scsi.asc;
2021 		*ascq = bdev_io->error.scsi.ascq;
2022 		break;
2023 	default:
2024 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2025 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2026 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2027 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2028 		break;
2029 	}
2030 }
2031 
2032 void
2033 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2034 {
2035 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2036 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2037 	} else {
2038 		bdev_io->error.nvme.sct = sct;
2039 		bdev_io->error.nvme.sc = sc;
2040 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2041 	}
2042 
2043 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2044 }
2045 
2046 void
2047 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2048 {
2049 	assert(sct != NULL);
2050 	assert(sc != NULL);
2051 
2052 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2053 		*sct = bdev_io->error.nvme.sct;
2054 		*sc = bdev_io->error.nvme.sc;
2055 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2056 		*sct = SPDK_NVME_SCT_GENERIC;
2057 		*sc = SPDK_NVME_SC_SUCCESS;
2058 	} else {
2059 		*sct = SPDK_NVME_SCT_GENERIC;
2060 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2061 	}
2062 }
2063 
2064 struct spdk_thread *
2065 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2066 {
2067 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
2068 }
2069 
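/*
 * Shared registration path for bdevs and vbdevs: validate that the name is
 *  set and unique, register the bdev as an io_device so per-thread channels
 *  can be created, add it to the global bdev list, and finally give every
 *  module with an examine callback a chance to claim or build on top of it.
 */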
2070 static int
2071 _spdk_bdev_register(struct spdk_bdev *bdev)
2072 {
2073 	struct spdk_bdev_module_if *module;
2074 
2075 	assert(bdev->module != NULL);
2076 
2077 	if (!bdev->name) {
2078 		SPDK_ERRLOG("Bdev name is NULL\n");
2079 		return -EINVAL;
2080 	}
2081 
2082 	if (spdk_bdev_get_by_name(bdev->name)) {
2083 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2084 		return -EEXIST;
2085 	}
2086 
2087 	bdev->status = SPDK_BDEV_STATUS_READY;
2088 
2089 	TAILQ_INIT(&bdev->open_descs);
2090 
2091 	TAILQ_INIT(&bdev->vbdevs);
2092 	TAILQ_INIT(&bdev->base_bdevs);
2093 
2094 	TAILQ_INIT(&bdev->aliases);
2095 
2096 	bdev->reset_in_progress = NULL;
2097 
2098 	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2099 				sizeof(struct spdk_bdev_channel));
2100 
2101 	pthread_mutex_init(&bdev->mutex, NULL);
2102 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2103 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2104 
2105 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2106 		if (module->examine) {
2107 			module->action_in_progress++;
2108 			module->examine(bdev);
2109 		}
2110 	}
2111 
2112 	return 0;
2113 }
2114 
2115 int
2116 spdk_bdev_register(struct spdk_bdev *bdev)
2117 {
2118 	return _spdk_bdev_register(bdev);
2119 }
2120 
2121 int
2122 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2123 {
2124 	int i, rc;
2125 
2126 	rc = _spdk_bdev_register(vbdev);
2127 	if (rc) {
2128 		return rc;
2129 	}
2130 
2131 	for (i = 0; i < base_bdev_count; i++) {
2132 		assert(base_bdevs[i] != NULL);
2133 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
2134 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
2135 	}
2136 
2137 	return 0;
2138 }
2139 
2140 void
2141 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
2142 {
2143 	if (bdev->unregister_cb != NULL) {
2144 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2145 	}
2146 }
2147 
2148 void
2149 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2150 {
2151 	struct spdk_bdev_desc	*desc, *tmp;
2152 	int			rc;
2153 	bool			do_destruct = true;
2154 	struct spdk_bdev	*base_bdev;
2155 
2156 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2157 
2158 	pthread_mutex_lock(&bdev->mutex);
2159 
2160 	if (!TAILQ_EMPTY(&bdev->base_bdevs)) {
2161 		TAILQ_FOREACH(base_bdev, &bdev->base_bdevs, base_bdev_link) {
2162 			TAILQ_REMOVE(&base_bdev->vbdevs, bdev, vbdev_link);
2163 		}
2164 	}
2165 
2166 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2167 	bdev->unregister_cb = cb_fn;
2168 	bdev->unregister_ctx = cb_arg;
2169 
2170 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2171 		if (desc->remove_cb) {
2172 			pthread_mutex_unlock(&bdev->mutex);
2173 			do_destruct = false;
2174 			desc->remove_cb(desc->remove_ctx);
2175 			pthread_mutex_lock(&bdev->mutex);
2176 		}
2177 	}
2178 
2179 	if (!do_destruct) {
2180 		pthread_mutex_unlock(&bdev->mutex);
2181 		return;
2182 	}
2183 
2184 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2185 	pthread_mutex_unlock(&bdev->mutex);
2186 
2187 	pthread_mutex_destroy(&bdev->mutex);
2188 
2189 	spdk_io_device_unregister(bdev, NULL);
2190 
2191 	rc = bdev->fn_table->destruct(bdev->ctxt);
2192 	if (rc < 0) {
2193 		SPDK_ERRLOG("destruct failed\n");
2194 	}
2195 	if (rc <= 0 && cb_fn != NULL) {
2196 		cb_fn(cb_arg, rc);
2197 	}
2198 }
2199 
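/*
 * Illustrative open/use/close sequence (error handling trimmed; names other
 *  than the SPDK calls are hypothetical):
 *
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *ch;
 *
 *	if (spdk_bdev_open(bdev, true, my_hotremove_cb, my_ctx, &desc) != 0) {
 *		return;
 *	}
 *	ch = spdk_bdev_get_io_channel(desc);
 *	// ... submit I/O through desc and ch ...
 *	spdk_put_io_channel(ch);
 *	spdk_bdev_close(desc);
 */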
2200 int
2201 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2202 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2203 {
2204 	struct spdk_bdev_desc *desc;
2205 
2206 	desc = calloc(1, sizeof(*desc));
2207 	if (desc == NULL) {
2208 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2209 		return -ENOMEM;
2210 	}
2211 
2212 	pthread_mutex_lock(&bdev->mutex);
2213 
2214 	if (write && bdev->claim_module) {
2215 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
2216 		free(desc);
2217 		pthread_mutex_unlock(&bdev->mutex);
2218 		return -EPERM;
2219 	}
2220 
2221 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2222 
2223 	desc->bdev = bdev;
2224 	desc->remove_cb = remove_cb;
2225 	desc->remove_ctx = remove_ctx;
2226 	desc->write = write;
2227 	*_desc = desc;
2228 
2229 	pthread_mutex_unlock(&bdev->mutex);
2230 
2231 	return 0;
2232 }
2233 
2234 void
2235 spdk_bdev_close(struct spdk_bdev_desc *desc)
2236 {
2237 	struct spdk_bdev *bdev = desc->bdev;
2238 	bool do_unregister = false;
2239 
2240 	pthread_mutex_lock(&bdev->mutex);
2241 
2242 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2243 	free(desc);
2244 
2245 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
2246 		do_unregister = true;
2247 	}
2248 	pthread_mutex_unlock(&bdev->mutex);
2249 
2250 	if (do_unregister == true) {
2251 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
2252 	}
2253 }
2254 
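/*
 * Only a single module may hold a claim on a bdev at a time; a second claim
 *  attempt fails with -EPERM.  Claiming also upgrades the supplied descriptor
 *  (if any) to allow writes, since the claiming module takes ownership of the
 *  bdev's contents.
 */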
2255 int
2256 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
2257 			    struct spdk_bdev_module_if *module)
2258 {
2259 	if (bdev->claim_module != NULL) {
2260 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
2261 			    bdev->claim_module->name);
2262 		return -EPERM;
2263 	}
2264 
2265 	if (desc && !desc->write) {
2266 		desc->write = true;
2267 	}
2268 
2269 	bdev->claim_module = module;
2270 	return 0;
2271 }
2272 
2273 void
2274 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2275 {
2276 	assert(bdev->claim_module != NULL);
2277 	bdev->claim_module = NULL;
2278 }
2279 
2280 struct spdk_bdev *
2281 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2282 {
2283 	return desc->bdev;
2284 }
2285 
2286 void
2287 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2288 {
2289 	struct iovec *iovs;
2290 	int iovcnt;
2291 
2292 	if (bdev_io == NULL) {
2293 		return;
2294 	}
2295 
2296 	switch (bdev_io->type) {
2297 	case SPDK_BDEV_IO_TYPE_READ:
2298 		iovs = bdev_io->u.bdev.iovs;
2299 		iovcnt = bdev_io->u.bdev.iovcnt;
2300 		break;
2301 	case SPDK_BDEV_IO_TYPE_WRITE:
2302 		iovs = bdev_io->u.bdev.iovs;
2303 		iovcnt = bdev_io->u.bdev.iovcnt;
2304 		break;
2305 	default:
2306 		iovs = NULL;
2307 		iovcnt = 0;
2308 		break;
2309 	}
2310 
2311 	if (iovp) {
2312 		*iovp = iovs;
2313 	}
2314 	if (iovcntp) {
2315 		*iovcntp = iovcnt;
2316 	}
2317 }
2318 
2319 void
2320 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
2321 {
2322 	/*
2323 	 * Modules with examine callbacks must be initialized first, so they are
2324 	 *  ready to handle examine callbacks from later modules that will
2325 	 *  register physical bdevs.
2326 	 */
2327 	if (bdev_module->examine != NULL) {
2328 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2329 	} else {
2330 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2331 	}
2332 }
2333 
2334 void
2335 spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
2336 {
2337 	if (base->desc) {
2338 		spdk_bdev_close(base->desc);
2339 		base->desc = NULL;
2340 	}
2341 	base->base_free_fn(base);
2342 }
2343 
2344 void
2345 spdk_bdev_part_free(struct spdk_bdev_part *part)
2346 {
2347 	struct spdk_bdev_part_base *base;
2348 
2349 	assert(part);
2350 	assert(part->base);
2351 
2352 	base = part->base;
2353 	spdk_io_device_unregister(&part->base, NULL);
2354 	TAILQ_REMOVE(base->tailq, part, tailq);
2355 	free(part->bdev.name);
2356 	free(part);
2357 
2358 	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
2359 		spdk_bdev_module_release_bdev(base->bdev);
2360 		spdk_bdev_part_base_free(base);
2361 	}
2362 }
2363 
2364 void
2365 spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
2366 {
2367 	struct spdk_bdev_part *part, *tmp;
2368 
2369 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
2370 		if (part->base->bdev == base_bdev) {
2371 			spdk_bdev_unregister(&part->bdev, NULL, NULL);
2372 		}
2373 	}
2374 }
2375 
2376 static bool
2377 spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
2378 {
2379 	struct spdk_bdev_part *part = _part;
2380 
2381 	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
2382 }
2383 
2384 static struct spdk_io_channel *
2385 spdk_bdev_part_get_io_channel(void *_part)
2386 {
2387 	struct spdk_bdev_part *part = _part;
2388 
2389 	return spdk_get_io_channel(&part->base);
2390 }
2391 
2392 static void
2393 spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2394 {
2395 	struct spdk_bdev_io *part_io = cb_arg;
2396 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
2397 
2398 	spdk_bdev_io_complete(part_io, status);
2399 	spdk_bdev_free_io(bdev_io);
2400 }
2401 
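/*
 * Completion callback used when a request from spdk_bdev_write_zeroes_blocks
 *  has to be carried out as a series of writes of the shared zero buffer.
 *  Each round covers up to ZERO_BUFFER_SIZE bytes (with 512-byte blocks,
 *  2048 blocks per round) and resubmits until split_remaining_num_blocks
 *  reaches zero, at which point the original user callback is restored.
 */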
2402 static void
2403 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2404 {
2405 	uint64_t len;
2406 
2407 	if (!success) {
2408 		bdev_io->cb = bdev_io->stored_user_cb;
2409 		_spdk_bdev_io_complete(bdev_io);
2410 		return;
2411 	}
2412 
2413 	/* No need to repeat the parameter checks from write_zeroes_blocks - this request already passed them. */
2414 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->split_remaining_num_blocks,
2415 		       ZERO_BUFFER_SIZE);
2416 
2417 	bdev_io->u.bdev.offset_blocks = bdev_io->split_current_offset_blocks;
2418 	bdev_io->u.bdev.iov.iov_len = len;
2419 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2420 	bdev_io->split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2421 	bdev_io->split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2422 
2423 	/* If this round completes the I/O, restore the original user callback. */
2424 	if (bdev_io->split_remaining_num_blocks == 0) {
2425 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->stored_user_cb);
2426 	} else {
2427 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2428 	}
2429 	spdk_bdev_io_submit(bdev_io);
2430 }
2431 
2432 void
2433 spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
2434 {
2435 	struct spdk_bdev_part *part = ch->part;
2436 	struct spdk_io_channel *base_ch = ch->base_ch;
2437 	struct spdk_bdev_desc *base_desc = part->base->desc;
2438 	uint64_t offset;
2439 	int rc = 0;
2440 
2441 	/* Modify the I/O to adjust for the offset within the base bdev. */
2442 	switch (bdev_io->type) {
2443 	case SPDK_BDEV_IO_TYPE_READ:
2444 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2445 		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2446 					    bdev_io->u.bdev.iovcnt, offset,
2447 					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2448 					    bdev_io);
2449 		break;
2450 	case SPDK_BDEV_IO_TYPE_WRITE:
2451 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2452 		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2453 					     bdev_io->u.bdev.iovcnt, offset,
2454 					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2455 					     bdev_io);
2456 		break;
2457 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
2458 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2459 		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2460 						   spdk_bdev_part_complete_io, bdev_io);
2461 		break;
2462 	case SPDK_BDEV_IO_TYPE_UNMAP:
2463 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2464 		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2465 					    spdk_bdev_part_complete_io, bdev_io);
2466 		break;
2467 	case SPDK_BDEV_IO_TYPE_FLUSH:
2468 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2469 		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2470 					    spdk_bdev_part_complete_io, bdev_io);
2471 		break;
2472 	case SPDK_BDEV_IO_TYPE_RESET:
2473 		rc = spdk_bdev_reset(base_desc, base_ch,
2474 				     spdk_bdev_part_complete_io, bdev_io);
2475 		break;
2476 	default:
2477 		SPDK_ERRLOG("part: unsupported I/O type %d\n", bdev_io->type);
2478 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2479 		return;
2480 	}
2481 
2482 	if (rc != 0) {
2483 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2484 	}
2485 }

2486 static int
2487 spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
2488 {
2489 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2490 	struct spdk_bdev_part_channel *ch = ctx_buf;
2491 
2492 	ch->part = part;
2493 	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
2494 	if (ch->base_ch == NULL) {
2495 		return -1;
2496 	}
2497 
2498 	if (part->base->ch_create_cb) {
2499 		return part->base->ch_create_cb(io_device, ctx_buf);
2500 	} else {
2501 		return 0;
2502 	}
2503 }
2504 
2505 static void
2506 spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
2507 {
2508 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2509 	struct spdk_bdev_part_channel *ch = ctx_buf;
2510 
2511 	if (part->base->ch_destroy_cb) {
2512 		part->base->ch_destroy_cb(io_device, ctx_buf);
2513 	}
2514 	spdk_put_io_channel(ch->base_ch);
2515 }
2516 
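/*
 * Set up the shared "base" state for a family of partition-style vbdevs:
 *  point the module's fn_table at the generic part channel and io_type
 *  helpers, record the per-channel callbacks and sizes, and open the
 *  underlying bdev read-only.  The base starts with a reference count of
 *  zero; each constructed part takes a reference, and the last
 *  spdk_bdev_part_free() releases the claim and frees the base.
 */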
2517 int
2518 spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
2519 			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
2520 			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
2521 			      spdk_bdev_part_base_free_fn free_fn,
2522 			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
2523 			      spdk_io_channel_destroy_cb ch_destroy_cb)
2524 {
2525 	int rc;
2526 
2527 	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
2528 	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;
2529 
2530 	base->bdev = bdev;
2531 	base->desc = NULL;
2532 	base->ref = 0;
2533 	base->module = module;
2534 	base->fn_table = fn_table;
2535 	base->tailq = tailq;
2536 	base->claimed = false;
2537 	base->channel_size = channel_size;
2538 	base->ch_create_cb = ch_create_cb;
2539 	base->ch_destroy_cb = ch_destroy_cb;
2540 	base->base_free_fn = free_fn;
2541 
2542 	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
2543 	if (rc) {
2544 		spdk_bdev_part_base_free(base);
2545 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
2546 		return -1;
2547 	}
2548 
2549 	return 0;
2550 }
2551 
2552 int
2553 spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
2554 			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
2555 			 char *product_name)
2556 {
2557 	part->bdev.name = name;
2558 	part->bdev.blocklen = base->bdev->blocklen;
2559 	part->bdev.blockcnt = num_blocks;
2560 	part->offset_blocks = offset_blocks;
2561 
2562 	part->bdev.write_cache = base->bdev->write_cache;
2563 	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
2564 	part->bdev.product_name = product_name;
2565 	part->bdev.ctxt = part;
2566 	part->bdev.module = base->module;
2567 	part->bdev.fn_table = base->fn_table;
2568 
2569 	__sync_fetch_and_add(&base->ref, 1);
2570 	part->base = base;
2571 
2572 	if (!base->claimed) {
2573 		int rc;
2574 
2575 		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
2576 		if (rc) {
2577 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
2578 			free(part->bdev.name);
2579 			return -1;
2580 		}
2581 		base->claimed = true;
2582 	}
2583 
2584 	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
2585 				spdk_bdev_part_channel_destroy_cb,
2586 				base->channel_size);
2587 	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
2588 	TAILQ_INSERT_TAIL(base->tailq, part, tailq);
2589 
2590 	return 0;
2591 }
2592 
2593 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
2594