xref: /spdk/lib/bdev/bdev.c (revision 5ffa5c003a9ddf6be2bae0496dc20953661ebe1b)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/bdev.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
58 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
59 #define SPDK_BDEV_IO_CACHE_SIZE			256
60 #define BUF_SMALL_POOL_SIZE			8192
61 #define BUF_LARGE_POOL_SIZE			1024
62 #define NOMEM_THRESHOLD_COUNT			8
63 #define ZERO_BUFFER_SIZE			0x100000
64 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
65 #define SPDK_BDEV_SEC_TO_USEC			1000000ULL
66 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
67 
68 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
69 typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t;
70 
71 struct spdk_bdev_mgr {
72 	struct spdk_mempool *bdev_io_pool;
73 
74 	struct spdk_mempool *buf_small_pool;
75 	struct spdk_mempool *buf_large_pool;
76 
77 	void *zero_buffer;
78 
79 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
80 
81 	TAILQ_HEAD(, spdk_bdev) bdevs;
82 
83 	bool init_complete;
84 	bool module_init_complete;
85 
86 #ifdef SPDK_CONFIG_VTUNE
87 	__itt_domain	*domain;
88 #endif
89 };
90 
91 static struct spdk_bdev_mgr g_bdev_mgr = {
92 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
93 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
94 	.init_complete = false,
95 	.module_init_complete = false,
96 };
97 
98 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
99 static void			*g_init_cb_arg = NULL;
100 
101 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
102 static void			*g_fini_cb_arg = NULL;
103 static struct spdk_thread	*g_fini_thread = NULL;
104 
105 
106 struct spdk_bdev_mgmt_channel {
107 	bdev_io_stailq_t need_buf_small;
108 	bdev_io_stailq_t need_buf_large;
109 
110 	/*
111 	 * Each thread keeps a cache of bdev_io - this allows
112 	 *  bdev threads which are *not* DPDK threads to still
113 	 *  benefit from a per-thread bdev_io cache.  Without
114 	 *  this, non-DPDK threads fetching from the mempool
115 	 *  incur a cmpxchg on get and put.
116 	 */
117 	bdev_io_stailq_t per_thread_cache;
118 	uint32_t	per_thread_cache_count;
119 
120 	TAILQ_HEAD(, spdk_bdev_module_channel) module_channels;
121 };
122 
123 struct spdk_bdev_desc {
124 	struct spdk_bdev		*bdev;
125 	spdk_bdev_remove_cb_t		remove_cb;
126 	void				*remove_ctx;
127 	bool				write;
128 	TAILQ_ENTRY(spdk_bdev_desc)	link;
129 };
130 
131 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
132 #define BDEV_CH_QOS_ENABLED		(1 << 1)
133 
134 struct spdk_bdev_channel {
135 	struct spdk_bdev	*bdev;
136 
137 	/* The channel for the underlying device */
138 	struct spdk_io_channel	*channel;
139 
140 	/* Channel for the bdev manager */
141 	struct spdk_io_channel	*mgmt_channel;
142 
143 	struct spdk_bdev_io_stat stat;
144 
145 	/*
146 	 * Count of I/O submitted through this channel and waiting for completion.
147 	 * Incremented before submit_request() is called on an spdk_bdev_io.
148 	 */
149 	uint64_t		io_outstanding;
150 
151 	bdev_io_tailq_t		queued_resets;
152 
153 	uint32_t		flags;
154 
155 	/*
156 	 * Rate limiting on this channel.
157 	 * Queue of I/O awaiting issue because the QoS rate limit was reached
158 	 *  on this channel.
159 	 */
160 	bdev_io_tailq_t		qos_io;
161 
162 	/*
163 	 * Rate limiting on this channel.
164 	 * Maximum number of I/O allowed to be issued in one timeslice (e.g., 1 ms);
165 	 *  only valid for the master channel, which manages the outstanding I/O.
166 	 */
167 	uint64_t		qos_max_ios_per_timeslice;
168 
169 	/*
170 	 * Rate limiting on this channel.
171 	 * Number of I/O submitted in the current timeslice (e.g., 1 ms)
172 	 */
173 	uint64_t		io_submitted_this_timeslice;
174 
175 	/*
176 	 * Rate limiting on this channel.
177 	 * QoS poller that runs once per millisecond timeslice.
178 	 */
179 	struct spdk_poller	*qos_poller;
180 
181 	/* Per-io_device (per-module) channel state shared across bdev channels */
182 	struct spdk_bdev_module_channel *module_ch;
183 
184 #ifdef SPDK_CONFIG_VTUNE
185 	uint64_t		start_tsc;
186 	uint64_t		interval_tsc;
187 	__itt_string_handle	*handle;
188 #endif
189 
190 };
191 
192 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
193 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
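/*
 * Note: the io_device handle is the bdev pointer offset by one byte.  This keeps the
 * handle distinct from the bdev pointer itself, so registering the bdev as an io_device
 * cannot collide with any io_device a bdev module may have registered using that same
 * pointer.  __bdev_from_io_dev() simply undoes the offset.
 */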
194 
195 /*
196  * Per-module (or per-io_device) channel. Multiple bdevs built on the same io_device
197  * queue their I/O awaiting retry here. This makes it possible to retry sending
198  * I/O to one bdev after I/O from another bdev completes.
199  */
200 struct spdk_bdev_module_channel {
201 	/*
202 	 * Count of I/O submitted to bdev module and waiting for completion.
203 	 * Incremented before submit_request() is called on an spdk_bdev_io.
204 	 */
205 	uint64_t		io_outstanding;
206 
207 	/*
208 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
209 	 *  on this channel.
210 	 */
211 	bdev_io_tailq_t		nomem_io;
212 
213 	/*
214 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
215 	 */
216 	uint64_t		nomem_threshold;
217 
218 	/* I/O channel allocated by a bdev module */
219 	struct spdk_io_channel	*module_ch;
220 
221 	uint32_t		ref;
222 
223 	TAILQ_ENTRY(spdk_bdev_module_channel) link;
224 };
225 
226 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
227 
228 struct spdk_bdev *
229 spdk_bdev_first(void)
230 {
231 	struct spdk_bdev *bdev;
232 
233 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
234 	if (bdev) {
235 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
236 	}
237 
238 	return bdev;
239 }
240 
241 struct spdk_bdev *
242 spdk_bdev_next(struct spdk_bdev *prev)
243 {
244 	struct spdk_bdev *bdev;
245 
246 	bdev = TAILQ_NEXT(prev, link);
247 	if (bdev) {
248 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
249 	}
250 
251 	return bdev;
252 }
253 
254 static struct spdk_bdev *
255 _bdev_next_leaf(struct spdk_bdev *bdev)
256 {
257 	while (bdev != NULL) {
258 		if (bdev->claim_module == NULL) {
259 			return bdev;
260 		} else {
261 			bdev = TAILQ_NEXT(bdev, link);
262 		}
263 	}
264 
265 	return bdev;
266 }
267 
268 struct spdk_bdev *
269 spdk_bdev_first_leaf(void)
270 {
271 	struct spdk_bdev *bdev;
272 
273 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
274 
275 	if (bdev) {
276 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
277 	}
278 
279 	return bdev;
280 }
281 
282 struct spdk_bdev *
283 spdk_bdev_next_leaf(struct spdk_bdev *prev)
284 {
285 	struct spdk_bdev *bdev;
286 
287 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
288 
289 	if (bdev) {
290 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
291 	}
292 
293 	return bdev;
294 }
295 
296 struct spdk_bdev *
297 spdk_bdev_get_by_name(const char *bdev_name)
298 {
299 	struct spdk_bdev_alias *tmp;
300 	struct spdk_bdev *bdev = spdk_bdev_first();
301 
302 	while (bdev != NULL) {
303 		if (strcmp(bdev_name, bdev->name) == 0) {
304 			return bdev;
305 		}
306 
307 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
308 			if (strcmp(bdev_name, tmp->alias) == 0) {
309 				return bdev;
310 			}
311 		}
312 
313 		bdev = spdk_bdev_next(bdev);
314 	}
315 
316 	return NULL;
317 }
318 
319 static void
320 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
321 {
322 	assert(bdev_io->get_buf_cb != NULL);
323 	assert(buf != NULL);
324 	assert(bdev_io->u.bdev.iovs != NULL);
325 
326 	bdev_io->buf = buf;
327 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
328 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
329 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
330 }
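/*
 * Illustration of the alignment above: (buf + 512) & ~511UL rounds the address up to the
 * next 512-byte boundary, e.g. buf == 0x7f0000001010 yields iov_base == 0x7f0000001200.
 * The buffer pools are created with an extra 512 bytes per element (see
 * spdk_bdev_initialize()), so the aligned pointer still has buf_len usable bytes behind it.
 */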
331 
332 static void
333 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
334 {
335 	struct spdk_mempool *pool;
336 	struct spdk_bdev_io *tmp;
337 	void *buf;
338 	bdev_io_stailq_t *stailq;
339 	struct spdk_bdev_mgmt_channel *ch;
340 
341 	assert(bdev_io->u.bdev.iovcnt == 1);
342 
343 	buf = bdev_io->buf;
344 	ch = bdev_io->mgmt_ch;
345 
346 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
347 		pool = g_bdev_mgr.buf_small_pool;
348 		stailq = &ch->need_buf_small;
349 	} else {
350 		pool = g_bdev_mgr.buf_large_pool;
351 		stailq = &ch->need_buf_large;
352 	}
353 
354 	if (STAILQ_EMPTY(stailq)) {
355 		spdk_mempool_put(pool, buf);
356 	} else {
357 		tmp = STAILQ_FIRST(stailq);
358 		STAILQ_REMOVE_HEAD(stailq, buf_link);
359 		spdk_bdev_io_set_buf(tmp, buf);
360 	}
361 }
362 
363 void
364 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
365 {
366 	struct spdk_mempool *pool;
367 	bdev_io_stailq_t *stailq;
368 	void *buf = NULL;
369 	struct spdk_bdev_mgmt_channel *ch;
370 
371 	assert(cb != NULL);
372 	assert(bdev_io->u.bdev.iovs != NULL);
373 
374 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
375 		/* Buffer already present */
376 		cb(bdev_io->ch->channel, bdev_io);
377 		return;
378 	}
379 
380 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
381 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
382 
383 	bdev_io->buf_len = len;
384 	bdev_io->get_buf_cb = cb;
385 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
386 		pool = g_bdev_mgr.buf_small_pool;
387 		stailq = &ch->need_buf_small;
388 	} else {
389 		pool = g_bdev_mgr.buf_large_pool;
390 		stailq = &ch->need_buf_large;
391 	}
392 
393 	buf = spdk_mempool_get(pool);
394 
395 	if (!buf) {
396 		STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link);
397 	} else {
398 		spdk_bdev_io_set_buf(bdev_io, buf);
399 	}
400 }
401 
402 static int
403 spdk_bdev_module_get_max_ctx_size(void)
404 {
405 	struct spdk_bdev_module *bdev_module;
406 	int max_bdev_module_size = 0;
407 
408 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
409 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
410 			max_bdev_module_size = bdev_module->get_ctx_size();
411 		}
412 	}
413 
414 	return max_bdev_module_size;
415 }
416 
417 void
418 spdk_bdev_config_text(FILE *fp)
419 {
420 	struct spdk_bdev_module *bdev_module;
421 
422 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
423 		if (bdev_module->config_text) {
424 			bdev_module->config_text(fp);
425 		}
426 	}
427 }
428 
429 int
430 spdk_bdev_config_json(struct spdk_json_write_ctx *w)
431 {
432 	struct spdk_bdev_module *bdev_module;
433 	struct spdk_bdev *bdev;
434 
435 	if (!w) {
436 		return -EINVAL;
437 	}
438 
439 	spdk_json_write_array_begin(w);
440 
441 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
442 		if (bdev_module->config_json) {
443 			bdev_module->config_json(w);
444 		}
445 	}
446 
447 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, link) {
448 		spdk_bdev_write_config_json(bdev, w);
449 	}
450 
451 	spdk_json_write_array_end(w);
452 	return 0;
453 }
454 
455 static int
456 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
457 {
458 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
459 
460 	STAILQ_INIT(&ch->need_buf_small);
461 	STAILQ_INIT(&ch->need_buf_large);
462 
463 	STAILQ_INIT(&ch->per_thread_cache);
464 	ch->per_thread_cache_count = 0;
465 
466 	TAILQ_INIT(&ch->module_channels);
467 
468 	return 0;
469 }
470 
471 static void
472 spdk_bdev_mgmt_channel_free_resources(struct spdk_bdev_mgmt_channel *ch)
473 {
474 	struct spdk_bdev_io *bdev_io;
475 
476 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
477 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
478 	}
479 
480 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
481 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
482 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
483 		ch->per_thread_cache_count--;
484 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
485 	}
486 
487 	assert(ch->per_thread_cache_count == 0);
488 }
489 
490 static void
491 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
492 {
493 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
494 
495 	spdk_bdev_mgmt_channel_free_resources(ch);
496 }
497 
498 static void
499 spdk_bdev_init_complete(int rc)
500 {
501 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
502 	void *cb_arg = g_init_cb_arg;
503 
504 	g_bdev_mgr.init_complete = true;
505 	g_init_cb_fn = NULL;
506 	g_init_cb_arg = NULL;
507 
508 	cb_fn(cb_arg, rc);
509 }
510 
511 static void
512 spdk_bdev_module_action_complete(void)
513 {
514 	struct spdk_bdev_module *m;
515 
516 	/*
517 	 * Don't finish bdev subsystem initialization if
518 	 * module pre-initialization is still in progress, or
519 	 * the subsystem has already been initialized.
520 	 */
521 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
522 		return;
523 	}
524 
525 	/*
526 	 * Check all bdev modules for inits/examinations in progress. If any
527 	 * exist, return immediately since we cannot finish bdev subsystem
528 	 * initialization until all are completed.
529 	 */
530 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
531 		if (m->action_in_progress > 0) {
532 			return;
533 		}
534 	}
535 
536 	/*
537 	 * Modules already finished initialization - now that all
538 	 * the bdev modules have finished their asynchronous I/O
539 	 * processing, the entire bdev layer can be marked as complete.
540 	 */
541 	spdk_bdev_init_complete(0);
542 }
543 
544 static void
545 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
546 {
547 	assert(module->action_in_progress > 0);
548 	module->action_in_progress--;
549 	spdk_bdev_module_action_complete();
550 }
551 
552 void
553 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
554 {
555 	spdk_bdev_module_action_done(module);
556 }
557 
558 void
559 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
560 {
561 	spdk_bdev_module_action_done(module);
562 }
563 
564 static int
565 spdk_bdev_modules_init(void)
566 {
567 	struct spdk_bdev_module *module;
568 	int rc = 0;
569 
570 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
571 		rc = module->module_init();
572 		if (rc != 0) {
573 			break;
574 		}
575 	}
576 
577 	g_bdev_mgr.module_init_complete = true;
578 	return rc;
579 }
580 void
581 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
582 {
583 	int cache_size;
584 	int rc = 0;
585 	char mempool_name[32];
586 
587 	assert(cb_fn != NULL);
588 
589 	g_init_cb_fn = cb_fn;
590 	g_init_cb_arg = cb_arg;
591 
592 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
593 
594 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
595 				  SPDK_BDEV_IO_POOL_SIZE,
596 				  sizeof(struct spdk_bdev_io) +
597 				  spdk_bdev_module_get_max_ctx_size(),
598 				  0,
599 				  SPDK_ENV_SOCKET_ID_ANY);
600 
601 	if (g_bdev_mgr.bdev_io_pool == NULL) {
602 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
603 		spdk_bdev_init_complete(-1);
604 		return;
605 	}
606 
607 	/**
608 	 * Ensure no more than half of the total buffers end up in local caches, by
609 	 *   using spdk_env_get_core_count() to determine how many local caches we need
610 	 *   to account for.
611 	 */
612 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
613 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
614 
615 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
616 				    BUF_SMALL_POOL_SIZE,
617 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
618 				    cache_size,
619 				    SPDK_ENV_SOCKET_ID_ANY);
620 	if (!g_bdev_mgr.buf_small_pool) {
621 		SPDK_ERRLOG("create rbuf small pool failed\n");
622 		spdk_bdev_init_complete(-1);
623 		return;
624 	}
625 
626 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
627 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
628 
629 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
630 				    BUF_LARGE_POOL_SIZE,
631 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
632 				    cache_size,
633 				    SPDK_ENV_SOCKET_ID_ANY);
634 	if (!g_bdev_mgr.buf_large_pool) {
635 		SPDK_ERRLOG("create rbuf large pool failed\n");
636 		spdk_bdev_init_complete(-1);
637 		return;
638 	}
639 
640 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
641 				 NULL);
642 	if (!g_bdev_mgr.zero_buffer) {
643 		SPDK_ERRLOG("create bdev zero buffer failed\n");
644 		spdk_bdev_init_complete(-1);
645 		return;
646 	}
647 
648 #ifdef SPDK_CONFIG_VTUNE
649 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
650 #endif
651 
652 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
653 				spdk_bdev_mgmt_channel_destroy,
654 				sizeof(struct spdk_bdev_mgmt_channel));
655 
656 	rc = spdk_bdev_modules_init();
657 	if (rc != 0) {
658 		SPDK_ERRLOG("bdev modules init failed\n");
659 		spdk_bdev_init_complete(-1);
660 		return;
661 	}
662 
663 	spdk_bdev_module_action_complete();
664 }
665 
666 static void
667 spdk_bdev_module_finish_cb(void *io_device)
668 {
669 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
670 
671 	cb_fn(g_fini_cb_arg);
672 	g_fini_cb_fn = NULL;
673 	g_fini_cb_arg = NULL;
674 }
675 
676 static void
677 spdk_bdev_module_finish_complete(struct spdk_io_channel_iter *i, int status)
678 {
679 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
680 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
681 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
682 			    SPDK_BDEV_IO_POOL_SIZE);
683 	}
684 
685 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
686 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
687 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
688 			    BUF_SMALL_POOL_SIZE);
689 		assert(false);
690 	}
691 
692 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
693 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
694 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
695 			    BUF_LARGE_POOL_SIZE);
696 		assert(false);
697 	}
698 
699 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
700 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
701 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
702 	spdk_dma_free(g_bdev_mgr.zero_buffer);
703 
704 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
705 }
706 
707 static void
708 mgmt_channel_free_resources(struct spdk_io_channel_iter *i)
709 {
710 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
711 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
712 
713 	spdk_bdev_mgmt_channel_free_resources(ch);
714 	spdk_for_each_channel_continue(i, 0);
715 }
716 
717 static void
718 spdk_bdev_module_finish_iter(void *arg)
719 {
720 	/* Notice that this variable is static. It is saved between calls to
721 	 * this function. */
722 	static struct spdk_bdev_module *resume_bdev_module = NULL;
723 	struct spdk_bdev_module *bdev_module;
724 
725 	/* Start iterating from the last touched module */
726 	if (!resume_bdev_module) {
727 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
728 	} else {
729 		bdev_module = TAILQ_NEXT(resume_bdev_module, tailq);
730 	}
731 
732 	while (bdev_module) {
733 		if (bdev_module->async_fini) {
734 			/* Save our place so we can resume later. We must
735 			 * save the variable here, before calling module_fini()
736 			 * below, because in some cases the module may immediately
737 			 * call spdk_bdev_module_finish_done() and re-enter
738 			 * this function to continue iterating. */
739 			resume_bdev_module = bdev_module;
740 		}
741 
742 		if (bdev_module->module_fini) {
743 			bdev_module->module_fini();
744 		}
745 
746 		if (bdev_module->async_fini) {
747 			return;
748 		}
749 
750 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
751 	}
752 
753 	resume_bdev_module = NULL;
754 	spdk_for_each_channel(&g_bdev_mgr, mgmt_channel_free_resources, NULL,
755 			      spdk_bdev_module_finish_complete);
756 }
757 
758 void
759 spdk_bdev_module_finish_done(void)
760 {
761 	if (spdk_get_thread() != g_fini_thread) {
762 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
763 	} else {
764 		spdk_bdev_module_finish_iter(NULL);
765 	}
766 }
767 
768 static void
769 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
770 {
771 	struct spdk_bdev *bdev = cb_arg;
772 
773 	if (bdeverrno && bdev) {
774 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
775 			     bdev->name);
776 
777 		/*
778 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
779 		 *  bdev; try to continue by manually removing this bdev from the list and continue
780 		 *  with the next bdev in the list.
781 		 */
782 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
783 	}
784 
785 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
786 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
787 		spdk_bdev_module_finish_iter(NULL);
788 		return;
789 	}
790 
791 	/*
792 	 * Unregister the first bdev in the list.
793 	 *
794 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
795 	 *  calling the remove_cb of the descriptors first.
796 	 *
797 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
798 	 *  will be called again via the unregister completion callback to continue the cleanup
799 	 *  process with the next bdev.
800 	 */
801 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
802 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
803 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
804 }
805 
806 static void
807 _spdk_bdev_finish_unregister_bdevs(void)
808 {
809 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
810 }
811 
812 void
813 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
814 {
815 	assert(cb_fn != NULL);
816 
817 	g_fini_thread = spdk_get_thread();
818 
819 	g_fini_cb_fn = cb_fn;
820 	g_fini_cb_arg = cb_arg;
821 
822 	_spdk_bdev_finish_unregister_bdevs();
823 }
824 
825 static struct spdk_bdev_io *
826 spdk_bdev_get_io(struct spdk_io_channel *_ch)
827 {
828 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
829 	struct spdk_bdev_io *bdev_io;
830 
831 	if (ch->per_thread_cache_count > 0) {
832 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
833 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
834 		ch->per_thread_cache_count--;
835 	} else {
836 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
837 		if (!bdev_io) {
838 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
839 			return NULL;
840 		}
841 	}
842 
843 	bdev_io->mgmt_ch = ch;
844 
845 	return bdev_io;
846 }
847 
848 static void
849 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
850 {
851 	struct spdk_bdev_mgmt_channel *ch = bdev_io->mgmt_ch;
852 
853 	if (bdev_io->buf != NULL) {
854 		spdk_bdev_io_put_buf(bdev_io);
855 	}
856 
857 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
858 		ch->per_thread_cache_count++;
859 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
860 	} else {
861 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
862 	}
863 }
864 
865 static void
866 _spdk_bdev_qos_io_submit(void *ctx)
867 {
868 	struct spdk_bdev_channel	*ch = ctx;
869 	struct spdk_bdev_io		*bdev_io = NULL;
870 	struct spdk_bdev		*bdev = ch->bdev;
871 	struct spdk_bdev_module_channel *shared_ch = ch->module_ch;
872 
873 	while (!TAILQ_EMPTY(&ch->qos_io)) {
874 		if (ch->io_submitted_this_timeslice < ch->qos_max_ios_per_timeslice) {
875 			bdev_io = TAILQ_FIRST(&ch->qos_io);
876 			TAILQ_REMOVE(&ch->qos_io, bdev_io, link);
877 			ch->io_submitted_this_timeslice++;
878 			shared_ch->io_outstanding++;
879 			bdev->fn_table->submit_request(ch->channel, bdev_io);
880 		} else {
881 			break;
882 		}
883 	}
884 }
885 
886 static void
887 _spdk_bdev_io_submit(void *ctx)
888 {
889 	struct spdk_bdev_io *bdev_io = ctx;
890 	struct spdk_bdev *bdev = bdev_io->bdev;
891 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
892 	struct spdk_io_channel *ch = bdev_ch->channel;
893 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
894 
895 	bdev_io->submit_tsc = spdk_get_ticks();
896 	bdev_ch->io_outstanding++;
897 	shared_ch->io_outstanding++;
898 	bdev_io->in_submit_request = true;
899 	if (spdk_likely(bdev_ch->flags == 0)) {
900 		if (spdk_likely(TAILQ_EMPTY(&shared_ch->nomem_io))) {
901 			bdev->fn_table->submit_request(ch, bdev_io);
902 		} else {
903 			bdev_ch->io_outstanding--;
904 			shared_ch->io_outstanding--;
905 			TAILQ_INSERT_TAIL(&shared_ch->nomem_io, bdev_io, link);
906 		}
907 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
908 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
909 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
910 		shared_ch->io_outstanding--;
911 		TAILQ_INSERT_TAIL(&bdev_ch->qos_io, bdev_io, link);
912 		_spdk_bdev_qos_io_submit(bdev_ch);
913 	} else {
914 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
915 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
916 	}
917 	bdev_io->in_submit_request = false;
918 }
919 
920 static void
921 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
922 {
923 	struct spdk_bdev *bdev = bdev_io->bdev;
924 
925 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
926 
927 	/* QoS channel and thread have been properly configured */
928 	if (bdev->ios_per_sec > 0 && bdev->qos_channel && bdev->qos_thread) {
929 		bdev_io->io_submit_ch = bdev_io->ch;
930 		bdev_io->ch = bdev->qos_channel;
931 		spdk_thread_send_msg(bdev->qos_thread, _spdk_bdev_io_submit, bdev_io);
932 	} else {
933 		_spdk_bdev_io_submit(bdev_io);
934 	}
935 }
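/*
 * QoS path summary: when rate limiting is active, the I/O is re-targeted at the bdev's
 * dedicated QoS channel and handed to the QoS thread for submission; the submitting
 * channel is saved in io_submit_ch beforehand so the original channel can be restored later.
 */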
936 
937 static void
938 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
939 {
940 	struct spdk_bdev *bdev = bdev_io->bdev;
941 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
942 	struct spdk_io_channel *ch = bdev_ch->channel;
943 
944 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
945 
946 	bdev_io->in_submit_request = true;
947 	bdev->fn_table->submit_request(ch, bdev_io);
948 	bdev_io->in_submit_request = false;
949 }
950 
951 static void
952 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
953 		  struct spdk_bdev *bdev, void *cb_arg,
954 		  spdk_bdev_io_completion_cb cb)
955 {
956 	bdev_io->bdev = bdev;
957 	bdev_io->caller_ctx = cb_arg;
958 	bdev_io->cb = cb;
959 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
960 	bdev_io->in_submit_request = false;
961 	bdev_io->buf = NULL;
962 	bdev_io->io_submit_ch = NULL;
963 }
964 
965 bool
966 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
967 {
968 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
969 }
970 
971 int
972 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
973 {
974 	if (bdev->fn_table->dump_info_json) {
975 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
976 	}
977 
978 	return 0;
979 }
980 
981 int
982 spdk_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
983 {
984 	if (bdev == NULL || w == NULL) {
985 		return -EINVAL;
986 	}
987 
988 	if (bdev->fn_table->write_config_json) {
989 		bdev->fn_table->write_config_json(bdev, w);
990 	} else {
991 		spdk_json_write_object_begin(w);
992 		spdk_json_write_named_string(w, "name", bdev->name);
993 		spdk_json_write_object_end(w);
994 	}
995 
996 	return 0;
997 }
998 
999 static void
1000 spdk_bdev_qos_get_max_ios_per_timeslice(struct spdk_bdev *bdev)
1001 {
1002 	uint64_t	qos_max_ios_per_timeslice = 0;
1003 
1004 	qos_max_ios_per_timeslice = bdev->ios_per_sec * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1005 				    SPDK_BDEV_SEC_TO_USEC;
1006 	bdev->qos_channel->qos_max_ios_per_timeslice = spdk_max(qos_max_ios_per_timeslice,
1007 			SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
1008 }
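/*
 * Worked example of the computation above: with ios_per_sec == 20000, the limit is
 * 20000 * 1000 / 1000000 == 20 I/O per 1 ms timeslice.  Rates below 1000 IO/s would
 * compute to 0, so spdk_max() clamps the result up to SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE.
 */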
1009 
1010 static int
1011 spdk_bdev_channel_poll_qos(void *arg)
1012 {
1013 	struct spdk_bdev_channel	*ch = arg;
1014 	struct spdk_bdev		*bdev = ch->bdev;
1015 
1016 	/* Reset for next round of rate limiting */
1017 	ch->io_submitted_this_timeslice = 0;
1018 	spdk_bdev_qos_get_max_ios_per_timeslice(bdev);
1019 
1020 	_spdk_bdev_qos_io_submit(ch);
1021 
1022 	return -1;
1023 }
1024 
1025 static int
1026 _spdk_bdev_channel_create(struct spdk_bdev_channel *ch, void *io_device)
1027 {
1028 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1029 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1030 	struct spdk_bdev_module_channel	*shared_ch;
1031 
1032 	ch->bdev = bdev;
1033 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1034 	if (!ch->channel) {
1035 		return -1;
1036 	}
1037 
1038 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
1039 	if (!ch->mgmt_channel) {
1040 		return -1;
1041 	}
1042 
1043 	mgmt_ch = spdk_io_channel_get_ctx(ch->mgmt_channel);
1044 	TAILQ_FOREACH(shared_ch, &mgmt_ch->module_channels, link) {
1045 		if (shared_ch->module_ch == ch->channel) {
1046 			shared_ch->ref++;
1047 			break;
1048 		}
1049 	}
1050 
1051 	if (shared_ch == NULL) {
1052 		shared_ch = calloc(1, sizeof(*shared_ch));
1053 		if (!shared_ch) {
1054 			return -1;
1055 		}
1056 
1057 		shared_ch->io_outstanding = 0;
1058 		TAILQ_INIT(&shared_ch->nomem_io);
1059 		shared_ch->nomem_threshold = 0;
1060 		shared_ch->module_ch = ch->channel;
1061 		shared_ch->ref = 1;
1062 		TAILQ_INSERT_TAIL(&mgmt_ch->module_channels, shared_ch, link);
1063 	}
1064 
1065 	memset(&ch->stat, 0, sizeof(ch->stat));
1066 	ch->io_outstanding = 0;
1067 	TAILQ_INIT(&ch->queued_resets);
1068 	TAILQ_INIT(&ch->qos_io);
1069 	ch->qos_max_ios_per_timeslice = 0;
1070 	ch->io_submitted_this_timeslice = 0;
1071 	ch->qos_poller = NULL;
1072 	ch->flags = 0;
1073 	ch->module_ch = shared_ch;
1074 
1075 	return 0;
1076 }
1077 
1078 static void
1079 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1080 {
1081 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1082 	struct spdk_bdev_module_channel	*shared_ch = NULL;
1083 
1084 	if (!ch) {
1085 		return;
1086 	}
1087 
1088 	if (ch->channel) {
1089 		spdk_put_io_channel(ch->channel);
1090 	}
1091 
1092 	if (ch->mgmt_channel) {
1093 		shared_ch = ch->module_ch;
1094 		if (shared_ch) {
1095 			assert(ch->io_outstanding == 0);
1096 			assert(shared_ch->ref > 0);
1097 			shared_ch->ref--;
1098 			if (shared_ch->ref == 0) {
1099 				mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1100 				assert(shared_ch->io_outstanding == 0);
1101 				TAILQ_REMOVE(&mgmt_channel->module_channels, shared_ch, link);
1102 				free(shared_ch);
1103 			}
1104 		}
1105 		spdk_put_io_channel(ch->mgmt_channel);
1106 	}
1107 }
1108 
1109 /* Caller must hold bdev->mutex. */
1110 static int
1111 spdk_bdev_qos_channel_create(struct spdk_bdev *bdev)
1112 {
1113 	assert(bdev->qos_channel == NULL);
1114 	assert(bdev->qos_thread == NULL);
1115 
1116 	bdev->qos_channel = calloc(1, sizeof(struct spdk_bdev_channel));
1117 	if (!bdev->qos_channel) {
1118 		return -1;
1119 	}
1120 
1121 	bdev->qos_thread = spdk_get_thread();
1122 	if (!bdev->qos_thread) {
1123 		free(bdev->qos_channel);
1124 		bdev->qos_channel = NULL;
1125 		return -1;
1126 	}
1127 
1128 	if (_spdk_bdev_channel_create(bdev->qos_channel, __bdev_to_io_dev(bdev)) != 0) {
1129 		free(bdev->qos_channel);
1130 		bdev->qos_channel = NULL;
1131 		bdev->qos_thread = NULL;
1132 		return -1;
1133 	}
1134 
1135 	bdev->qos_channel->flags |= BDEV_CH_QOS_ENABLED;
1136 	spdk_bdev_qos_get_max_ios_per_timeslice(bdev);
1137 	bdev->qos_channel->qos_poller = spdk_poller_register(
1138 						spdk_bdev_channel_poll_qos,
1139 						bdev->qos_channel,
1140 						SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1141 
1142 	return 0;
1143 }
1144 
1145 static int
1146 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1147 {
1148 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1149 	struct spdk_bdev_channel	*ch = ctx_buf;
1150 
1151 	if (_spdk_bdev_channel_create(ch, io_device) != 0) {
1152 		_spdk_bdev_channel_destroy_resource(ch);
1153 		return -1;
1154 	}
1155 
1156 #ifdef SPDK_CONFIG_VTUNE
1157 	{
1158 		char *name;
1159 		__itt_init_ittlib(NULL, 0);
1160 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1161 		if (!name) {
1162 			_spdk_bdev_channel_destroy_resource(ch);
1163 			return -1;
1164 		}
1165 		ch->handle = __itt_string_handle_create(name);
1166 		free(name);
1167 		ch->start_tsc = spdk_get_ticks();
1168 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1169 	}
1170 #endif
1171 
1172 	pthread_mutex_lock(&bdev->mutex);
1173 
1174 	/* Rate limiting is enabled on this bdev */
1175 	if (bdev->ios_per_sec > 0 && bdev->qos_channel == NULL) {
1176 		if (spdk_bdev_qos_channel_create(bdev) != 0) {
1177 			_spdk_bdev_channel_destroy_resource(ch);
1178 			pthread_mutex_unlock(&bdev->mutex);
1179 			return -1;
1180 		}
1181 	}
1182 
1183 	bdev->channel_count++;
1184 
1185 	pthread_mutex_unlock(&bdev->mutex);
1186 
1187 	return 0;
1188 }
1189 
1190 /*
1191  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1192  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
1193  */
1194 static void
1195 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1196 {
1197 	bdev_io_stailq_t tmp;
1198 	struct spdk_bdev_io *bdev_io;
1199 
1200 	STAILQ_INIT(&tmp);
1201 
1202 	while (!STAILQ_EMPTY(queue)) {
1203 		bdev_io = STAILQ_FIRST(queue);
1204 		STAILQ_REMOVE_HEAD(queue, buf_link);
1205 		if (bdev_io->ch == ch) {
1206 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1207 		} else {
1208 			STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link);
1209 		}
1210 	}
1211 
1212 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1213 }
1214 
1215 /*
1216  * Abort I/O that are queued waiting for submission.  These types of I/O are
1217  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1218  */
1219 static void
1220 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1221 {
1222 	struct spdk_bdev_io *bdev_io, *tmp;
1223 
1224 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
1225 		if (bdev_io->ch == ch) {
1226 			TAILQ_REMOVE(queue, bdev_io, link);
1227 			/*
1228 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1229 			 *  been submitted to the bdev module.  Since in this case it
1230 			 *  hadn't, bump io_outstanding to account for the decrement
1231 			 *  that spdk_bdev_io_complete() will do.
1232 			 */
1233 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1234 				ch->io_outstanding++;
1235 				ch->module_ch->io_outstanding++;
1236 			}
1237 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1238 		}
1239 	}
1240 }
1241 
1242 static void
1243 _spdk_bdev_channel_destroy(struct spdk_bdev_channel *ch)
1244 {
1245 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1246 	struct spdk_bdev_module_channel	*shared_ch = ch->module_ch;
1247 
1248 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1249 
1250 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1251 	_spdk_bdev_abort_queued_io(&ch->qos_io, ch);
1252 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, ch);
1253 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
1254 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
1255 
1256 	_spdk_bdev_channel_destroy_resource(ch);
1257 }
1258 
1259 static void
1260 spdk_bdev_qos_channel_destroy(void *ctx)
1261 {
1262 	struct spdk_bdev_channel *qos_channel = ctx;
1263 
1264 	_spdk_bdev_channel_destroy(qos_channel);
1265 
1266 	spdk_poller_unregister(&qos_channel->qos_poller);
1267 	free(qos_channel);
1268 }
1269 
1270 static void
1271 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1272 {
1273 	struct spdk_bdev_channel	*ch = ctx_buf;
1274 	struct spdk_bdev		*bdev = ch->bdev;
1275 
1276 	_spdk_bdev_channel_destroy(ch);
1277 
1278 	pthread_mutex_lock(&bdev->mutex);
1279 	bdev->channel_count--;
1280 	if (bdev->channel_count == 0 && bdev->qos_channel != NULL) {
1281 		/* All I/O channels for this bdev have been destroyed - destroy the QoS channel. */
1282 		spdk_thread_send_msg(bdev->qos_thread, spdk_bdev_qos_channel_destroy,
1283 				     bdev->qos_channel);
1284 
1285 		/*
1286 		 * Set qos_channel to NULL within the critical section so that
1287 		 * if another channel is created, it will see qos_channel == NULL and
1288 		 * re-create the QoS channel even if the asynchronous qos_channel_destroy
1289 		 * isn't finished yet.
1290 		 */
1291 		bdev->qos_channel = NULL;
1292 		bdev->qos_thread = NULL;
1293 	}
1294 	pthread_mutex_unlock(&bdev->mutex);
1295 }
1296 
1297 int
1298 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1299 {
1300 	struct spdk_bdev_alias *tmp;
1301 
1302 	if (alias == NULL) {
1303 		SPDK_ERRLOG("Empty alias passed\n");
1304 		return -EINVAL;
1305 	}
1306 
1307 	if (spdk_bdev_get_by_name(alias)) {
1308 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1309 		return -EEXIST;
1310 	}
1311 
1312 	tmp = calloc(1, sizeof(*tmp));
1313 	if (tmp == NULL) {
1314 		SPDK_ERRLOG("Unable to allocate alias\n");
1315 		return -ENOMEM;
1316 	}
1317 
1318 	tmp->alias = strdup(alias);
1319 	if (tmp->alias == NULL) {
1320 		free(tmp);
1321 		SPDK_ERRLOG("Unable to allocate alias\n");
1322 		return -ENOMEM;
1323 	}
1324 
1325 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1326 
1327 	return 0;
1328 }
1329 
1330 int
1331 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1332 {
1333 	struct spdk_bdev_alias *tmp;
1334 
1335 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1336 		if (strcmp(alias, tmp->alias) == 0) {
1337 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1338 			free(tmp->alias);
1339 			free(tmp);
1340 			return 0;
1341 		}
1342 	}
1343 
1344 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1345 
1346 	return -ENOENT;
1347 }
1348 
1349 struct spdk_io_channel *
1350 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1351 {
1352 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1353 }
1354 
1355 const char *
1356 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1357 {
1358 	return bdev->name;
1359 }
1360 
1361 const char *
1362 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1363 {
1364 	return bdev->product_name;
1365 }
1366 
1367 const struct spdk_bdev_aliases_list *
1368 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1369 {
1370 	return &bdev->aliases;
1371 }
1372 
1373 uint32_t
1374 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1375 {
1376 	return bdev->blocklen;
1377 }
1378 
1379 uint64_t
1380 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1381 {
1382 	return bdev->blockcnt;
1383 }
1384 
1385 size_t
1386 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1387 {
1388 	/* TODO: push this logic down to the bdev modules */
1389 	if (bdev->need_aligned_buffer) {
1390 		return bdev->blocklen;
1391 	}
1392 
1393 	return 1;
1394 }
1395 
1396 uint32_t
1397 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1398 {
1399 	return bdev->optimal_io_boundary;
1400 }
1401 
1402 bool
1403 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1404 {
1405 	return bdev->write_cache;
1406 }
1407 
1408 const struct spdk_uuid *
1409 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1410 {
1411 	return &bdev->uuid;
1412 }
1413 
1414 int
1415 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1416 {
1417 	int ret;
1418 
1419 	pthread_mutex_lock(&bdev->mutex);
1420 
1421 	/* bdev has open descriptors */
1422 	if (!TAILQ_EMPTY(&bdev->open_descs) &&
1423 	    bdev->blockcnt > size) {
1424 		ret = -EBUSY;
1425 	} else {
1426 		bdev->blockcnt = size;
1427 		ret = 0;
1428 	}
1429 
1430 	pthread_mutex_unlock(&bdev->mutex);
1431 
1432 	return ret;
1433 }
1434 
1435 /*
1436  * Convert I/O offset and length from bytes to blocks.
1437  *
1438  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1439  */
1440 static uint64_t
1441 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1442 			  uint64_t num_bytes, uint64_t *num_blocks)
1443 {
1444 	uint32_t block_size = bdev->blocklen;
1445 
1446 	*offset_blocks = offset_bytes / block_size;
1447 	*num_blocks = num_bytes / block_size;
1448 
1449 	return (offset_bytes % block_size) | (num_bytes % block_size);
1450 }
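/*
 * Example: with a 512-byte block size, offset_bytes == 4096 and num_bytes == 8192 give
 * offset_blocks == 8, num_blocks == 16, and a return value of 0 (success).  An unaligned
 * value such as offset_bytes == 4097 returns non-zero because 4097 % 512 != 0.
 */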
1451 
1452 static bool
1453 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1454 {
1455 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
1456 	 * has been an overflow and hence the offset has been wrapped around */
1457 	if (offset_blocks + num_blocks < offset_blocks) {
1458 		return false;
1459 	}
1460 
1461 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1462 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1463 		return false;
1464 	}
1465 
1466 	return true;
1467 }
1468 
1469 int
1470 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1471 	       void *buf, uint64_t offset, uint64_t nbytes,
1472 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1473 {
1474 	uint64_t offset_blocks, num_blocks;
1475 
1476 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1477 		return -EINVAL;
1478 	}
1479 
1480 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1481 }
1482 
1483 int
1484 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1485 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1486 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1487 {
1488 	struct spdk_bdev *bdev = desc->bdev;
1489 	struct spdk_bdev_io *bdev_io;
1490 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1491 
1492 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1493 		return -EINVAL;
1494 	}
1495 
1496 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1497 	if (!bdev_io) {
1498 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1499 		return -ENOMEM;
1500 	}
1501 
1502 	bdev_io->ch = channel;
1503 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1504 	bdev_io->u.bdev.iov.iov_base = buf;
1505 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1506 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1507 	bdev_io->u.bdev.iovcnt = 1;
1508 	bdev_io->u.bdev.num_blocks = num_blocks;
1509 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1510 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1511 
1512 	spdk_bdev_io_submit(bdev_io);
1513 	return 0;
1514 }
1515 
1516 int
1517 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1518 		struct iovec *iov, int iovcnt,
1519 		uint64_t offset, uint64_t nbytes,
1520 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1521 {
1522 	uint64_t offset_blocks, num_blocks;
1523 
1524 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1525 		return -EINVAL;
1526 	}
1527 
1528 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1529 }
1530 
1531 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1532 			   struct iovec *iov, int iovcnt,
1533 			   uint64_t offset_blocks, uint64_t num_blocks,
1534 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1535 {
1536 	struct spdk_bdev *bdev = desc->bdev;
1537 	struct spdk_bdev_io *bdev_io;
1538 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1539 
1540 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1541 		return -EINVAL;
1542 	}
1543 
1544 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1545 	if (!bdev_io) {
1546 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1547 		return -ENOMEM;
1548 	}
1549 
1550 	bdev_io->ch = channel;
1551 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1552 	bdev_io->u.bdev.iovs = iov;
1553 	bdev_io->u.bdev.iovcnt = iovcnt;
1554 	bdev_io->u.bdev.num_blocks = num_blocks;
1555 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1556 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1557 
1558 	spdk_bdev_io_submit(bdev_io);
1559 	return 0;
1560 }
1561 
1562 int
1563 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1564 		void *buf, uint64_t offset, uint64_t nbytes,
1565 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1566 {
1567 	uint64_t offset_blocks, num_blocks;
1568 
1569 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1570 		return -EINVAL;
1571 	}
1572 
1573 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1574 }
1575 
1576 int
1577 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1578 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1579 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1580 {
1581 	struct spdk_bdev *bdev = desc->bdev;
1582 	struct spdk_bdev_io *bdev_io;
1583 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1584 
1585 	if (!desc->write) {
1586 		return -EBADF;
1587 	}
1588 
1589 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1590 		return -EINVAL;
1591 	}
1592 
1593 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1594 	if (!bdev_io) {
1595 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1596 		return -ENOMEM;
1597 	}
1598 
1599 	bdev_io->ch = channel;
1600 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1601 	bdev_io->u.bdev.iov.iov_base = buf;
1602 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1603 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1604 	bdev_io->u.bdev.iovcnt = 1;
1605 	bdev_io->u.bdev.num_blocks = num_blocks;
1606 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1607 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1608 
1609 	spdk_bdev_io_submit(bdev_io);
1610 	return 0;
1611 }
1612 
1613 int
1614 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1615 		 struct iovec *iov, int iovcnt,
1616 		 uint64_t offset, uint64_t len,
1617 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1618 {
1619 	uint64_t offset_blocks, num_blocks;
1620 
1621 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1622 		return -EINVAL;
1623 	}
1624 
1625 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1626 }
1627 
1628 int
1629 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1630 			struct iovec *iov, int iovcnt,
1631 			uint64_t offset_blocks, uint64_t num_blocks,
1632 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1633 {
1634 	struct spdk_bdev *bdev = desc->bdev;
1635 	struct spdk_bdev_io *bdev_io;
1636 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1637 
1638 	if (!desc->write) {
1639 		return -EBADF;
1640 	}
1641 
1642 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1643 		return -EINVAL;
1644 	}
1645 
1646 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1647 	if (!bdev_io) {
1648 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1649 		return -ENOMEM;
1650 	}
1651 
1652 	bdev_io->ch = channel;
1653 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1654 	bdev_io->u.bdev.iovs = iov;
1655 	bdev_io->u.bdev.iovcnt = iovcnt;
1656 	bdev_io->u.bdev.num_blocks = num_blocks;
1657 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1658 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1659 
1660 	spdk_bdev_io_submit(bdev_io);
1661 	return 0;
1662 }
1663 
1664 int
1665 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1666 		       uint64_t offset, uint64_t len,
1667 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1668 {
1669 	uint64_t offset_blocks, num_blocks;
1670 
1671 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1672 		return -EINVAL;
1673 	}
1674 
1675 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1676 }
1677 
1678 int
1679 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1680 			      uint64_t offset_blocks, uint64_t num_blocks,
1681 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1682 {
1683 	struct spdk_bdev *bdev = desc->bdev;
1684 	struct spdk_bdev_io *bdev_io;
1685 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1686 	uint64_t len;
1687 	bool split_request = false;
1688 
1689 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1690 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1691 		return -ERANGE;
1692 	}
1693 
1694 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1695 		return -EINVAL;
1696 	}
1697 
1698 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1699 
1700 	if (!bdev_io) {
1701 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1702 		return -ENOMEM;
1703 	}
1704 
1705 	bdev_io->ch = channel;
1706 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1707 
1708 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1709 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1710 		bdev_io->u.bdev.num_blocks = num_blocks;
1711 		bdev_io->u.bdev.iovs = NULL;
1712 		bdev_io->u.bdev.iovcnt = 0;
1713 
1714 	} else {
1715 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1716 
1717 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1718 
1719 		if (len > ZERO_BUFFER_SIZE) {
1720 			split_request = true;
1721 			len = ZERO_BUFFER_SIZE;
1722 		}
1723 
1724 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1725 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1726 		bdev_io->u.bdev.iov.iov_len = len;
1727 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1728 		bdev_io->u.bdev.iovcnt = 1;
1729 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1730 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1731 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1732 	}
1733 
1734 	if (split_request) {
1735 		bdev_io->u.bdev.stored_user_cb = cb;
1736 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1737 	} else {
1738 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1739 	}
1740 	spdk_bdev_io_submit(bdev_io);
1741 	return 0;
1742 }
1743 
1744 int
1745 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1746 		uint64_t offset, uint64_t nbytes,
1747 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1748 {
1749 	uint64_t offset_blocks, num_blocks;
1750 
1751 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1752 		return -EINVAL;
1753 	}
1754 
1755 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1756 }
1757 
1758 int
1759 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1760 		       uint64_t offset_blocks, uint64_t num_blocks,
1761 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1762 {
1763 	struct spdk_bdev *bdev = desc->bdev;
1764 	struct spdk_bdev_io *bdev_io;
1765 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1766 
1767 	if (!desc->write) {
1768 		return -EBADF;
1769 	}
1770 
1771 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1772 		return -EINVAL;
1773 	}
1774 
1775 	if (num_blocks == 0) {
1776 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1777 		return -EINVAL;
1778 	}
1779 
1780 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1781 	if (!bdev_io) {
1782 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1783 		return -ENOMEM;
1784 	}
1785 
1786 	bdev_io->ch = channel;
1787 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1788 	bdev_io->u.bdev.iov.iov_base = NULL;
1789 	bdev_io->u.bdev.iov.iov_len = 0;
1790 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1791 	bdev_io->u.bdev.iovcnt = 1;
1792 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1793 	bdev_io->u.bdev.num_blocks = num_blocks;
1794 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1795 
1796 	spdk_bdev_io_submit(bdev_io);
1797 	return 0;
1798 }
1799 
1800 int
1801 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1802 		uint64_t offset, uint64_t length,
1803 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1804 {
1805 	uint64_t offset_blocks, num_blocks;
1806 
1807 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1808 		return -EINVAL;
1809 	}
1810 
1811 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1812 }
1813 
1814 int
1815 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1816 		       uint64_t offset_blocks, uint64_t num_blocks,
1817 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1818 {
1819 	struct spdk_bdev *bdev = desc->bdev;
1820 	struct spdk_bdev_io *bdev_io;
1821 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1822 
1823 	if (!desc->write) {
1824 		return -EBADF;
1825 	}
1826 
1827 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1828 		return -EINVAL;
1829 	}
1830 
1831 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1832 	if (!bdev_io) {
1833 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1834 		return -ENOMEM;
1835 	}
1836 
1837 	bdev_io->ch = channel;
1838 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1839 	bdev_io->u.bdev.iovs = NULL;
1840 	bdev_io->u.bdev.iovcnt = 0;
1841 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1842 	bdev_io->u.bdev.num_blocks = num_blocks;
1843 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1844 
1845 	spdk_bdev_io_submit(bdev_io);
1846 	return 0;
1847 }
1848 
1849 static void
1850 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1851 {
1852 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1853 	struct spdk_bdev_io *bdev_io;
1854 
1855 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1856 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1857 	spdk_bdev_io_submit_reset(bdev_io);
1858 }
1859 
1860 static void
1861 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1862 {
1863 	struct spdk_io_channel		*ch;
1864 	struct spdk_bdev_channel	*channel;
1865 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1866 	struct spdk_bdev_module_channel	*shared_ch;
1867 
1868 	ch = spdk_io_channel_iter_get_channel(i);
1869 	channel = spdk_io_channel_get_ctx(ch);
1870 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1871 	shared_ch = channel->module_ch;
1872 
1873 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1874 
1875 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, channel);
1876 	_spdk_bdev_abort_queued_io(&channel->qos_io, channel);
1877 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1878 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1879 
1880 	spdk_for_each_channel_continue(i, 0);
1881 }
1882 
1883 static void
1884 _spdk_bdev_start_reset(void *ctx)
1885 {
1886 	struct spdk_bdev_channel *ch = ctx;
1887 
1888 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
1889 			      ch, _spdk_bdev_reset_dev);
1890 }
1891 
1892 static void
1893 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1894 {
1895 	struct spdk_bdev *bdev = ch->bdev;
1896 
1897 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1898 
1899 	pthread_mutex_lock(&bdev->mutex);
1900 	if (bdev->reset_in_progress == NULL) {
1901 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1902 		/*
1903 		 * Take a channel reference for the target bdev for the life of this
1904 		 *  reset.  This guards against the channel getting destroyed while
1905 		 *  spdk_for_each_channel() calls related to this reset IO are in
1906 		 *  progress.  We will release the reference when this reset is
1907 		 *  completed.
1908 		 */
1909 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1910 		_spdk_bdev_start_reset(ch);
1911 	}
1912 	pthread_mutex_unlock(&bdev->mutex);
1913 }
1914 
1915 int
1916 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1917 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1918 {
1919 	struct spdk_bdev *bdev = desc->bdev;
1920 	struct spdk_bdev_io *bdev_io;
1921 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1922 
1923 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1924 	if (!bdev_io) {
1925 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1926 		return -ENOMEM;
1927 	}
1928 
1929 	bdev_io->ch = channel;
1930 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1931 	bdev_io->u.reset.ch_ref = NULL;
1932 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1933 
1934 	pthread_mutex_lock(&bdev->mutex);
1935 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1936 	pthread_mutex_unlock(&bdev->mutex);
1937 
1938 	_spdk_bdev_channel_start_reset(channel);
1939 
1940 	return 0;
1941 }
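
/*
 * Note on the reset flow (sketch; 'desc', 'ch' and the 'reset_done' callback
 *  are assumed to be provided by the caller): the reset is queued on the
 *  submitting channel and only one reset per bdev runs at a time.  Before the
 *  reset reaches the driver, every channel is frozen and its queued
 *  nomem/QoS/buffer-wait I/O is aborted.
 *
 *	rc = spdk_bdev_reset(desc, ch, reset_done, NULL);
 *
 *  As with other submission paths, -ENOMEM means no bdev_io was available.
 */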
1942 
1943 void
1944 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1945 		      struct spdk_bdev_io_stat *stat)
1946 {
1947 #ifdef SPDK_CONFIG_VTUNE
1948 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1949 	memset(stat, 0, sizeof(*stat));
1950 	return;
1951 #endif
1952 
1953 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1954 
1955 	channel->stat.ticks_rate = spdk_get_ticks_hz();
1956 	*stat = channel->stat;
1957 	memset(&channel->stat, 0, sizeof(channel->stat));
1958 }
1959 
1960 int
1961 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1962 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1963 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1964 {
1965 	struct spdk_bdev *bdev = desc->bdev;
1966 	struct spdk_bdev_io *bdev_io;
1967 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1968 
1969 	if (!desc->write) {
1970 		return -EBADF;
1971 	}
1972 
1973 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1974 	if (!bdev_io) {
1975 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1976 		return -ENOMEM;
1977 	}
1978 
1979 	bdev_io->ch = channel;
1980 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1981 	bdev_io->u.nvme_passthru.cmd = *cmd;
1982 	bdev_io->u.nvme_passthru.buf = buf;
1983 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1984 	bdev_io->u.nvme_passthru.md_buf = NULL;
1985 	bdev_io->u.nvme_passthru.md_len = 0;
1986 
1987 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1988 
1989 	spdk_bdev_io_submit(bdev_io);
1990 	return 0;
1991 }
1992 
1993 int
1994 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1995 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1996 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1997 {
1998 	struct spdk_bdev *bdev = desc->bdev;
1999 	struct spdk_bdev_io *bdev_io;
2000 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2001 
2002 	if (!desc->write) {
2003 		/*
2004 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2005 		 *  to easily determine if the command is a read or write, but for now just
2006 		 *  do not allow io_passthru with a read-only descriptor.
2007 		 */
2008 		return -EBADF;
2009 	}
2010 
2011 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
2012 	if (!bdev_io) {
2013 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
2014 		return -ENOMEM;
2015 	}
2016 
2017 	bdev_io->ch = channel;
2018 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2019 	bdev_io->u.nvme_passthru.cmd = *cmd;
2020 	bdev_io->u.nvme_passthru.buf = buf;
2021 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2022 	bdev_io->u.nvme_passthru.md_buf = NULL;
2023 	bdev_io->u.nvme_passthru.md_len = 0;
2024 
2025 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2026 
2027 	spdk_bdev_io_submit(bdev_io);
2028 	return 0;
2029 }
2030 
2031 int
2032 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2033 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2034 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2035 {
2036 	struct spdk_bdev *bdev = desc->bdev;
2037 	struct spdk_bdev_io *bdev_io;
2038 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2039 
2040 	if (!desc->write) {
2041 		/*
2042 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2043 		 *  to easily determine if the command is a read or write, but for now just
2044 		 *  do not allow io_passthru with a read-only descriptor.
2045 		 */
2046 		return -EBADF;
2047 	}
2048 
2049 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
2050 	if (!bdev_io) {
2051 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
2052 		return -ENOMEM;
2053 	}
2054 
2055 	bdev_io->ch = channel;
2056 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2057 	bdev_io->u.nvme_passthru.cmd = *cmd;
2058 	bdev_io->u.nvme_passthru.buf = buf;
2059 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2060 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2061 	bdev_io->u.nvme_passthru.md_len = md_len;
2062 
2063 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2064 
2065 	spdk_bdev_io_submit(bdev_io);
2066 	return 0;
2067 }
2068 
2069 int
2070 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
2071 {
2072 	if (!bdev_io) {
2073 		SPDK_ERRLOG("bdev_io is NULL\n");
2074 		return -1;
2075 	}
2076 
2077 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
2078 		SPDK_ERRLOG("bdev_io is in pending state\n");
2079 		assert(false);
2080 		return -1;
2081 	}
2082 
2083 	spdk_bdev_put_io(bdev_io);
2084 
2085 	return 0;
2086 }
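
/*
 * Typical completion callback (illustrative): the callback owns the bdev_io
 *  and must return it with spdk_bdev_free_io() once it has finished
 *  inspecting the result.
 *
 *	static void
 *	io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		... inspect 'success' or the detailed status here ...
 *		spdk_bdev_free_io(bdev_io);
 *	}
 */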
2087 
2088 static void
2089 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2090 {
2091 	struct spdk_bdev *bdev = bdev_ch->bdev;
2092 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
2093 	struct spdk_bdev_io *bdev_io;
2094 
2095 	if (shared_ch->io_outstanding > shared_ch->nomem_threshold) {
2096 		/*
2097 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2098 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2099 		 *  the context of a completion, because the resources for the I/O are
2100 		 *  not released until control returns to the bdev poller.  Also, we
2101 		 *  may require several small I/O to complete before a larger I/O
2102 		 *  (that requires splitting) can be submitted.
2103 		 */
2104 		return;
2105 	}
2106 
2107 	while (!TAILQ_EMPTY(&shared_ch->nomem_io)) {
2108 		bdev_io = TAILQ_FIRST(&shared_ch->nomem_io);
2109 		TAILQ_REMOVE(&shared_ch->nomem_io, bdev_io, link);
2110 		bdev_io->ch->io_outstanding++;
2111 		shared_ch->io_outstanding++;
2112 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
2113 		bdev->fn_table->submit_request(bdev_io->ch->channel, bdev_io);
2114 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
2115 			break;
2116 		}
2117 	}
2118 }
2119 
2120 static inline void
2121 _spdk_bdev_io_complete(void *ctx)
2122 {
2123 	struct spdk_bdev_io *bdev_io = ctx;
2124 
2125 	if (spdk_unlikely(bdev_io->in_submit_request || bdev_io->io_submit_ch)) {
2126 		/*
2127 		 * Send the completion to the thread that originally submitted the I/O,
2128 		 * which may not be the current thread in the case of QoS.
2129 		 */
2130 		if (bdev_io->io_submit_ch) {
2131 			bdev_io->ch = bdev_io->io_submit_ch;
2132 			bdev_io->io_submit_ch = NULL;
2133 		}
2134 
2135 		/*
2136 		 * Defer completion to avoid potential infinite recursion if the
2137 		 * user's completion callback issues a new I/O.
2138 		 */
2139 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
2140 				     _spdk_bdev_io_complete, bdev_io);
2141 		return;
2142 	}
2143 
2144 	assert(bdev_io->cb != NULL);
2145 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->ch->channel));
2146 
2147 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS,
2148 		    bdev_io->caller_ctx);
2149 }
2150 
2151 static void
2152 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2153 {
2154 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2155 
2156 	if (bdev_io->u.reset.ch_ref != NULL) {
2157 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2158 		bdev_io->u.reset.ch_ref = NULL;
2159 	}
2160 
2161 	_spdk_bdev_io_complete(bdev_io);
2162 }
2163 
2164 static void
2165 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2166 {
2167 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2168 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2169 
2170 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2171 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2172 		_spdk_bdev_channel_start_reset(ch);
2173 	}
2174 
2175 	spdk_for_each_channel_continue(i, 0);
2176 }
2177 
2178 void
2179 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2180 {
2181 	struct spdk_bdev *bdev = bdev_io->bdev;
2182 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
2183 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
2184 
2185 	bdev_io->status = status;
2186 
2187 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2188 		bool unlock_channels = false;
2189 
2190 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2191 			SPDK_ERRLOG("NOMEM returned for reset\n");
2192 		}
2193 		pthread_mutex_lock(&bdev->mutex);
2194 		if (bdev_io == bdev->reset_in_progress) {
2195 			bdev->reset_in_progress = NULL;
2196 			unlock_channels = true;
2197 		}
2198 		pthread_mutex_unlock(&bdev->mutex);
2199 
2200 		if (unlock_channels) {
2201 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2202 					      bdev_io, _spdk_bdev_reset_complete);
2203 			return;
2204 		}
2205 	} else {
2206 		assert(bdev_ch->io_outstanding > 0);
2207 		assert(shared_ch->io_outstanding > 0);
2208 		bdev_ch->io_outstanding--;
2209 		shared_ch->io_outstanding--;
2210 
2211 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2212 			TAILQ_INSERT_HEAD(&shared_ch->nomem_io, bdev_io, link);
2213 			/*
2214 			 * Wait for some of the outstanding I/O to complete before we
2215 			 *  retry any of the nomem_io.  Normally we will wait for
2216 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2217 			 *  depth channels we will instead wait for half to complete.
2218 			 */
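			/*
			 * For example, with NOMEM_THRESHOLD_COUNT == 8: at 100 outstanding
			 *  I/O the threshold becomes 92 (100 - 8); at only 4 outstanding it
			 *  becomes 2 (half the queue depth).
			 */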
2219 			shared_ch->nomem_threshold = spdk_max((int64_t)shared_ch->io_outstanding / 2,
2220 							      (int64_t)shared_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
2221 			return;
2222 		}
2223 
2224 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_ch->nomem_io))) {
2225 			_spdk_bdev_ch_retry_io(bdev_ch);
2226 		}
2227 	}
2228 
2229 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2230 		switch (bdev_io->type) {
2231 		case SPDK_BDEV_IO_TYPE_READ:
2232 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
2233 			bdev_ch->stat.num_read_ops++;
2234 			bdev_ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2235 			break;
2236 		case SPDK_BDEV_IO_TYPE_WRITE:
2237 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
2238 			bdev_ch->stat.num_write_ops++;
2239 			bdev_ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2240 			break;
2241 		default:
2242 			break;
2243 		}
2244 	}
2245 
2246 #ifdef SPDK_CONFIG_VTUNE
2247 	uint64_t now_tsc = spdk_get_ticks();
2248 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
2249 		uint64_t data[5];
2250 
2251 		data[0] = bdev_ch->stat.num_read_ops;
2252 		data[1] = bdev_ch->stat.bytes_read;
2253 		data[2] = bdev_ch->stat.num_write_ops;
2254 		data[3] = bdev_ch->stat.bytes_written;
2255 		data[4] = bdev->fn_table->get_spin_time ?
2256 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
2257 
2258 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
2259 				   __itt_metadata_u64, 5, data);
2260 
2261 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
2262 		bdev_ch->start_tsc = now_tsc;
2263 	}
2264 #endif
2265 
2266 	_spdk_bdev_io_complete(bdev_io);
2267 }
2268 
2269 void
2270 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2271 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2272 {
2273 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2274 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2275 	} else {
2276 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2277 		bdev_io->error.scsi.sc = sc;
2278 		bdev_io->error.scsi.sk = sk;
2279 		bdev_io->error.scsi.asc = asc;
2280 		bdev_io->error.scsi.ascq = ascq;
2281 	}
2282 
2283 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2284 }
2285 
2286 void
2287 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2288 			     int *sc, int *sk, int *asc, int *ascq)
2289 {
2290 	assert(sc != NULL);
2291 	assert(sk != NULL);
2292 	assert(asc != NULL);
2293 	assert(ascq != NULL);
2294 
2295 	switch (bdev_io->status) {
2296 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2297 		*sc = SPDK_SCSI_STATUS_GOOD;
2298 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2299 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2300 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2301 		break;
2302 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2303 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2304 		break;
2305 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2306 		*sc = bdev_io->error.scsi.sc;
2307 		*sk = bdev_io->error.scsi.sk;
2308 		*asc = bdev_io->error.scsi.asc;
2309 		*ascq = bdev_io->error.scsi.ascq;
2310 		break;
2311 	default:
2312 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2313 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2314 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2315 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2316 		break;
2317 	}
2318 }
2319 
2320 void
2321 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2322 {
2323 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2324 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2325 	} else {
2326 		bdev_io->error.nvme.sct = sct;
2327 		bdev_io->error.nvme.sc = sc;
2328 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2329 	}
2330 
2331 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2332 }
2333 
2334 void
2335 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2336 {
2337 	assert(sct != NULL);
2338 	assert(sc != NULL);
2339 
2340 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2341 		*sct = bdev_io->error.nvme.sct;
2342 		*sc = bdev_io->error.nvme.sc;
2343 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2344 		*sct = SPDK_NVME_SCT_GENERIC;
2345 		*sc = SPDK_NVME_SC_SUCCESS;
2346 	} else {
2347 		*sct = SPDK_NVME_SCT_GENERIC;
2348 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2349 	}
2350 }
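
/*
 * Illustrative use from a completion callback ('bdev_io' is the pointer
 *  passed to the callback):
 *
 *	int sct, sc;
 *
 *	spdk_bdev_io_get_nvme_status(bdev_io, &sct, &sc);
 *	if (sct != SPDK_NVME_SCT_GENERIC || sc != SPDK_NVME_SC_SUCCESS) {
 *		... report the NVMe status to the initiator ...
 *	}
 */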
2351 
2352 struct spdk_thread *
2353 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2354 {
2355 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
2356 }
2357 
2358 static int
2359 _spdk_bdev_register(struct spdk_bdev *bdev)
2360 {
2361 	struct spdk_bdev_module *module;
2362 
2363 	assert(bdev->module != NULL);
2364 
2365 	if (!bdev->name) {
2366 		SPDK_ERRLOG("Bdev name is NULL\n");
2367 		return -EINVAL;
2368 	}
2369 
2370 	if (spdk_bdev_get_by_name(bdev->name)) {
2371 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2372 		return -EEXIST;
2373 	}
2374 
2375 	bdev->status = SPDK_BDEV_STATUS_READY;
2376 
2377 	TAILQ_INIT(&bdev->open_descs);
2378 
2379 	TAILQ_INIT(&bdev->vbdevs);
2380 	TAILQ_INIT(&bdev->base_bdevs);
2381 
2382 	TAILQ_INIT(&bdev->aliases);
2383 
2384 	bdev->reset_in_progress = NULL;
2385 
2386 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2387 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2388 				sizeof(struct spdk_bdev_channel));
2389 
2390 	pthread_mutex_init(&bdev->mutex, NULL);
2391 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2392 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2393 
2394 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2395 		if (module->examine) {
2396 			module->action_in_progress++;
2397 			module->examine(bdev);
2398 		}
2399 	}
2400 
2401 	return 0;
2402 }
2403 
2404 int
2405 spdk_bdev_register(struct spdk_bdev *bdev)
2406 {
2407 	return _spdk_bdev_register(bdev);
2408 }
2409 
2410 int
2411 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2412 {
2413 	int i, rc;
2414 
2415 	rc = _spdk_bdev_register(vbdev);
2416 	if (rc) {
2417 		return rc;
2418 	}
2419 
2420 	for (i = 0; i < base_bdev_count; i++) {
2421 		assert(base_bdevs[i] != NULL);
2422 		assert(base_bdevs[i]->claim_module != NULL);
2423 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
2424 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
2425 	}
2426 
2427 	return 0;
2428 }
2429 
2430 void
2431 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
2432 {
2433 	if (bdev->unregister_cb != NULL) {
2434 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2435 	}
2436 }
2437 
2438 static void
2439 _remove_notify(void *arg)
2440 {
2441 	struct spdk_bdev_desc *desc = arg;
2442 
2443 	desc->remove_cb(desc->remove_ctx);
2444 }
2445 
2446 void
2447 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2448 {
2449 	struct spdk_bdev_desc	*desc, *tmp;
2450 	int			rc;
2451 	bool			do_destruct = true;
2452 	struct spdk_bdev	*base_bdev;
2453 
2454 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2455 
2456 	pthread_mutex_lock(&bdev->mutex);
2457 
2458 	if (!TAILQ_EMPTY(&bdev->base_bdevs)) {
2459 		TAILQ_FOREACH(base_bdev, &bdev->base_bdevs, base_bdev_link) {
2460 			TAILQ_REMOVE(&base_bdev->vbdevs, bdev, vbdev_link);
2461 		}
2462 	}
2463 
2464 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2465 	bdev->unregister_cb = cb_fn;
2466 	bdev->unregister_ctx = cb_arg;
2467 
2468 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2469 		if (desc->remove_cb) {
2470 			do_destruct = false;
2471 			/*
2472 			 * Defer invocation of the remove_cb to a separate message that will
2473 			 *  run later on this thread.  This ensures this context unwinds and
2474 			 *  we don't recursively unregister this bdev again if the remove_cb
2475 			 *  immediately closes its descriptor.
2476 			 */
2477 			spdk_thread_send_msg(spdk_get_thread(), _remove_notify, desc);
2478 		}
2479 	}
2480 
2481 	if (!do_destruct) {
2482 		pthread_mutex_unlock(&bdev->mutex);
2483 		return;
2484 	}
2485 
2486 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2487 	pthread_mutex_unlock(&bdev->mutex);
2488 
2489 	pthread_mutex_destroy(&bdev->mutex);
2490 
2491 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), NULL);
2492 
2493 	rc = bdev->fn_table->destruct(bdev->ctxt);
2494 	if (rc < 0) {
2495 		SPDK_ERRLOG("destruct failed\n");
2496 	}
2497 	if (rc <= 0 && cb_fn != NULL) {
2498 		cb_fn(cb_arg, rc);
2499 	}
2500 }
2501 
2502 int
2503 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2504 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2505 {
2506 	struct spdk_bdev_desc *desc;
2507 
2508 	desc = calloc(1, sizeof(*desc));
2509 	if (desc == NULL) {
2510 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2511 		return -ENOMEM;
2512 	}
2513 
2514 	pthread_mutex_lock(&bdev->mutex);
2515 
2516 	if (write && bdev->claim_module) {
2517 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
2518 		free(desc);
2519 		pthread_mutex_unlock(&bdev->mutex);
2520 		return -EPERM;
2521 	}
2522 
2523 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2524 
2525 	desc->bdev = bdev;
2526 	desc->remove_cb = remove_cb;
2527 	desc->remove_ctx = remove_ctx;
2528 	desc->write = write;
2529 	*_desc = desc;
2530 
2531 	pthread_mutex_unlock(&bdev->mutex);
2532 
2533 	return 0;
2534 }
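
/*
 * Usage sketch (the bdev name "Nvme0n1" and the 'bdev_removed' hot-remove
 *  callback are illustrative):
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Nvme0n1");
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *io_ch;
 *
 *	if (bdev == NULL || spdk_bdev_open(bdev, true, bdev_removed, NULL, &desc) != 0) {
 *		return;
 *	}
 *	io_ch = spdk_bdev_get_io_channel(desc);
 *	... submit I/O on io_ch ...
 *	spdk_put_io_channel(io_ch);
 *	spdk_bdev_close(desc);
 */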
2535 
2536 void
2537 spdk_bdev_close(struct spdk_bdev_desc *desc)
2538 {
2539 	struct spdk_bdev *bdev = desc->bdev;
2540 	bool do_unregister = false;
2541 
2542 	pthread_mutex_lock(&bdev->mutex);
2543 
2544 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2545 	free(desc);
2546 
2547 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
2548 		do_unregister = true;
2549 	}
2550 	pthread_mutex_unlock(&bdev->mutex);
2551 
2552 	if (do_unregister == true) {
2553 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
2554 	}
2555 }
2556 
2557 int
2558 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
2559 			    struct spdk_bdev_module *module)
2560 {
2561 	if (bdev->claim_module != NULL) {
2562 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
2563 			    bdev->claim_module->name);
2564 		return -EPERM;
2565 	}
2566 
2567 	if (desc && !desc->write) {
2568 		desc->write = true;
2569 	}
2570 
2571 	bdev->claim_module = module;
2572 	return 0;
2573 }
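
/*
 * Typical claim pattern for a virtual bdev module (names are illustrative):
 *  the module opens the base bdev and then claims it so that no other writer
 *  can open it; passing the descriptor also promotes it to a write descriptor.
 *
 *	rc = spdk_bdev_open(base_bdev, false, vbdev_base_removed, base_ctx, &base_desc);
 *	if (rc == 0) {
 *		rc = spdk_bdev_module_claim_bdev(base_bdev, base_desc, &my_module);
 *	}
 */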
2574 
2575 void
2576 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2577 {
2578 	assert(bdev->claim_module != NULL);
2579 	bdev->claim_module = NULL;
2580 }
2581 
2582 struct spdk_bdev *
2583 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2584 {
2585 	return desc->bdev;
2586 }
2587 
2588 void
2589 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2590 {
2591 	struct iovec *iovs;
2592 	int iovcnt;
2593 
2594 	if (bdev_io == NULL) {
2595 		return;
2596 	}
2597 
2598 	switch (bdev_io->type) {
2599 	case SPDK_BDEV_IO_TYPE_READ:
2600 		iovs = bdev_io->u.bdev.iovs;
2601 		iovcnt = bdev_io->u.bdev.iovcnt;
2602 		break;
2603 	case SPDK_BDEV_IO_TYPE_WRITE:
2604 		iovs = bdev_io->u.bdev.iovs;
2605 		iovcnt = bdev_io->u.bdev.iovcnt;
2606 		break;
2607 	default:
2608 		iovs = NULL;
2609 		iovcnt = 0;
2610 		break;
2611 	}
2612 
2613 	if (iovp) {
2614 		*iovp = iovs;
2615 	}
2616 	if (iovcntp) {
2617 		*iovcntp = iovcnt;
2618 	}
2619 }
2620 
2621 void
2622 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
2623 {
2624 
2625 	if (spdk_bdev_module_list_find(bdev_module->name)) {
2626 		fprintf(stderr, "ERROR: module '%s' already registered.\n", bdev_module->name);
2627 		assert(false);
2628 	}
2629 
2630 	if (bdev_module->async_init) {
2631 		bdev_module->action_in_progress = 1;
2632 	}
2633 
2634 	/*
2635 	 * Modules with examine callbacks must be initialized first, so they are
2636 	 *  ready to handle examine callbacks from later modules that will
2637 	 *  register physical bdevs.
2638 	 */
2639 	if (bdev_module->examine != NULL) {
2640 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2641 	} else {
2642 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2643 	}
2644 }
2645 
2646 struct spdk_bdev_module *
2647 spdk_bdev_module_list_find(const char *name)
2648 {
2649 	struct spdk_bdev_module *bdev_module;
2650 
2651 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
2652 		if (strcmp(name, bdev_module->name) == 0) {
2653 			break;
2654 		}
2655 	}
2656 
2657 	return bdev_module;
2658 }
2659 
2660 static void
2661 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2662 {
2663 	uint64_t len;
2664 
2665 	if (!success) {
2666 		bdev_io->cb = bdev_io->u.bdev.stored_user_cb;
2667 		_spdk_bdev_io_complete(bdev_io);
2668 		return;
2669 	}
2670 
2671 	/* no need to perform the error checking from write_zeroes_blocks because this request already passed those checks. */
2672 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks,
2673 		       ZERO_BUFFER_SIZE);
2674 
2675 	bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks;
2676 	bdev_io->u.bdev.iov.iov_len = len;
2677 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2678 	bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2679 	bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2680 
2681 	/* if this round completes the i/o, change the callback to be the original user callback */
2682 	if (bdev_io->u.bdev.split_remaining_num_blocks == 0) {
2683 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb);
2684 	} else {
2685 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2686 	}
2687 	spdk_bdev_io_submit(bdev_io);
2688 }
2689 
2690 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
2691