xref: /spdk/lib/bdev/bdev.c (revision d8c3ff5fd661c5e5a2ef975ce3849a56a813188f)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/bdev.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
58 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
59 #define SPDK_BDEV_IO_CACHE_SIZE	256
60 #define BUF_SMALL_POOL_SIZE	8192
61 #define BUF_LARGE_POOL_SIZE	1024
62 #define NOMEM_THRESHOLD_COUNT	8
63 #define ZERO_BUFFER_SIZE	0x100000
64 
65 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
66 typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t;
67 
68 struct spdk_bdev_mgr {
69 	struct spdk_mempool *bdev_io_pool;
70 
71 	struct spdk_mempool *buf_small_pool;
72 	struct spdk_mempool *buf_large_pool;
73 
74 	void *zero_buffer;
75 
76 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
77 
78 	TAILQ_HEAD(, spdk_bdev) bdevs;
79 
80 	bool init_complete;
81 	bool module_init_complete;
82 
83 #ifdef SPDK_CONFIG_VTUNE
84 	__itt_domain	*domain;
85 #endif
86 };
87 
88 static struct spdk_bdev_mgr g_bdev_mgr = {
89 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
90 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
91 	.init_complete = false,
92 	.module_init_complete = false,
93 };
94 
95 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
96 static void			*g_init_cb_arg = NULL;
97 
98 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
99 static void			*g_fini_cb_arg = NULL;
100 static struct spdk_thread	*g_fini_thread = NULL;
101 
102 
103 struct spdk_bdev_mgmt_channel {
104 	bdev_io_stailq_t need_buf_small;
105 	bdev_io_stailq_t need_buf_large;
106 
107 	/*
108 	 * Each thread keeps a cache of bdev_io - this allows
109 	 *  bdev threads which are *not* DPDK threads to still
110 	 *  benefit from a per-thread bdev_io cache.  Without
111 	 *  this, non-DPDK threads fetching from the mempool
112 	 *  incur a cmpxchg on get and put.
113 	 */
114 	bdev_io_stailq_t per_thread_cache;
115 	uint32_t	per_thread_cache_count;
116 
117 	TAILQ_HEAD(, spdk_bdev_module_channel) module_channels;
118 };
119 
120 struct spdk_bdev_desc {
121 	struct spdk_bdev		*bdev;
122 	spdk_bdev_remove_cb_t		remove_cb;
123 	void				*remove_ctx;
124 	bool				write;
125 	TAILQ_ENTRY(spdk_bdev_desc)	link;
126 };
127 
128 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
129 
130 struct spdk_bdev_channel {
131 	struct spdk_bdev	*bdev;
132 
133 	/* The channel for the underlying device */
134 	struct spdk_io_channel	*channel;
135 
136 	/* Channel for the bdev manager */
137 	struct spdk_io_channel *mgmt_channel;
138 
139 	struct spdk_bdev_io_stat stat;
140 
141 	bdev_io_tailq_t		queued_resets;
142 
143 	uint32_t		flags;
144 
145 	/* Shared state for all bdev channels built on the same module I/O channel */
146 	struct spdk_bdev_module_channel *module_ch;
147 
148 #ifdef SPDK_CONFIG_VTUNE
149 	uint64_t		start_tsc;
150 	uint64_t		interval_tsc;
151 	__itt_string_handle	*handle;
152 #endif
153 
154 };
155 
156 /*
157  * Per-module (or per-io_device) channel. Multiple bdevs built on the same io_device
158  * will queue their I/O awaiting retry here. This makes it possible to retry sending
159  * I/O to one bdev after I/O from another bdev completes.
160  */
161 struct spdk_bdev_module_channel {
162 	/*
163 	 * Count of I/O submitted to bdev module and waiting for completion.
164 	 * Incremented before submit_request() is called on an spdk_bdev_io.
165 	 */
166 	uint64_t		io_outstanding;
167 
168 	/*
169 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
170 	 *  on this channel.
171 	 */
172 	bdev_io_tailq_t		nomem_io;
173 
174 	/*
175 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
176 	 */
177 	uint64_t		nomem_threshold;
178 
179 	/* I/O channel allocated by a bdev module */
180 	struct spdk_io_channel	*module_ch;
181 
182 	uint32_t		ref;
183 
184 	TAILQ_ENTRY(spdk_bdev_module_channel) link;
185 };
186 
187 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
188 
189 struct spdk_bdev *
190 spdk_bdev_first(void)
191 {
192 	struct spdk_bdev *bdev;
193 
194 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
195 	if (bdev) {
196 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
197 	}
198 
199 	return bdev;
200 }
201 
202 struct spdk_bdev *
203 spdk_bdev_next(struct spdk_bdev *prev)
204 {
205 	struct spdk_bdev *bdev;
206 
207 	bdev = TAILQ_NEXT(prev, link);
208 	if (bdev) {
209 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
210 	}
211 
212 	return bdev;
213 }
214 
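/*
 * A "leaf" bdev is one with no virtual bdevs built on top of it, i.e. its
 *  vbdevs list is empty.  Starting from the given bdev, walk the global bdev
 *  list and return the first such leaf, or NULL if the end of the list is hit.
 */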
215 static struct spdk_bdev *
216 _bdev_next_leaf(struct spdk_bdev *bdev)
217 {
218 	while (bdev != NULL) {
219 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
220 			return bdev;
221 		} else {
222 			bdev = TAILQ_NEXT(bdev, link);
223 		}
224 	}
225 
226 	return bdev;
227 }
228 
229 struct spdk_bdev *
230 spdk_bdev_first_leaf(void)
231 {
232 	struct spdk_bdev *bdev;
233 
234 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
235 
236 	if (bdev) {
237 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
238 	}
239 
240 	return bdev;
241 }
242 
243 struct spdk_bdev *
244 spdk_bdev_next_leaf(struct spdk_bdev *prev)
245 {
246 	struct spdk_bdev *bdev;
247 
248 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
249 
250 	if (bdev) {
251 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
252 	}
253 
254 	return bdev;
255 }
256 
257 struct spdk_bdev *
258 spdk_bdev_get_by_name(const char *bdev_name)
259 {
260 	struct spdk_bdev_alias *tmp;
261 	struct spdk_bdev *bdev = spdk_bdev_first();
262 
263 	while (bdev != NULL) {
264 		if (strcmp(bdev_name, bdev->name) == 0) {
265 			return bdev;
266 		}
267 
268 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
269 			if (strcmp(bdev_name, tmp->alias) == 0) {
270 				return bdev;
271 			}
272 		}
273 
274 		bdev = spdk_bdev_next(bdev);
275 	}
276 
277 	return NULL;
278 }
279 
280 static void
281 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
282 {
283 	assert(bdev_io->get_buf_cb != NULL);
284 	assert(buf != NULL);
285 	assert(bdev_io->u.bdev.iovs != NULL);
286 
287 	bdev_io->buf = buf;
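	/*
	 * Round the start of the buffer up to a 512-byte boundary.  The buffer
	 *  pools are created with an extra 512 bytes per element (see
	 *  spdk_bdev_initialize()), so this alignment never overruns the buffer.
	 */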
288 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
289 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
290 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
291 }
292 
293 static void
294 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
295 {
296 	struct spdk_mempool *pool;
297 	struct spdk_bdev_io *tmp;
298 	void *buf;
299 	bdev_io_stailq_t *stailq;
300 	struct spdk_bdev_mgmt_channel *ch;
301 
302 	assert(bdev_io->u.bdev.iovcnt == 1);
303 
304 	buf = bdev_io->buf;
305 	ch = bdev_io->mgmt_ch;
306 
307 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
308 		pool = g_bdev_mgr.buf_small_pool;
309 		stailq = &ch->need_buf_small;
310 	} else {
311 		pool = g_bdev_mgr.buf_large_pool;
312 		stailq = &ch->need_buf_large;
313 	}
314 
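	/*
	 * If another I/O is already waiting for a buffer from this size class,
	 *  hand the freed buffer directly to it instead of returning the buffer
	 *  to the mempool.
	 */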
315 	if (STAILQ_EMPTY(stailq)) {
316 		spdk_mempool_put(pool, buf);
317 	} else {
318 		tmp = STAILQ_FIRST(stailq);
319 		STAILQ_REMOVE_HEAD(stailq, buf_link);
320 		spdk_bdev_io_set_buf(tmp, buf);
321 	}
322 }
323 
324 void
325 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
326 {
327 	struct spdk_mempool *pool;
328 	bdev_io_stailq_t *stailq;
329 	void *buf = NULL;
330 	struct spdk_bdev_mgmt_channel *ch;
331 
332 	assert(cb != NULL);
333 	assert(bdev_io->u.bdev.iovs != NULL);
334 
335 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
336 		/* Buffer already present */
337 		cb(bdev_io->ch->channel, bdev_io);
338 		return;
339 	}
340 
341 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
342 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
343 
344 	bdev_io->buf_len = len;
345 	bdev_io->get_buf_cb = cb;
346 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
347 		pool = g_bdev_mgr.buf_small_pool;
348 		stailq = &ch->need_buf_small;
349 	} else {
350 		pool = g_bdev_mgr.buf_large_pool;
351 		stailq = &ch->need_buf_large;
352 	}
353 
354 	buf = spdk_mempool_get(pool);
355 
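	/*
	 * If the pool is currently empty, queue this I/O; spdk_bdev_io_put_buf()
	 *  will hand it a buffer when another I/O releases one of this size class.
	 */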
356 	if (!buf) {
357 		STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link);
358 	} else {
359 		spdk_bdev_io_set_buf(bdev_io, buf);
360 	}
361 }
362 
363 static int
364 spdk_bdev_module_get_max_ctx_size(void)
365 {
366 	struct spdk_bdev_module_if *bdev_module;
367 	int max_bdev_module_size = 0;
368 
369 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
370 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
371 			max_bdev_module_size = bdev_module->get_ctx_size();
372 		}
373 	}
374 
375 	return max_bdev_module_size;
376 }
377 
378 void
379 spdk_bdev_config_text(FILE *fp)
380 {
381 	struct spdk_bdev_module_if *bdev_module;
382 
383 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
384 		if (bdev_module->config_text) {
385 			bdev_module->config_text(fp);
386 		}
387 	}
388 }
389 
390 static int
391 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
392 {
393 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
394 
395 	STAILQ_INIT(&ch->need_buf_small);
396 	STAILQ_INIT(&ch->need_buf_large);
397 
398 	STAILQ_INIT(&ch->per_thread_cache);
399 	ch->per_thread_cache_count = 0;
400 
401 	TAILQ_INIT(&ch->module_channels);
402 
403 	return 0;
404 }
405 
406 static void
407 spdk_bdev_mgmt_channel_free_resources(struct spdk_bdev_mgmt_channel *ch)
408 {
409 	struct spdk_bdev_io *bdev_io;
410 
411 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
412 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
413 	}
414 
415 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
416 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
417 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
418 		ch->per_thread_cache_count--;
419 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
420 	}
421 
422 	assert(ch->per_thread_cache_count == 0);
423 }
424 
425 static void
426 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
427 {
428 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
429 
430 	spdk_bdev_mgmt_channel_free_resources(ch);
431 }
432 
433 static void
434 spdk_bdev_init_complete(int rc)
435 {
436 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
437 	void *cb_arg = g_init_cb_arg;
438 
439 	g_bdev_mgr.init_complete = true;
440 	g_init_cb_fn = NULL;
441 	g_init_cb_arg = NULL;
442 
443 	cb_fn(cb_arg, rc);
444 }
445 
446 static void
447 spdk_bdev_module_action_complete(void)
448 {
449 	struct spdk_bdev_module_if *m;
450 
451 	/*
452 	 * Don't finish bdev subsystem initialization if
453 	 * module pre-initialization is still in progress, or
454 	 * the subsystem has already been initialized.
455 	 */
456 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
457 		return;
458 	}
459 
460 	/*
461 	 * Check all bdev modules for inits/examinations in progress. If any
462 	 * exist, return immediately since we cannot finish bdev subsystem
463 	 * initialization until all are completed.
464 	 */
465 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
466 		if (m->action_in_progress > 0) {
467 			return;
468 		}
469 	}
470 
471 	/*
472 	 * Modules already finished initialization - now that all
473 	 * the bdev modules have finished their asynchronous I/O
474 	 * processing, the entire bdev layer can be marked as complete.
475 	 */
476 	spdk_bdev_init_complete(0);
477 }
478 
479 static void
480 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
481 {
482 	assert(module->action_in_progress > 0);
483 	module->action_in_progress--;
484 	spdk_bdev_module_action_complete();
485 }
486 
487 void
488 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
489 {
490 	spdk_bdev_module_action_done(module);
491 }
492 
493 void
494 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
495 {
496 	spdk_bdev_module_action_done(module);
497 }
498 
499 static int
500 spdk_bdev_modules_init(void)
501 {
502 	struct spdk_bdev_module_if *module;
503 	int rc = 0;
504 
505 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
506 		rc = module->module_init();
507 		if (rc != 0) {
508 			break;
509 		}
510 	}
511 
512 	g_bdev_mgr.module_init_complete = true;
513 	return rc;
514 }

515 void
516 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
517 {
518 	int cache_size;
519 	int rc = 0;
520 	char mempool_name[32];
521 
522 	assert(cb_fn != NULL);
523 
524 	g_init_cb_fn = cb_fn;
525 	g_init_cb_arg = cb_arg;
526 
527 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
528 
529 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
530 				  SPDK_BDEV_IO_POOL_SIZE,
531 				  sizeof(struct spdk_bdev_io) +
532 				  spdk_bdev_module_get_max_ctx_size(),
533 				  0,
534 				  SPDK_ENV_SOCKET_ID_ANY);
535 
536 	if (g_bdev_mgr.bdev_io_pool == NULL) {
537 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
538 		spdk_bdev_init_complete(-1);
539 		return;
540 	}
541 
542 	/**
543 	 * Ensure no more than half of the total buffers end up in local caches, by
544 	 *   using spdk_env_get_core_count() to determine how many local caches we need
545 	 *   to account for.
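	 *
	 *   For example, with the default BUF_SMALL_POOL_SIZE of 8192 and 4 cores,
	 *   each per-core cache is limited to 8192 / (2 * 4) = 1024 buffers.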
546 	 */
547 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
548 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
549 
550 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
551 				    BUF_SMALL_POOL_SIZE,
552 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
553 				    cache_size,
554 				    SPDK_ENV_SOCKET_ID_ANY);
555 	if (!g_bdev_mgr.buf_small_pool) {
556 		SPDK_ERRLOG("create small buffer pool failed\n");
557 		spdk_bdev_init_complete(-1);
558 		return;
559 	}
560 
561 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
562 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
563 
564 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
565 				    BUF_LARGE_POOL_SIZE,
566 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
567 				    cache_size,
568 				    SPDK_ENV_SOCKET_ID_ANY);
569 	if (!g_bdev_mgr.buf_large_pool) {
570 		SPDK_ERRLOG("create large buffer pool failed\n");
571 		spdk_bdev_init_complete(-1);
572 		return;
573 	}
574 
575 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
576 				 NULL);
577 	if (!g_bdev_mgr.zero_buffer) {
578 		SPDK_ERRLOG("create bdev zero buffer failed\n");
579 		spdk_bdev_init_complete(-1);
580 		return;
581 	}
582 
583 #ifdef SPDK_CONFIG_VTUNE
584 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
585 #endif
586 
587 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
588 				spdk_bdev_mgmt_channel_destroy,
589 				sizeof(struct spdk_bdev_mgmt_channel));
590 
591 	rc = spdk_bdev_modules_init();
592 	if (rc != 0) {
593 		SPDK_ERRLOG("bdev modules init failed\n");
594 		spdk_bdev_init_complete(-1);
595 		return;
596 	}
597 
598 	spdk_bdev_module_action_complete();
599 }
600 
601 static void
602 spdk_bdev_module_finish_cb(void *io_device)
603 {
604 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
605 
606 	cb_fn(g_fini_cb_arg);
607 	g_fini_cb_fn = NULL;
608 	g_fini_cb_arg = NULL;
609 }
610 
611 static void
612 spdk_bdev_module_finish_complete(struct spdk_io_channel_iter *i, int status)
613 {
614 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
615 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
616 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
617 			    SPDK_BDEV_IO_POOL_SIZE);
618 	}
619 
620 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
621 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
622 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
623 			    BUF_SMALL_POOL_SIZE);
624 		assert(false);
625 	}
626 
627 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
628 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
629 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
630 			    BUF_LARGE_POOL_SIZE);
631 		assert(false);
632 	}
633 
634 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
635 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
636 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
637 	spdk_dma_free(g_bdev_mgr.zero_buffer);
638 
639 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
640 }
641 
642 static void
643 mgmt_channel_free_resources(struct spdk_io_channel_iter *i)
644 {
645 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
646 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
647 
648 	spdk_bdev_mgmt_channel_free_resources(ch);
649 	spdk_for_each_channel_continue(i, 0);
650 }
651 
652 static void
653 spdk_bdev_module_finish_iter(void *arg)
654 {
655 	/* Notice that this variable is static. It is saved between calls to
656 	 * this function. */
657 	static struct spdk_bdev_module_if *resume_bdev_module = NULL;
658 	struct spdk_bdev_module_if *bdev_module;
659 
660 	/* Start iterating from the last touched module */
661 	if (!resume_bdev_module) {
662 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
663 	} else {
664 		bdev_module = TAILQ_NEXT(resume_bdev_module, tailq);
665 	}
666 
667 	while (bdev_module) {
668 		if (bdev_module->async_fini) {
669 			/* Save our place so we can resume later. We must
670 			 * save the variable here, before calling module_fini()
671 			 * below, because in some cases the module may immediately
672 			 * call spdk_bdev_module_finish_done() and re-enter
673 			 * this function to continue iterating. */
674 			resume_bdev_module = bdev_module;
675 		}
676 
677 		if (bdev_module->module_fini) {
678 			bdev_module->module_fini();
679 		}
680 
681 		if (bdev_module->async_fini) {
682 			return;
683 		}
684 
685 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
686 	}
687 
688 	resume_bdev_module = NULL;
689 	spdk_for_each_channel(&g_bdev_mgr, mgmt_channel_free_resources, NULL,
690 			      spdk_bdev_module_finish_complete);
691 }
692 
693 void
694 spdk_bdev_module_finish_done(void)
695 {
696 	if (spdk_get_thread() != g_fini_thread) {
697 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
698 	} else {
699 		spdk_bdev_module_finish_iter(NULL);
700 	}
701 }
702 
703 static void
704 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
705 {
706 	struct spdk_bdev *bdev = cb_arg;
707 
708 	if (bdeverrno && bdev) {
709 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
710 			     bdev->name);
711 
712 		/*
713 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
714 		 *  bdev; try to continue by manually removing it from the list and moving on
715 		 *  to the next bdev in the list.
716 		 */
717 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
718 	}
719 
720 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
721 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
722 		spdk_bdev_module_finish_iter(NULL);
723 		return;
724 	}
725 
726 	/*
727 	 * Unregister the first bdev in the list.
728 	 *
729 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
730 	 *  calling the remove_cb of the descriptors first.
731 	 *
732 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
733 	 *  will be called again via the unregister completion callback to continue the cleanup
734 	 *  process with the next bdev.
735 	 */
736 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
737 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
738 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
739 }
740 
741 static void
742 _spdk_bdev_finish_unregister_bdevs(void)
743 {
744 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
745 }
746 
747 void
748 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
749 {
750 	assert(cb_fn != NULL);
751 
752 	g_fini_thread = spdk_get_thread();
753 
754 	g_fini_cb_fn = cb_fn;
755 	g_fini_cb_arg = cb_arg;
756 
757 	_spdk_bdev_finish_unregister_bdevs();
758 }
759 
760 static struct spdk_bdev_io *
761 spdk_bdev_get_io(struct spdk_io_channel *_ch)
762 {
763 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
764 	struct spdk_bdev_io *bdev_io;
765 
766 	if (ch->per_thread_cache_count > 0) {
767 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
768 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
769 		ch->per_thread_cache_count--;
770 	} else {
771 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
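		/*
		 * The per-thread cache is empty and the global pool is exhausted.
		 *  There is no queueing path for bdev_io allocation, so treat this
		 *  as fatal.
		 */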
772 		if (!bdev_io) {
773 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
774 			abort();
775 		}
776 	}
777 
778 	bdev_io->mgmt_ch = ch;
779 
780 	return bdev_io;
781 }
782 
783 static void
784 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
785 {
786 	struct spdk_bdev_mgmt_channel *ch = bdev_io->mgmt_ch;
787 
788 	if (bdev_io->buf != NULL) {
789 		spdk_bdev_io_put_buf(bdev_io);
790 	}
791 
792 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
793 		ch->per_thread_cache_count++;
794 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
795 	} else {
796 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
797 	}
798 }
799 
800 static void
801 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
802 {
803 	struct spdk_bdev *bdev = bdev_io->bdev;
804 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
805 	struct spdk_io_channel *ch = bdev_ch->channel;
806 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
807 
808 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
809 
810 	bdev_io->submit_tsc = spdk_get_ticks();
811 	shared_ch->io_outstanding++;
812 	bdev_io->in_submit_request = true;
813 	if (spdk_likely(bdev_ch->flags == 0)) {
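		/*
		 * If other I/O on this module channel are already queued waiting for
		 *  resources (nomem_io is not empty), queue behind them rather than
		 *  submitting ahead of I/O that is still waiting.
		 */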
814 		if (spdk_likely(TAILQ_EMPTY(&shared_ch->nomem_io))) {
815 			bdev->fn_table->submit_request(ch, bdev_io);
816 		} else {
817 			shared_ch->io_outstanding--;
818 			TAILQ_INSERT_TAIL(&shared_ch->nomem_io, bdev_io, link);
819 		}
820 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
821 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
822 	} else {
823 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
824 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
825 	}
826 	bdev_io->in_submit_request = false;
827 }
828 
829 static void
830 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
831 {
832 	struct spdk_bdev *bdev = bdev_io->bdev;
833 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
834 	struct spdk_io_channel *ch = bdev_ch->channel;
835 
836 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
837 
838 	bdev_io->in_submit_request = true;
839 	bdev->fn_table->submit_request(ch, bdev_io);
840 	bdev_io->in_submit_request = false;
841 }
842 
843 static void
844 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
845 		  struct spdk_bdev *bdev, void *cb_arg,
846 		  spdk_bdev_io_completion_cb cb)
847 {
848 	bdev_io->bdev = bdev;
849 	bdev_io->caller_ctx = cb_arg;
850 	bdev_io->cb = cb;
851 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
852 	bdev_io->in_submit_request = false;
853 	bdev_io->buf = NULL;
854 }
855 
856 bool
857 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
858 {
859 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
860 }
861 
862 int
863 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
864 {
865 	if (bdev->fn_table->dump_config_json) {
866 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
867 	}
868 
869 	return 0;
870 }
871 
872 static int
873 _spdk_bdev_channel_create(struct spdk_bdev_channel *ch, void *io_device)
874 {
875 	struct spdk_bdev		*bdev = io_device;
876 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
877 	struct spdk_bdev_module_channel	*shared_ch;
878 
879 	ch->bdev = io_device;
880 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
881 	if (!ch->channel) {
882 		return -1;
883 	}
884 
885 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
886 	if (!ch->mgmt_channel) {
887 		return -1;
888 	}
889 
890 	mgmt_ch = spdk_io_channel_get_ctx(ch->mgmt_channel);
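	/*
	 * Bdev channels that use the same underlying module I/O channel
	 *  (ch->channel) share a single spdk_bdev_module_channel.  Look for an
	 *  existing entry before allocating a new one.
	 */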
891 	TAILQ_FOREACH(shared_ch, &mgmt_ch->module_channels, link) {
892 		if (shared_ch->module_ch == ch->channel) {
893 			shared_ch->ref++;
894 			break;
895 		}
896 	}
897 
898 	if (shared_ch == NULL) {
899 		shared_ch = calloc(1, sizeof(*shared_ch));
900 		if (!shared_ch) {
901 			return -1;
902 		}
903 
904 		shared_ch->io_outstanding = 0;
905 		TAILQ_INIT(&shared_ch->nomem_io);
906 		shared_ch->nomem_threshold = 0;
907 		shared_ch->module_ch = ch->channel;
908 		shared_ch->ref = 1;
909 		TAILQ_INSERT_TAIL(&mgmt_ch->module_channels, shared_ch, link);
910 	}
911 
912 	memset(&ch->stat, 0, sizeof(ch->stat));
913 	TAILQ_INIT(&ch->queued_resets);
914 	ch->flags = 0;
915 	ch->module_ch = shared_ch;
916 
917 	return 0;
918 }
919 
920 static void
921 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
922 {
923 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
924 	struct spdk_bdev_module_channel	*shared_ch = ch->module_ch;
925 
926 	if (ch->channel) {
927 		spdk_put_io_channel(ch->channel);
928 	}
929 
930 	if (ch->mgmt_channel) {
931 		if (shared_ch) {
932 			assert(shared_ch->ref > 0);
933 			shared_ch->ref--;
934 			if (shared_ch->ref == 0) {
935 				mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
936 				assert(shared_ch->io_outstanding == 0);
937 				TAILQ_REMOVE(&mgmt_channel->module_channels, shared_ch, link);
938 				free(shared_ch);
939 			}
940 		}
941 		spdk_put_io_channel(ch->mgmt_channel);
942 	}
943 }
944 
945 static int
946 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
947 {
948 	struct spdk_bdev_channel	*ch = ctx_buf;
949 
950 	if (_spdk_bdev_channel_create(ch, io_device) != 0) {
951 		_spdk_bdev_channel_destroy_resource(ch);
952 		return -1;
953 	}
954 
955 #ifdef SPDK_CONFIG_VTUNE
956 	{
957 		char *name;
958 		__itt_init_ittlib(NULL, 0);
959 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
960 		if (!name) {
961 			_spdk_bdev_channel_destroy_resource(ch);
962 			return -1;
963 		}
964 		ch->handle = __itt_string_handle_create(name);
965 		free(name);
966 		ch->start_tsc = spdk_get_ticks();
967 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
968 	}
969 #endif
970 
971 	return 0;
972 }
973 
974 /*
975  * Abort I/O that are waiting on a data buffer.  These types of I/O are
976  *  linked using the spdk_bdev_io buf_link STAILQ_ENTRY.
977  */
978 static void
979 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
980 {
981 	bdev_io_stailq_t tmp;
982 	struct spdk_bdev_io *bdev_io;
983 
984 	STAILQ_INIT(&tmp);
985 
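	/*
	 * Drain the queue, failing I/O that belong to the channel being destroyed
	 *  and collecting everything else on a temporary list.  The swap below
	 *  puts the surviving I/O back in their original order.
	 */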
986 	while (!STAILQ_EMPTY(queue)) {
987 		bdev_io = STAILQ_FIRST(queue);
988 		STAILQ_REMOVE_HEAD(queue, buf_link);
989 		if (bdev_io->ch == ch) {
990 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
991 		} else {
992 			STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link);
993 		}
994 	}
995 
996 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
997 }
998 
999 /*
1000  * Abort I/O that are queued waiting for submission.  These types of I/O are
1001  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1002  */
1003 static void
1004 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1005 {
1006 	struct spdk_bdev_io *bdev_io, *tmp;
1007 
1008 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
1009 		if (bdev_io->ch == ch) {
1010 			TAILQ_REMOVE(queue, bdev_io, link);
1011 			/*
1012 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1013 			 *  been submitted to the bdev module.  Since in this case it
1014 			 *  hadn't, bump io_outstanding to account for the decrement
1015 			 *  that spdk_bdev_io_complete() will do.
1016 			 */
1017 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1018 				ch->module_ch->io_outstanding++;
1019 			}
1020 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1021 		}
1022 	}
1023 }
1024 
1025 static void
1026 _spdk_bdev_channel_destroy(struct spdk_bdev_channel *ch)
1027 {
1028 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1029 	struct spdk_bdev_module_channel	*shared_ch = ch->module_ch;
1030 
1031 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1032 
1033 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1034 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, ch);
1035 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
1036 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
1037 
1038 	_spdk_bdev_channel_destroy_resource(ch);
1039 }
1040 
1041 static void
1042 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1043 {
1044 	struct spdk_bdev_channel	*ch = ctx_buf;
1045 
1046 	_spdk_bdev_channel_destroy(ch);
1047 }
1048 
1049 int
1050 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1051 {
1052 	struct spdk_bdev_alias *tmp;
1053 
1054 	if (alias == NULL) {
1055 		SPDK_ERRLOG("NULL alias passed\n");
1056 		return -EINVAL;
1057 	}
1058 
1059 	if (spdk_bdev_get_by_name(alias)) {
1060 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1061 		return -EEXIST;
1062 	}
1063 
1064 	tmp = calloc(1, sizeof(*tmp));
1065 	if (tmp == NULL) {
1066 		SPDK_ERRLOG("Unable to allocate alias\n");
1067 		return -ENOMEM;
1068 	}
1069 
1070 	tmp->alias = strdup(alias);
1071 	if (tmp->alias == NULL) {
1072 		free(tmp);
1073 		SPDK_ERRLOG("Unable to allocate alias\n");
1074 		return -ENOMEM;
1075 	}
1076 
1077 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1078 
1079 	return 0;
1080 }
1081 
1082 int
1083 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1084 {
1085 	struct spdk_bdev_alias *tmp;
1086 
1087 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1088 		if (strcmp(alias, tmp->alias) == 0) {
1089 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1090 			free(tmp->alias);
1091 			free(tmp);
1092 			return 0;
1093 		}
1094 	}
1095 
1096 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1097 
1098 	return -ENOENT;
1099 }
1100 
1101 struct spdk_io_channel *
1102 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1103 {
1104 	return spdk_get_io_channel(desc->bdev);
1105 }
1106 
1107 const char *
1108 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1109 {
1110 	return bdev->name;
1111 }
1112 
1113 const char *
1114 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1115 {
1116 	return bdev->product_name;
1117 }
1118 
1119 const struct spdk_bdev_aliases_list *
1120 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1121 {
1122 	return &bdev->aliases;
1123 }
1124 
1125 uint32_t
1126 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1127 {
1128 	return bdev->blocklen;
1129 }
1130 
1131 uint64_t
1132 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1133 {
1134 	return bdev->blockcnt;
1135 }
1136 
1137 size_t
1138 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1139 {
1140 	/* TODO: push this logic down to the bdev modules */
1141 	if (bdev->need_aligned_buffer) {
1142 		return bdev->blocklen;
1143 	}
1144 
1145 	return 1;
1146 }
1147 
1148 uint32_t
1149 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1150 {
1151 	return bdev->optimal_io_boundary;
1152 }
1153 
1154 bool
1155 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1156 {
1157 	return bdev->write_cache;
1158 }
1159 
1160 int
1161 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1162 {
1163 	int ret;
1164 
1165 	pthread_mutex_lock(&bdev->mutex);
1166 
1167 	/* bdev has open descriptors */
1168 	if (!TAILQ_EMPTY(&bdev->open_descs) &&
1169 	    bdev->blockcnt > size) {
1170 		ret = -EBUSY;
1171 	} else {
1172 		bdev->blockcnt = size;
1173 		ret = 0;
1174 	}
1175 
1176 	pthread_mutex_unlock(&bdev->mutex);
1177 
1178 	return ret;
1179 }
1180 
1181 /*
1182  * Convert I/O offset and length from bytes to blocks.
1183  *
1184  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1185  */
1186 static uint64_t
1187 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1188 			  uint64_t num_bytes, uint64_t *num_blocks)
1189 {
1190 	uint32_t block_size = bdev->blocklen;
1191 
1192 	*offset_blocks = offset_bytes / block_size;
1193 	*num_blocks = num_bytes / block_size;
1194 
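	/* The result is non-zero iff either value is not a multiple of the block size. */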
1195 	return (offset_bytes % block_size) | (num_bytes % block_size);
1196 }
1197 
1198 static bool
1199 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1200 {
1201 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
1202 	 * has been an overflow and hence the offset has wrapped around */
1203 	if (offset_blocks + num_blocks < offset_blocks) {
1204 		return false;
1205 	}
1206 
1207 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1208 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1209 		return false;
1210 	}
1211 
1212 	return true;
1213 }
1214 
1215 int
1216 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1217 	       void *buf, uint64_t offset, uint64_t nbytes,
1218 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1219 {
1220 	uint64_t offset_blocks, num_blocks;
1221 
1222 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1223 		return -EINVAL;
1224 	}
1225 
1226 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1227 }
1228 
1229 int
1230 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1231 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1232 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1233 {
1234 	struct spdk_bdev *bdev = desc->bdev;
1235 	struct spdk_bdev_io *bdev_io;
1236 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1237 
1238 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1239 		return -EINVAL;
1240 	}
1241 
1242 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1243 	if (!bdev_io) {
1244 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1245 		return -ENOMEM;
1246 	}
1247 
1248 	bdev_io->ch = channel;
1249 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1250 	bdev_io->u.bdev.iov.iov_base = buf;
1251 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1252 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1253 	bdev_io->u.bdev.iovcnt = 1;
1254 	bdev_io->u.bdev.num_blocks = num_blocks;
1255 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1256 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1257 
1258 	spdk_bdev_io_submit(bdev_io);
1259 	return 0;
1260 }
1261 
1262 int
1263 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1264 		struct iovec *iov, int iovcnt,
1265 		uint64_t offset, uint64_t nbytes,
1266 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1267 {
1268 	uint64_t offset_blocks, num_blocks;
1269 
1270 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1271 		return -EINVAL;
1272 	}
1273 
1274 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1275 }
1276 
1277 int
spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1278 		       struct iovec *iov, int iovcnt,
1279 		       uint64_t offset_blocks, uint64_t num_blocks,
1280 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1281 {
1282 	struct spdk_bdev *bdev = desc->bdev;
1283 	struct spdk_bdev_io *bdev_io;
1284 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1285 
1286 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1287 		return -EINVAL;
1288 	}
1289 
1290 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1291 	if (!bdev_io) {
1292 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
1293 		return -ENOMEM;
1294 	}
1295 
1296 	bdev_io->ch = channel;
1297 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1298 	bdev_io->u.bdev.iovs = iov;
1299 	bdev_io->u.bdev.iovcnt = iovcnt;
1300 	bdev_io->u.bdev.num_blocks = num_blocks;
1301 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1302 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1303 
1304 	spdk_bdev_io_submit(bdev_io);
1305 	return 0;
1306 }
1307 
1308 int
1309 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1310 		void *buf, uint64_t offset, uint64_t nbytes,
1311 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1312 {
1313 	uint64_t offset_blocks, num_blocks;
1314 
1315 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1316 		return -EINVAL;
1317 	}
1318 
1319 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1320 }
1321 
1322 int
1323 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1324 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1325 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1326 {
1327 	struct spdk_bdev *bdev = desc->bdev;
1328 	struct spdk_bdev_io *bdev_io;
1329 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1330 
1331 	if (!desc->write) {
1332 		return -EBADF;
1333 	}
1334 
1335 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1336 		return -EINVAL;
1337 	}
1338 
1339 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1340 	if (!bdev_io) {
1341 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1342 		return -ENOMEM;
1343 	}
1344 
1345 	bdev_io->ch = channel;
1346 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1347 	bdev_io->u.bdev.iov.iov_base = buf;
1348 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1349 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1350 	bdev_io->u.bdev.iovcnt = 1;
1351 	bdev_io->u.bdev.num_blocks = num_blocks;
1352 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1353 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1354 
1355 	spdk_bdev_io_submit(bdev_io);
1356 	return 0;
1357 }
1358 
1359 int
1360 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1361 		 struct iovec *iov, int iovcnt,
1362 		 uint64_t offset, uint64_t len,
1363 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1364 {
1365 	uint64_t offset_blocks, num_blocks;
1366 
1367 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1368 		return -EINVAL;
1369 	}
1370 
1371 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1372 }
1373 
1374 int
1375 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1376 			struct iovec *iov, int iovcnt,
1377 			uint64_t offset_blocks, uint64_t num_blocks,
1378 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1379 {
1380 	struct spdk_bdev *bdev = desc->bdev;
1381 	struct spdk_bdev_io *bdev_io;
1382 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1383 
1384 	if (!desc->write) {
1385 		return -EBADF;
1386 	}
1387 
1388 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1389 		return -EINVAL;
1390 	}
1391 
1392 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1393 	if (!bdev_io) {
1394 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1395 		return -ENOMEM;
1396 	}
1397 
1398 	bdev_io->ch = channel;
1399 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1400 	bdev_io->u.bdev.iovs = iov;
1401 	bdev_io->u.bdev.iovcnt = iovcnt;
1402 	bdev_io->u.bdev.num_blocks = num_blocks;
1403 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1404 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1405 
1406 	spdk_bdev_io_submit(bdev_io);
1407 	return 0;
1408 }
1409 
1410 int
1411 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1412 		       uint64_t offset, uint64_t len,
1413 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1414 {
1415 	uint64_t offset_blocks, num_blocks;
1416 
1417 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1418 		return -EINVAL;
1419 	}
1420 
1421 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1422 }
1423 
1424 int
1425 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1426 			      uint64_t offset_blocks, uint64_t num_blocks,
1427 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1428 {
1429 	struct spdk_bdev *bdev = desc->bdev;
1430 	struct spdk_bdev_io *bdev_io;
1431 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1432 	uint64_t len;
1433 	bool split_request = false;
1434 
1435 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1436 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1437 		return -ERANGE;
1438 	}
1439 
1440 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1441 		return -EINVAL;
1442 	}
1443 
1444 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1445 
1446 	if (!bdev_io) {
1447 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1448 		return -ENOMEM;
1449 	}
1450 
1451 	bdev_io->ch = channel;
1452 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1453 
1454 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1455 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1456 		bdev_io->u.bdev.num_blocks = num_blocks;
1457 		bdev_io->u.bdev.iovs = NULL;
1458 		bdev_io->u.bdev.iovcnt = 0;
1459 
1460 	} else {
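		/*
		 * The bdev does not support WRITE_ZEROES natively, so emulate it with
		 *  plain writes from the shared zero buffer.  Requests larger than
		 *  ZERO_BUFFER_SIZE are split: the completion callback set below,
		 *  spdk_bdev_write_zeroes_split(), submits the remaining blocks.
		 */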
1461 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1462 
1463 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1464 
1465 		if (len > ZERO_BUFFER_SIZE) {
1466 			split_request = true;
1467 			len = ZERO_BUFFER_SIZE;
1468 		}
1469 
1470 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1471 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1472 		bdev_io->u.bdev.iov.iov_len = len;
1473 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1474 		bdev_io->u.bdev.iovcnt = 1;
1475 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1476 		bdev_io->split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1477 		bdev_io->split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1478 	}
1479 
1480 	if (split_request) {
1481 		bdev_io->stored_user_cb = cb;
1482 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1483 	} else {
1484 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1485 	}
1486 	spdk_bdev_io_submit(bdev_io);
1487 	return 0;
1488 }
1489 
1490 int
1491 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1492 		uint64_t offset, uint64_t nbytes,
1493 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1494 {
1495 	uint64_t offset_blocks, num_blocks;
1496 
1497 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1498 		return -EINVAL;
1499 	}
1500 
1501 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1502 }
1503 
1504 int
1505 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1506 		       uint64_t offset_blocks, uint64_t num_blocks,
1507 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1508 {
1509 	struct spdk_bdev *bdev = desc->bdev;
1510 	struct spdk_bdev_io *bdev_io;
1511 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1512 
1513 	if (!desc->write) {
1514 		return -EBADF;
1515 	}
1516 
1517 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1518 		return -EINVAL;
1519 	}
1520 
1521 	if (num_blocks == 0) {
1522 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1523 		return -EINVAL;
1524 	}
1525 
1526 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1527 	if (!bdev_io) {
1528 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1529 		return -ENOMEM;
1530 	}
1531 
1532 	bdev_io->ch = channel;
1533 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1534 	bdev_io->u.bdev.iov.iov_base = NULL;
1535 	bdev_io->u.bdev.iov.iov_len = 0;
1536 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1537 	bdev_io->u.bdev.iovcnt = 1;
1538 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1539 	bdev_io->u.bdev.num_blocks = num_blocks;
1540 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1541 
1542 	spdk_bdev_io_submit(bdev_io);
1543 	return 0;
1544 }
1545 
1546 int
1547 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1548 		uint64_t offset, uint64_t length,
1549 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1550 {
1551 	uint64_t offset_blocks, num_blocks;
1552 
1553 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1554 		return -EINVAL;
1555 	}
1556 
1557 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1558 }
1559 
1560 int
1561 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1562 		       uint64_t offset_blocks, uint64_t num_blocks,
1563 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1564 {
1565 	struct spdk_bdev *bdev = desc->bdev;
1566 	struct spdk_bdev_io *bdev_io;
1567 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1568 
1569 	if (!desc->write) {
1570 		return -EBADF;
1571 	}
1572 
1573 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1574 		return -EINVAL;
1575 	}
1576 
1577 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1578 	if (!bdev_io) {
1579 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1580 		return -ENOMEM;
1581 	}
1582 
1583 	bdev_io->ch = channel;
1584 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1585 	bdev_io->u.bdev.iovs = NULL;
1586 	bdev_io->u.bdev.iovcnt = 0;
1587 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1588 	bdev_io->u.bdev.num_blocks = num_blocks;
1589 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1590 
1591 	spdk_bdev_io_submit(bdev_io);
1592 	return 0;
1593 }
1594 
1595 static void
1596 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1597 {
1598 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1599 	struct spdk_bdev_io *bdev_io;
1600 
1601 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1602 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1603 	spdk_bdev_io_submit_reset(bdev_io);
1604 }
1605 
1606 static void
1607 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1608 {
1609 	struct spdk_io_channel 		*ch;
1610 	struct spdk_bdev_channel	*channel;
1611 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1612 	struct spdk_bdev_module_channel	*shared_ch;
1613 
1614 	ch = spdk_io_channel_iter_get_channel(i);
1615 	channel = spdk_io_channel_get_ctx(ch);
1616 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1617 	shared_ch = channel->module_ch;
1618 
1619 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
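	/*
	 * While BDEV_CH_RESET_IN_PROGRESS is set, spdk_bdev_io_submit() fails any
	 *  new I/O on this channel immediately.  Abort everything that is already
	 *  waiting for resources or for a data buffer as well.
	 */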
1620 
1621 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, channel);
1622 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1623 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1624 
1625 	spdk_for_each_channel_continue(i, 0);
1626 }
1627 
1628 static void
1629 _spdk_bdev_start_reset(void *ctx)
1630 {
1631 	struct spdk_bdev_channel *ch = ctx;
1632 
1633 	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_freeze_channel,
1634 			      ch, _spdk_bdev_reset_dev);
1635 }
1636 
1637 static void
1638 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1639 {
1640 	struct spdk_bdev *bdev = ch->bdev;
1641 
1642 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1643 
1644 	pthread_mutex_lock(&bdev->mutex);
1645 	if (bdev->reset_in_progress == NULL) {
1646 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1647 		/*
1648 		 * Take a channel reference for the target bdev for the life of this
1649 		 *  reset.  This guards against the channel getting destroyed while
1650 		 *  spdk_for_each_channel() calls related to this reset IO are in
1651 		 *  progress.  We will release the reference when this reset is
1652 		 *  completed.
1653 		 */
1654 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(bdev);
1655 		_spdk_bdev_start_reset(ch);
1656 	}
1657 	pthread_mutex_unlock(&bdev->mutex);
1658 }
1659 
1660 int
1661 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1662 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1663 {
1664 	struct spdk_bdev *bdev = desc->bdev;
1665 	struct spdk_bdev_io *bdev_io;
1666 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1667 
1668 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1669 	if (!bdev_io) {
1670 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1671 		return -ENOMEM;
1672 	}
1673 
1674 	bdev_io->ch = channel;
1675 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1676 	bdev_io->u.reset.ch_ref = NULL;
1677 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1678 
1679 	pthread_mutex_lock(&bdev->mutex);
1680 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1681 	pthread_mutex_unlock(&bdev->mutex);
1682 
1683 	_spdk_bdev_channel_start_reset(channel);
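	/*
	 * Only one reset per bdev runs at a time; if another reset is already in
	 *  progress, this one stays on queued_resets and is started from
	 *  _spdk_bdev_unfreeze_channel() once the current reset completes.
	 */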
1684 
1685 	return 0;
1686 }
1687 
1688 void
1689 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1690 		      struct spdk_bdev_io_stat *stat)
1691 {
1692 #ifdef SPDK_CONFIG_VTUNE
1693 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1694 	memset(stat, 0, sizeof(*stat));
1695 	return;
1696 #endif
1697 
1698 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1699 
1700 	channel->stat.ticks_rate = spdk_get_ticks_hz();
1701 	*stat = channel->stat;
1702 	memset(&channel->stat, 0, sizeof(channel->stat));
1703 }
1704 
1705 int
1706 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1707 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1708 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1709 {
1710 	struct spdk_bdev *bdev = desc->bdev;
1711 	struct spdk_bdev_io *bdev_io;
1712 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1713 
1714 	if (!desc->write) {
1715 		return -EBADF;
1716 	}
1717 
1718 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1719 	if (!bdev_io) {
1720 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1721 		return -ENOMEM;
1722 	}
1723 
1724 	bdev_io->ch = channel;
1725 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1726 	bdev_io->u.nvme_passthru.cmd = *cmd;
1727 	bdev_io->u.nvme_passthru.buf = buf;
1728 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1729 	bdev_io->u.nvme_passthru.md_buf = NULL;
1730 	bdev_io->u.nvme_passthru.md_len = 0;
1731 
1732 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1733 
1734 	spdk_bdev_io_submit(bdev_io);
1735 	return 0;
1736 }
1737 
1738 int
1739 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1740 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1741 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1742 {
1743 	struct spdk_bdev *bdev = desc->bdev;
1744 	struct spdk_bdev_io *bdev_io;
1745 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1746 
1747 	if (!desc->write) {
1748 		/*
1749 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1750 		 *  to easily determine if the command is a read or write, but for now just
1751 		 *  do not allow io_passthru with a read-only descriptor.
1752 		 */
1753 		return -EBADF;
1754 	}
1755 
1756 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1757 	if (!bdev_io) {
1758 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
1759 		return -ENOMEM;
1760 	}
1761 
1762 	bdev_io->ch = channel;
1763 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1764 	bdev_io->u.nvme_passthru.cmd = *cmd;
1765 	bdev_io->u.nvme_passthru.buf = buf;
1766 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1767 	bdev_io->u.nvme_passthru.md_buf = NULL;
1768 	bdev_io->u.nvme_passthru.md_len = 0;
1769 
1770 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1771 
1772 	spdk_bdev_io_submit(bdev_io);
1773 	return 0;
1774 }
1775 
1776 int
1777 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1778 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
1779 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1780 {
1781 	struct spdk_bdev *bdev = desc->bdev;
1782 	struct spdk_bdev_io *bdev_io;
1783 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1784 
1785 	if (!desc->write) {
1786 		/*
1787 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1788 		 *  to easily determine if the command is a read or write, but for now just
1789 		 *  do not allow io_passthru with a read-only descriptor.
1790 		 */
1791 		return -EBADF;
1792 	}
1793 
1794 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1795 	if (!bdev_io) {
1796 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
1797 		return -ENOMEM;
1798 	}
1799 
1800 	bdev_io->ch = channel;
1801 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
1802 	bdev_io->u.nvme_passthru.cmd = *cmd;
1803 	bdev_io->u.nvme_passthru.buf = buf;
1804 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1805 	bdev_io->u.nvme_passthru.md_buf = md_buf;
1806 	bdev_io->u.nvme_passthru.md_len = md_len;
1807 
1808 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1809 
1810 	spdk_bdev_io_submit(bdev_io);
1811 	return 0;
1812 }
1813 
1814 int
1815 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1816 {
1817 	if (!bdev_io) {
1818 		SPDK_ERRLOG("bdev_io is NULL\n");
1819 		return -1;
1820 	}
1821 
1822 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1823 		SPDK_ERRLOG("bdev_io is in pending state\n");
1824 		assert(false);
1825 		return -1;
1826 	}
1827 
1828 	spdk_bdev_put_io(bdev_io);
1829 
1830 	return 0;
1831 }
1832 
1833 static void
1834 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
1835 {
1836 	struct spdk_bdev *bdev = bdev_ch->bdev;
1837 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
1838 	struct spdk_bdev_io *bdev_io;
1839 
1840 	if (shared_ch->io_outstanding > shared_ch->nomem_threshold) {
1841 		/*
1842 		 * Allow some more I/O to complete before retrying the nomem_io queue.
1843 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
1844 		 *  the context of a completion, because the resources for the I/O are
1845 		 *  not released until control returns to the bdev poller.  Also, we
1846 		 *  may require several small I/O to complete before a larger I/O
1847 		 *  (that requires splitting) can be submitted.
1848 		 */
1849 		return;
1850 	}
1851 
1852 	while (!TAILQ_EMPTY(&shared_ch->nomem_io)) {
1853 		bdev_io = TAILQ_FIRST(&shared_ch->nomem_io);
1854 		TAILQ_REMOVE(&shared_ch->nomem_io, bdev_io, link);
1855 		shared_ch->io_outstanding++;
1856 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
1857 		bdev->fn_table->submit_request(bdev_io->ch->channel, bdev_io);
1858 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
1859 			break;
1860 		}
1861 	}
1862 }
1863 
1864 static void
1865 _spdk_bdev_io_complete(void *ctx)
1866 {
1867 	struct spdk_bdev_io *bdev_io = ctx;
1868 
1869 	assert(bdev_io->cb != NULL);
1870 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1871 }
1872 
1873 static void
1874 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
1875 {
1876 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
1877 
1878 	if (bdev_io->u.reset.ch_ref != NULL) {
1879 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1880 		bdev_io->u.reset.ch_ref = NULL;
1881 	}
1882 
1883 	_spdk_bdev_io_complete(bdev_io);
1884 }
1885 
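/*
 * Per-channel step of reset completion: clear the reset-in-progress flag and,
 *  if more resets were queued on this channel in the meantime, start the next one.
 */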
1886 static void
1887 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
1888 {
1889 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
1890 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1891 
1892 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
1893 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1894 		_spdk_bdev_channel_start_reset(ch);
1895 	}
1896 
1897 	spdk_for_each_channel_continue(i, 0);
1898 }
1899 
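/*
 * Complete a bdev I/O and invoke the user's completion callback.  A completed
 *  reset additionally unfreezes all of the bdev's channels.  I/O that finished
 *  with NOMEM is not completed here; it is queued on the shared channel's
 *  nomem_io list and resubmitted later by _spdk_bdev_ch_retry_io().  Successful
 *  reads and writes update the per-channel statistics, and the callback is
 *  deferred via a thread message when completion happens in the context of
 *  submit_request, to avoid recursing into the bdev module.
 */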
1900 void
1901 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1902 {
1903 	struct spdk_bdev *bdev = bdev_io->bdev;
1904 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
1905 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
1906 
1907 	bdev_io->status = status;
1908 
1909 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1910 		bool unlock_channels = false;
1911 
1912 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
1913 			SPDK_ERRLOG("NOMEM returned for reset\n");
1914 		}
1915 		pthread_mutex_lock(&bdev->mutex);
1916 		if (bdev_io == bdev->reset_in_progress) {
1917 			bdev->reset_in_progress = NULL;
1918 			unlock_channels = true;
1919 		}
1920 		pthread_mutex_unlock(&bdev->mutex);
1921 
1922 		if (unlock_channels) {
1923 			spdk_for_each_channel(bdev, _spdk_bdev_unfreeze_channel, bdev_io,
1924 					      _spdk_bdev_reset_complete);
1925 			return;
1926 		}
1927 	} else {
1928 		assert(shared_ch->io_outstanding > 0);
1929 		shared_ch->io_outstanding--;
1930 		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
1931 			if (spdk_unlikely(!TAILQ_EMPTY(&shared_ch->nomem_io))) {
1932 				_spdk_bdev_ch_retry_io(bdev_ch);
1933 			}
1934 		} else {
1935 			TAILQ_INSERT_HEAD(&shared_ch->nomem_io, bdev_io, link);
1936 			/*
1937 			 * Wait for some of the outstanding I/O to complete before we
1938 			 *  retry any of the nomem_io.  Normally we will wait for
1939 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
1940 			 *  depth channels we will instead wait for half to complete.
1941 			 */
1942 			shared_ch->nomem_threshold = spdk_max((int64_t)shared_ch->io_outstanding / 2,
1943 							      (int64_t)shared_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
1944 			return;
1945 		}
1946 	}
1947 
1948 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1949 		switch (bdev_io->type) {
1950 		case SPDK_BDEV_IO_TYPE_READ:
1951 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1952 			bdev_ch->stat.num_read_ops++;
1953 			bdev_ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
1954 			break;
1955 		case SPDK_BDEV_IO_TYPE_WRITE:
1956 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1957 			bdev_ch->stat.num_write_ops++;
1958 			bdev_ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
1959 			break;
1960 		default:
1961 			break;
1962 		}
1963 	}
1964 
1965 #ifdef SPDK_CONFIG_VTUNE
1966 	uint64_t now_tsc = spdk_get_ticks();
1967 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
1968 		uint64_t data[5];
1969 
1970 		data[0] = bdev_ch->stat.num_read_ops;
1971 		data[1] = bdev_ch->stat.bytes_read;
1972 		data[2] = bdev_ch->stat.num_write_ops;
1973 		data[3] = bdev_ch->stat.bytes_written;
1974 		data[4] = bdev->fn_table->get_spin_time ?
1975 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
1976 
1977 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
1978 				   __itt_metadata_u64, 5, data);
1979 
1980 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
1981 		bdev_ch->start_tsc = now_tsc;
1982 	}
1983 #endif
1984 
1985 	if (bdev_io->in_submit_request) {
1986 		/*
1987 		 * Defer completion to avoid potential infinite recursion if the
1988 		 * user's completion callback issues a new I/O.
1989 		 */
1990 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
1991 				     _spdk_bdev_io_complete, bdev_io);
1992 	} else {
1993 		_spdk_bdev_io_complete(bdev_io);
1994 	}
1995 }
1996 
1997 void
1998 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1999 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2000 {
2001 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2002 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2003 	} else {
2004 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2005 		bdev_io->error.scsi.sc = sc;
2006 		bdev_io->error.scsi.sk = sk;
2007 		bdev_io->error.scsi.asc = asc;
2008 		bdev_io->error.scsi.ascq = ascq;
2009 	}
2010 
2011 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2012 }
2013 
2014 void
2015 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2016 			     int *sc, int *sk, int *asc, int *ascq)
2017 {
2018 	assert(sc != NULL);
2019 	assert(sk != NULL);
2020 	assert(asc != NULL);
2021 	assert(ascq != NULL);
2022 
2023 	switch (bdev_io->status) {
2024 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2025 		*sc = SPDK_SCSI_STATUS_GOOD;
2026 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2027 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2028 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2029 		break;
2030 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2031 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2032 		break;
2033 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2034 		*sc = bdev_io->error.scsi.sc;
2035 		*sk = bdev_io->error.scsi.sk;
2036 		*asc = bdev_io->error.scsi.asc;
2037 		*ascq = bdev_io->error.scsi.ascq;
2038 		break;
2039 	default:
2040 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2041 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2042 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2043 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2044 		break;
2045 	}
2046 }
2047 
2048 void
2049 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2050 {
2051 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2052 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2053 	} else {
2054 		bdev_io->error.nvme.sct = sct;
2055 		bdev_io->error.nvme.sc = sc;
2056 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2057 	}
2058 
2059 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2060 }
2061 
2062 void
2063 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2064 {
2065 	assert(sct != NULL);
2066 	assert(sc != NULL);
2067 
2068 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2069 		*sct = bdev_io->error.nvme.sct;
2070 		*sc = bdev_io->error.nvme.sc;
2071 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2072 		*sct = SPDK_NVME_SCT_GENERIC;
2073 		*sc = SPDK_NVME_SC_SUCCESS;
2074 	} else {
2075 		*sct = SPDK_NVME_SCT_GENERIC;
2076 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2077 	}
2078 }
2079 
2080 struct spdk_thread *
2081 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2082 {
2083 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
2084 }
2085 
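/*
 * Common registration path for bdevs and vbdevs: validate that the name is set
 *  and unique, initialize the descriptor/alias/vbdev lists and the mutex,
 *  register the bdev as an io_device, insert it into the global bdev list, and
 *  give every module with an examine callback a chance to inspect the new bdev.
 */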
2086 static int
2087 _spdk_bdev_register(struct spdk_bdev *bdev)
2088 {
2089 	struct spdk_bdev_module_if *module;
2090 
2091 	assert(bdev->module != NULL);
2092 
2093 	if (!bdev->name) {
2094 		SPDK_ERRLOG("Bdev name is NULL\n");
2095 		return -EINVAL;
2096 	}
2097 
2098 	if (spdk_bdev_get_by_name(bdev->name)) {
2099 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2100 		return -EEXIST;
2101 	}
2102 
2103 	bdev->status = SPDK_BDEV_STATUS_READY;
2104 
2105 	TAILQ_INIT(&bdev->open_descs);
2106 
2107 	TAILQ_INIT(&bdev->vbdevs);
2108 	TAILQ_INIT(&bdev->base_bdevs);
2109 
2110 	TAILQ_INIT(&bdev->aliases);
2111 
2112 	bdev->reset_in_progress = NULL;
2113 
2114 	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2115 				sizeof(struct spdk_bdev_channel));
2116 
2117 	pthread_mutex_init(&bdev->mutex, NULL);
2118 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2119 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2120 
2121 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2122 		if (module->examine) {
2123 			module->action_in_progress++;
2124 			module->examine(bdev);
2125 		}
2126 	}
2127 
2128 	return 0;
2129 }
2130 
2131 int
2132 spdk_bdev_register(struct spdk_bdev *bdev)
2133 {
2134 	return _spdk_bdev_register(bdev);
2135 }
2136 
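/*
 * Register a virtual bdev and record the two-way links between it and each of
 *  its base bdevs.
 */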
2137 int
2138 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2139 {
2140 	int i, rc;
2141 
2142 	rc = _spdk_bdev_register(vbdev);
2143 	if (rc) {
2144 		return rc;
2145 	}
2146 
2147 	for (i = 0; i < base_bdev_count; i++) {
2148 		assert(base_bdevs[i] != NULL);
2149 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
2150 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
2151 	}
2152 
2153 	return 0;
2154 }
2155 
2156 void
2157 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
2158 {
2159 	if (bdev->unregister_cb != NULL) {
2160 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2161 	}
2162 }
2163 
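/*
 * Start removal of a bdev.  If any open descriptor registered a remove_cb,
 *  destruction is deferred until those descriptors are closed (see
 *  spdk_bdev_close()); otherwise the bdev is unlinked, its io_device is
 *  unregistered and the module's destruct callback runs immediately.
 */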
2164 void
2165 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2166 {
2167 	struct spdk_bdev_desc	*desc, *tmp;
2168 	int			rc;
2169 	bool			do_destruct = true;
2170 	struct spdk_bdev	*base_bdev;
2171 
2172 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2173 
2174 	pthread_mutex_lock(&bdev->mutex);
2175 
2176 	if (!TAILQ_EMPTY(&bdev->base_bdevs)) {
2177 		TAILQ_FOREACH(base_bdev, &bdev->base_bdevs, base_bdev_link) {
2178 			TAILQ_REMOVE(&base_bdev->vbdevs, bdev, vbdev_link);
2179 		}
2180 	}
2181 
2182 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2183 	bdev->unregister_cb = cb_fn;
2184 	bdev->unregister_ctx = cb_arg;
2185 
2186 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2187 		if (desc->remove_cb) {
2188 			pthread_mutex_unlock(&bdev->mutex);
2189 			do_destruct = false;
2190 			desc->remove_cb(desc->remove_ctx);
2191 			pthread_mutex_lock(&bdev->mutex);
2192 		}
2193 	}
2194 
2195 	if (!do_destruct) {
2196 		pthread_mutex_unlock(&bdev->mutex);
2197 		return;
2198 	}
2199 
2200 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2201 	pthread_mutex_unlock(&bdev->mutex);
2202 
2203 	pthread_mutex_destroy(&bdev->mutex);
2204 
2205 	spdk_io_device_unregister(bdev, NULL);
2206 
2207 	rc = bdev->fn_table->destruct(bdev->ctxt);
2208 	if (rc < 0) {
2209 		SPDK_ERRLOG("destruct failed\n");
2210 	}
2211 	if (rc <= 0 && cb_fn != NULL) {
2212 		cb_fn(cb_arg, rc);
2213 	}
2214 }
2215 
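/*
 * Open a descriptor on a bdev.  A minimal usage sketch (the bdev name and the
 *  hotremove callback below are hypothetical, for illustration only):
 *
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Nvme0n1");
 *
 *	if (bdev && spdk_bdev_open(bdev, true, my_hotremove_cb, my_ctx, &desc) == 0) {
 *		/\* submit I/O on a channel from spdk_bdev_get_io_channel(desc) \*\/
 *		spdk_bdev_close(desc);
 *	}
 */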
2216 int
2217 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2218 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2219 {
2220 	struct spdk_bdev_desc *desc;
2221 
2222 	desc = calloc(1, sizeof(*desc));
2223 	if (desc == NULL) {
2224 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2225 		return -ENOMEM;
2226 	}
2227 
2228 	pthread_mutex_lock(&bdev->mutex);
2229 
2230 	if (write && bdev->claim_module) {
2231 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
2232 		free(desc);
2233 		pthread_mutex_unlock(&bdev->mutex);
2234 		return -EPERM;
2235 	}
2236 
2237 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2238 
2239 	desc->bdev = bdev;
2240 	desc->remove_cb = remove_cb;
2241 	desc->remove_ctx = remove_ctx;
2242 	desc->write = write;
2243 	*_desc = desc;
2244 
2245 	pthread_mutex_unlock(&bdev->mutex);
2246 
2247 	return 0;
2248 }
2249 
2250 void
2251 spdk_bdev_close(struct spdk_bdev_desc *desc)
2252 {
2253 	struct spdk_bdev *bdev = desc->bdev;
2254 	bool do_unregister = false;
2255 
2256 	pthread_mutex_lock(&bdev->mutex);
2257 
2258 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2259 	free(desc);
2260 
2261 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
2262 		do_unregister = true;
2263 	}
2264 	pthread_mutex_unlock(&bdev->mutex);
2265 
2266 	if (do_unregister == true) {
2267 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
2268 	}
2269 }
2270 
2271 int
2272 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
2273 			    struct spdk_bdev_module_if *module)
2274 {
2275 	if (bdev->claim_module != NULL) {
2276 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
2277 			    bdev->claim_module->name);
2278 		return -EPERM;
2279 	}
2280 
2281 	if (desc && !desc->write) {
2282 		desc->write = true;
2283 	}
2284 
2285 	bdev->claim_module = module;
2286 	return 0;
2287 }
2288 
2289 void
2290 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2291 {
2292 	assert(bdev->claim_module != NULL);
2293 	bdev->claim_module = NULL;
2294 }
2295 
2296 struct spdk_bdev *
2297 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2298 {
2299 	return desc->bdev;
2300 }
2301 
2302 void
2303 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2304 {
2305 	struct iovec *iovs;
2306 	int iovcnt;
2307 
2308 	if (bdev_io == NULL) {
2309 		return;
2310 	}
2311 
2312 	switch (bdev_io->type) {
2313 	case SPDK_BDEV_IO_TYPE_READ:
2314 		iovs = bdev_io->u.bdev.iovs;
2315 		iovcnt = bdev_io->u.bdev.iovcnt;
2316 		break;
2317 	case SPDK_BDEV_IO_TYPE_WRITE:
2318 		iovs = bdev_io->u.bdev.iovs;
2319 		iovcnt = bdev_io->u.bdev.iovcnt;
2320 		break;
2321 	default:
2322 		iovs = NULL;
2323 		iovcnt = 0;
2324 		break;
2325 	}
2326 
2327 	if (iovp) {
2328 		*iovp = iovs;
2329 	}
2330 	if (iovcntp) {
2331 		*iovcntp = iovcnt;
2332 	}
2333 }
2334 
2335 void
2336 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
2337 {
2338 	/*
2339 	 * Modules with examine callbacks must be initialized first, so they are
2340 	 *  ready to handle examine callbacks from later modules that will
2341 	 *  register physical bdevs.
2342 	 */
2343 	if (bdev_module->examine != NULL) {
2344 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2345 	} else {
2346 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2347 	}
2348 }
2349 
2350 void
2351 spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
2352 {
2353 	if (base->desc) {
2354 		spdk_bdev_close(base->desc);
2355 		base->desc = NULL;
2356 	}
2357 	base->base_free_fn(base);
2358 }
2359 
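/*
 * Free a single part and drop its reference on the part base.  When the last
 *  part goes away, the claim on the base bdev is released and the base itself
 *  is freed.
 */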
2360 void
2361 spdk_bdev_part_free(struct spdk_bdev_part *part)
2362 {
2363 	struct spdk_bdev_part_base *base;
2364 
2365 	assert(part);
2366 	assert(part->base);
2367 
2368 	base = part->base;
2369 	spdk_io_device_unregister(&part->base, NULL);
2370 	TAILQ_REMOVE(base->tailq, part, tailq);
2371 	free(part->bdev.name);
2372 	free(part);
2373 
2374 	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
2375 		spdk_bdev_module_release_bdev(base->bdev);
2376 		spdk_bdev_part_base_free(base);
2377 	}
2378 }
2379 
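/*
 * Hot-remove handling: unregister every part on the given tailq that was built
 *  on top of the removed base bdev.
 */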
2380 void
2381 spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
2382 {
2383 	struct spdk_bdev_part *part, *tmp;
2384 
2385 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
2386 		if (part->base->bdev == base_bdev) {
2387 			spdk_bdev_unregister(&part->bdev, NULL, NULL);
2388 		}
2389 	}
2390 }
2391 
2392 static bool
2393 spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
2394 {
2395 	struct spdk_bdev_part *part = _part;
2396 
2397 	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
2398 }
2399 
2400 static struct spdk_io_channel *
2401 spdk_bdev_part_get_io_channel(void *_part)
2402 {
2403 	struct spdk_bdev_part *part = _part;
2404 
2405 	return spdk_get_io_channel(&part->base);
2406 }
2407 
2408 static void
2409 spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2410 {
2411 	struct spdk_bdev_io *part_io = cb_arg;
2412 	enum spdk_bdev_io_status status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
2413 
2414 	spdk_bdev_io_complete(part_io, status);
2415 	spdk_bdev_free_io(bdev_io);
2416 }
2417 
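/*
 * Completion callback used when a large write_zeroes request is emulated with
 *  repeated writes of the shared zero buffer.  Each round writes at most
 *  ZERO_BUFFER_SIZE bytes - for example, with a 512-byte block size one 1 MiB
 *  round covers 2048 blocks.  Once split_remaining_num_blocks reaches zero the
 *  original user callback is restored so the final completion goes back to the
 *  caller; on failure the user callback is restored and the I/O completes with
 *  the error.
 */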
2418 static void
2419 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2420 {
2421 	uint64_t len;
2422 
2423 	if (!success) {
2424 		bdev_io->cb = bdev_io->stored_user_cb;
2425 		_spdk_bdev_io_complete(bdev_io);
2426 		return;
2427 	}
2428 
2429 	/* No need to repeat the error checking from spdk_bdev_write_zeroes_blocks() - this request already passed those checks. */
2430 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->split_remaining_num_blocks,
2431 		       ZERO_BUFFER_SIZE);
2432 
2433 	bdev_io->u.bdev.offset_blocks = bdev_io->split_current_offset_blocks;
2434 	bdev_io->u.bdev.iov.iov_len = len;
2435 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2436 	bdev_io->split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2437 	bdev_io->split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2438 
2439 	/* If this round completes the I/O, change the callback back to the original user callback. */
2440 	if (bdev_io->split_remaining_num_blocks == 0) {
2441 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->stored_user_cb);
2442 	} else {
2443 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2444 	}
2445 	spdk_bdev_io_submit(bdev_io);
2446 }
2447 
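/*
 * Forward an I/O submitted to a part down to the underlying base bdev, shifting
 *  the block offset of read/write/unmap/flush/write_zeroes requests by the
 *  part's offset_blocks.  The forwarded I/O completes through
 *  spdk_bdev_part_complete_io(), which propagates the status back to the
 *  original request.
 */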
2448 void
2449 spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
2450 {
2451 	struct spdk_bdev_part *part = ch->part;
2452 	struct spdk_io_channel *base_ch = ch->base_ch;
2453 	struct spdk_bdev_desc *base_desc = part->base->desc;
2454 	uint64_t offset;
2455 	int rc = 0;
2456 
2457 	/* Modify the I/O to adjust for the offset within the base bdev. */
2458 	switch (bdev_io->type) {
2459 	case SPDK_BDEV_IO_TYPE_READ:
2460 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2461 		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2462 					    bdev_io->u.bdev.iovcnt, offset,
2463 					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2464 					    bdev_io);
2465 		break;
2466 	case SPDK_BDEV_IO_TYPE_WRITE:
2467 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2468 		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2469 					     bdev_io->u.bdev.iovcnt, offset,
2470 					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2471 					     bdev_io);
2472 		break;
2473 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
2474 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2475 		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2476 						   spdk_bdev_part_complete_io, bdev_io);
2477 		break;
2478 	case SPDK_BDEV_IO_TYPE_UNMAP:
2479 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2480 		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2481 					    spdk_bdev_part_complete_io, bdev_io);
2482 		break;
2483 	case SPDK_BDEV_IO_TYPE_FLUSH:
2484 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2485 		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2486 					    spdk_bdev_part_complete_io, bdev_io);
2487 		break;
2488 	case SPDK_BDEV_IO_TYPE_RESET:
2489 		rc = spdk_bdev_reset(base_desc, base_ch,
2490 				     spdk_bdev_part_complete_io, bdev_io);
2491 		break;
2492 	default:
2493 		SPDK_ERRLOG("bdev part: unknown I/O type %d\n", bdev_io->type);
2494 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2495 		return;
2496 	}
2497 
2498 	if (rc != 0) {
2499 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2500 	}
2501 }

2502 static int
2503 spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
2504 {
2505 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2506 	struct spdk_bdev_part_channel *ch = ctx_buf;
2507 
2508 	ch->part = part;
2509 	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
2510 	if (ch->base_ch == NULL) {
2511 		return -1;
2512 	}
2513 
2514 	if (part->base->ch_create_cb) {
2515 		return part->base->ch_create_cb(io_device, ctx_buf);
2516 	} else {
2517 		return 0;
2518 	}
2519 }
2520 
2521 static void
2522 spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
2523 {
2524 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2525 	struct spdk_bdev_part_channel *ch = ctx_buf;
2526 
2527 	if (part->base->ch_destroy_cb) {
2528 		part->base->ch_destroy_cb(io_device, ctx_buf);
2529 	}
2530 	spdk_put_io_channel(ch->base_ch);
2531 }
2532 
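/*
 * Initialize the shared state for a set of parts carved out of one base bdev.
 *  A rough construction sketch from a vbdev module (all names below are
 *  hypothetical and for illustration only):
 *
 *	spdk_bdev_part_base_construct(&my_base.base, base_bdev, my_hotremove_cb,
 *				      &my_module, &my_fn_table, &my_part_tailq,
 *				      my_base_free, sizeof(struct my_channel),
 *				      NULL, NULL);
 *	spdk_bdev_part_construct(&my_part.part, &my_base.base, name,
 *				 offset_blocks, num_blocks, "My Partition");
 */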
2533 int
2534 spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
2535 			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
2536 			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
2537 			      spdk_bdev_part_base_free_fn free_fn,
2538 			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
2539 			      spdk_io_channel_destroy_cb ch_destroy_cb)
2540 {
2541 	int rc;
2542 
2543 	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
2544 	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;
2545 
2546 	base->bdev = bdev;
2547 	base->desc = NULL;
2548 	base->ref = 0;
2549 	base->module = module;
2550 	base->fn_table = fn_table;
2551 	base->tailq = tailq;
2552 	base->claimed = false;
2553 	base->channel_size = channel_size;
2554 	base->ch_create_cb = ch_create_cb;
2555 	base->ch_destroy_cb = ch_destroy_cb;
2556 	base->base_free_fn = free_fn;
2557 
2558 	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
2559 	if (rc) {
2560 		spdk_bdev_part_base_free(base);
2561 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
2562 		return -1;
2563 	}
2564 
2565 	return 0;
2566 }
2567 
2568 int
2569 spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
2570 			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
2571 			 char *product_name)
2572 {
2573 	part->bdev.name = name;
2574 	part->bdev.blocklen = base->bdev->blocklen;
2575 	part->bdev.blockcnt = num_blocks;
2576 	part->offset_blocks = offset_blocks;
2577 
2578 	part->bdev.write_cache = base->bdev->write_cache;
2579 	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
2580 	part->bdev.product_name = product_name;
2581 	part->bdev.ctxt = part;
2582 	part->bdev.module = base->module;
2583 	part->bdev.fn_table = base->fn_table;
2584 
2585 	__sync_fetch_and_add(&base->ref, 1);
2586 	part->base = base;
2587 
2588 	if (!base->claimed) {
2589 		int rc;
2590 
2591 		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
2592 		if (rc) {
2593 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
2594 			free(part->bdev.name);
2595 			return -1;
2596 		}
2597 		base->claimed = true;
2598 	}
2599 
2600 	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
2601 				spdk_bdev_part_channel_destroy_cb,
2602 				base->channel_size);
2603 	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
2604 	TAILQ_INSERT_TAIL(base->tailq, part, tailq);
2605 
2606 	return 0;
2607 }
2608 
2609 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
2610