xref: /spdk/lib/bdev/bdev.c (revision c3bc40a6ef659197499eb2d01f3da092071a8f77)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/bdev.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
58 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
59 #define SPDK_BDEV_IO_CACHE_SIZE			256
60 #define BUF_SMALL_POOL_SIZE			8192
61 #define BUF_LARGE_POOL_SIZE			1024
62 #define NOMEM_THRESHOLD_COUNT			8
63 #define ZERO_BUFFER_SIZE			0x100000
64 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
65 #define SPDK_BDEV_SEC_TO_USEC			1000000ULL
66 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
67 
68 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
69 typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t;
70 
71 struct spdk_bdev_mgr {
72 	struct spdk_mempool *bdev_io_pool;
73 
74 	struct spdk_mempool *buf_small_pool;
75 	struct spdk_mempool *buf_large_pool;
76 
77 	void *zero_buffer;
78 
79 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
80 
81 	TAILQ_HEAD(, spdk_bdev) bdevs;
82 
83 	bool init_complete;
84 	bool module_init_complete;
85 
86 #ifdef SPDK_CONFIG_VTUNE
87 	__itt_domain	*domain;
88 #endif
89 };
90 
91 static struct spdk_bdev_mgr g_bdev_mgr = {
92 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
93 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
94 	.init_complete = false,
95 	.module_init_complete = false,
96 };
97 
98 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
99 static void			*g_init_cb_arg = NULL;
100 
101 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
102 static void			*g_fini_cb_arg = NULL;
103 static struct spdk_thread	*g_fini_thread = NULL;
104 
105 
106 struct spdk_bdev_mgmt_channel {
107 	bdev_io_stailq_t need_buf_small;
108 	bdev_io_stailq_t need_buf_large;
109 
110 	/*
111 	 * Each thread keeps a cache of bdev_io - this allows
112 	 *  bdev threads which are *not* DPDK threads to still
113 	 *  benefit from a per-thread bdev_io cache.  Without
114 	 *  this, non-DPDK threads fetching from the mempool
115 	 *  would incur a cmpxchg on get and put.
116 	 */
117 	bdev_io_stailq_t per_thread_cache;
118 	uint32_t	per_thread_cache_count;
119 
120 	TAILQ_HEAD(, spdk_bdev_module_channel) module_channels;
121 };
122 
123 struct spdk_bdev_desc {
124 	struct spdk_bdev		*bdev;
125 	spdk_bdev_remove_cb_t		remove_cb;
126 	void				*remove_ctx;
127 	bool				write;
128 	TAILQ_ENTRY(spdk_bdev_desc)	link;
129 };
130 
131 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
132 #define BDEV_CH_QOS_ENABLED		(1 << 1)
133 
134 struct spdk_bdev_channel {
135 	struct spdk_bdev	*bdev;
136 
137 	/* The channel for the underlying device */
138 	struct spdk_io_channel	*channel;
139 
140 	/* Channel for the bdev manager */
141 	struct spdk_io_channel	*mgmt_channel;
142 
143 	struct spdk_bdev_io_stat stat;
144 
145 	bdev_io_tailq_t		queued_resets;
146 
147 	uint32_t		flags;
148 
149 	/*
150 	 * Rate limiting on this channel.
151 	 * Queue of I/O awaiting issue because QoS rate limiting is in effect
152 	 *  on this channel.
153 	 */
154 	bdev_io_tailq_t		qos_io;
155 
156 	/*
157 	 * Rate limiting on this channel.
158 	 * Maximum number of I/Os allowed to be issued in one timeslice (e.g., 1ms);
159 	 *  only valid for the master channel, which manages the outstanding I/Os.
160 	 */
161 	uint64_t		qos_max_ios_per_timeslice;
162 
163 	/*
164 	 * Rate limiting on this channel.
165 	 * Number of I/Os submitted in the current timeslice (e.g., 1ms).
166 	 */
167 	uint64_t		io_submitted_this_timeslice;
168 
169 	/*
170 	 * Rate limiting on this channel.
171 	 * QoS poller that runs periodically, once per millisecond timeslice.
172 	 */
173 	struct spdk_poller	*qos_poller;
174 
175 	/* Per-device channel */
176 	struct spdk_bdev_module_channel *module_ch;
177 
178 #ifdef SPDK_CONFIG_VTUNE
179 	uint64_t		start_tsc;
180 	uint64_t		interval_tsc;
181 	__itt_string_handle	*handle;
182 #endif
183 
184 };
185 
186 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
187 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
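/*
 * The two helpers above offset the spdk_bdev pointer by one byte when it is
 *  used as an io_device handle, presumably so that the handle cannot collide
 *  with an io_device that a bdev module may have registered on the bdev
 *  pointer itself.  The round trip is exact (illustrative sketch):
 *
 *      void *io_dev = __bdev_to_io_dev(bdev);
 *      assert(__bdev_from_io_dev(io_dev) == bdev);
 */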
188 
189 /*
190  * Per-module (or per-io_device) channel. Multiple bdevs built on the same io_device
191  * will queue their I/O awaiting retry here. This makes it possible to retry sending
192  * I/O to one bdev after I/O from another bdev completes.
193  */
194 struct spdk_bdev_module_channel {
195 	/*
196 	 * Count of I/O submitted to bdev module and waiting for completion.
197 	 * Incremented before submit_request() is called on an spdk_bdev_io.
198 	 */
199 	uint64_t		io_outstanding;
200 
201 	/*
202 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
203 	 *  on this channel.
204 	 */
205 	bdev_io_tailq_t		nomem_io;
206 
207 	/*
208 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
209 	 */
210 	uint64_t		nomem_threshold;
211 
212 	/* I/O channel allocated by a bdev module */
213 	struct spdk_io_channel	*module_ch;
214 
215 	uint32_t		ref;
216 
217 	TAILQ_ENTRY(spdk_bdev_module_channel) link;
218 };
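/*
 * For example (illustrative): two bdevs stacked on the same io_device share one
 *  spdk_bdev_module_channel per thread.  An I/O queued on nomem_io after a
 *  NOMEM completion can therefore be retried once io_outstanding for the shared
 *  channel drops back below nomem_threshold, even if the completions that drain
 *  it belong to the other bdev.
 */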
219 
220 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
221 
222 struct spdk_bdev *
223 spdk_bdev_first(void)
224 {
225 	struct spdk_bdev *bdev;
226 
227 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
228 	if (bdev) {
229 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
230 	}
231 
232 	return bdev;
233 }
234 
235 struct spdk_bdev *
236 spdk_bdev_next(struct spdk_bdev *prev)
237 {
238 	struct spdk_bdev *bdev;
239 
240 	bdev = TAILQ_NEXT(prev, link);
241 	if (bdev) {
242 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
243 	}
244 
245 	return bdev;
246 }
247 
248 static struct spdk_bdev *
249 _bdev_next_leaf(struct spdk_bdev *bdev)
250 {
251 	while (bdev != NULL) {
252 		if (bdev->claim_module == NULL) {
253 			return bdev;
254 		} else {
255 			bdev = TAILQ_NEXT(bdev, link);
256 		}
257 	}
258 
259 	return bdev;
260 }
261 
262 struct spdk_bdev *
263 spdk_bdev_first_leaf(void)
264 {
265 	struct spdk_bdev *bdev;
266 
267 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
268 
269 	if (bdev) {
270 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
271 	}
272 
273 	return bdev;
274 }
275 
276 struct spdk_bdev *
277 spdk_bdev_next_leaf(struct spdk_bdev *prev)
278 {
279 	struct spdk_bdev *bdev;
280 
281 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
282 
283 	if (bdev) {
284 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
285 	}
286 
287 	return bdev;
288 }
289 
290 struct spdk_bdev *
291 spdk_bdev_get_by_name(const char *bdev_name)
292 {
293 	struct spdk_bdev_alias *tmp;
294 	struct spdk_bdev *bdev = spdk_bdev_first();
295 
296 	while (bdev != NULL) {
297 		if (strcmp(bdev_name, bdev->name) == 0) {
298 			return bdev;
299 		}
300 
301 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
302 			if (strcmp(bdev_name, tmp->alias) == 0) {
303 				return bdev;
304 			}
305 		}
306 
307 		bdev = spdk_bdev_next(bdev);
308 	}
309 
310 	return NULL;
311 }
312 
313 static void
314 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
315 {
316 	assert(bdev_io->get_buf_cb != NULL);
317 	assert(buf != NULL);
318 	assert(bdev_io->u.bdev.iovs != NULL);
319 
320 	bdev_io->buf = buf;
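	/*
	 * Advance iov_base to the next 512-byte boundary after buf.  The buffer
	 *  pools allocate an extra 512 bytes per element precisely so that this
	 *  alignment adjustment always stays within the allocation.
	 */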
321 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
322 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
323 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
324 }
325 
326 static void
327 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
328 {
329 	struct spdk_mempool *pool;
330 	struct spdk_bdev_io *tmp;
331 	void *buf;
332 	bdev_io_stailq_t *stailq;
333 	struct spdk_bdev_mgmt_channel *ch;
334 
335 	assert(bdev_io->u.bdev.iovcnt == 1);
336 
337 	buf = bdev_io->buf;
338 	ch = bdev_io->mgmt_ch;
339 
340 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
341 		pool = g_bdev_mgr.buf_small_pool;
342 		stailq = &ch->need_buf_small;
343 	} else {
344 		pool = g_bdev_mgr.buf_large_pool;
345 		stailq = &ch->need_buf_large;
346 	}
347 
348 	if (STAILQ_EMPTY(stailq)) {
349 		spdk_mempool_put(pool, buf);
350 	} else {
351 		tmp = STAILQ_FIRST(stailq);
352 		STAILQ_REMOVE_HEAD(stailq, buf_link);
353 		spdk_bdev_io_set_buf(tmp, buf);
354 	}
355 }
356 
357 void
358 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
359 {
360 	struct spdk_mempool *pool;
361 	bdev_io_stailq_t *stailq;
362 	void *buf = NULL;
363 	struct spdk_bdev_mgmt_channel *ch;
364 
365 	assert(cb != NULL);
366 	assert(bdev_io->u.bdev.iovs != NULL);
367 
368 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
369 		/* Buffer already present */
370 		cb(bdev_io->ch->channel, bdev_io);
371 		return;
372 	}
373 
374 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
375 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
376 
377 	bdev_io->buf_len = len;
378 	bdev_io->get_buf_cb = cb;
379 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
380 		pool = g_bdev_mgr.buf_small_pool;
381 		stailq = &ch->need_buf_small;
382 	} else {
383 		pool = g_bdev_mgr.buf_large_pool;
384 		stailq = &ch->need_buf_large;
385 	}
386 
387 	buf = spdk_mempool_get(pool);
388 
389 	if (!buf) {
390 		STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link);
391 	} else {
392 		spdk_bdev_io_set_buf(bdev_io, buf);
393 	}
394 }
395 
396 static int
397 spdk_bdev_module_get_max_ctx_size(void)
398 {
399 	struct spdk_bdev_module *bdev_module;
400 	int max_bdev_module_size = 0;
401 
402 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
403 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
404 			max_bdev_module_size = bdev_module->get_ctx_size();
405 		}
406 	}
407 
408 	return max_bdev_module_size;
409 }
410 
411 void
412 spdk_bdev_config_text(FILE *fp)
413 {
414 	struct spdk_bdev_module *bdev_module;
415 
416 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
417 		if (bdev_module->config_text) {
418 			bdev_module->config_text(fp);
419 		}
420 	}
421 }
422 
423 static int
424 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
425 {
426 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
427 
428 	STAILQ_INIT(&ch->need_buf_small);
429 	STAILQ_INIT(&ch->need_buf_large);
430 
431 	STAILQ_INIT(&ch->per_thread_cache);
432 	ch->per_thread_cache_count = 0;
433 
434 	TAILQ_INIT(&ch->module_channels);
435 
436 	return 0;
437 }
438 
439 static void
440 spdk_bdev_mgmt_channel_free_resources(struct spdk_bdev_mgmt_channel *ch)
441 {
442 	struct spdk_bdev_io *bdev_io;
443 
444 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
445 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
446 	}
447 
448 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
449 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
450 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
451 		ch->per_thread_cache_count--;
452 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
453 	}
454 
455 	assert(ch->per_thread_cache_count == 0);
456 }
457 
458 static void
459 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
460 {
461 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
462 
463 	spdk_bdev_mgmt_channel_free_resources(ch);
464 }
465 
466 static void
467 spdk_bdev_init_complete(int rc)
468 {
469 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
470 	void *cb_arg = g_init_cb_arg;
471 
472 	g_bdev_mgr.init_complete = true;
473 	g_init_cb_fn = NULL;
474 	g_init_cb_arg = NULL;
475 
476 	cb_fn(cb_arg, rc);
477 }
478 
479 static void
480 spdk_bdev_module_action_complete(void)
481 {
482 	struct spdk_bdev_module *m;
483 
484 	/*
485 	 * Don't finish bdev subsystem initialization if
486 	 * module pre-initialization is still in progress, or
487 	 * the subsystem has already been initialized.
488 	 */
489 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
490 		return;
491 	}
492 
493 	/*
494 	 * Check all bdev modules for inits/examinations in progress. If any
495 	 * exist, return immediately since we cannot finish bdev subsystem
496 	 * initialization until all are completed.
497 	 */
498 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
499 		if (m->action_in_progress > 0) {
500 			return;
501 		}
502 	}
503 
504 	/*
505 	 * Modules already finished initialization - now that all
506 	 * the bdev modules have finished their asynchronous I/O
507 	 * processing, the entire bdev layer can be marked as complete.
508 	 */
509 	spdk_bdev_init_complete(0);
510 }
511 
512 static void
513 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
514 {
515 	assert(module->action_in_progress > 0);
516 	module->action_in_progress--;
517 	spdk_bdev_module_action_complete();
518 }
519 
520 void
521 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
522 {
523 	spdk_bdev_module_action_done(module);
524 }
525 
526 void
527 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
528 {
529 	spdk_bdev_module_action_done(module);
530 }
531 
532 static int
533 spdk_bdev_modules_init(void)
534 {
535 	struct spdk_bdev_module *module;
536 	int rc = 0;
537 
538 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
539 		rc = module->module_init();
540 		if (rc != 0) {
541 			break;
542 		}
543 	}
544 
545 	g_bdev_mgr.module_init_complete = true;
546 	return rc;
547 }
548 void
549 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
550 {
551 	int cache_size;
552 	int rc = 0;
553 	char mempool_name[32];
554 
555 	assert(cb_fn != NULL);
556 
557 	g_init_cb_fn = cb_fn;
558 	g_init_cb_arg = cb_arg;
559 
560 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
561 
562 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
563 				  SPDK_BDEV_IO_POOL_SIZE,
564 				  sizeof(struct spdk_bdev_io) +
565 				  spdk_bdev_module_get_max_ctx_size(),
566 				  0,
567 				  SPDK_ENV_SOCKET_ID_ANY);
568 
569 	if (g_bdev_mgr.bdev_io_pool == NULL) {
570 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
571 		spdk_bdev_init_complete(-1);
572 		return;
573 	}
574 
575 	/**
576 	 * Ensure no more than half of the total buffers end up in local caches, by
577 	 *   using spdk_env_get_core_count() to determine how many local caches we need
578 	 *   to account for.
579 	 */
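	/*
	 * Worked example (illustrative): with BUF_SMALL_POOL_SIZE = 8192 and four
	 *  cores, cache_size = 8192 / (2 * 4) = 1024 buffers per core, so at most
	 *  half of the pool can be sitting in per-core caches at any one time.
	 */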
580 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
581 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
582 
583 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
584 				    BUF_SMALL_POOL_SIZE,
585 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
586 				    cache_size,
587 				    SPDK_ENV_SOCKET_ID_ANY);
588 	if (!g_bdev_mgr.buf_small_pool) {
589 		SPDK_ERRLOG("could not allocate small buffer pool\n");
590 		spdk_bdev_init_complete(-1);
591 		return;
592 	}
593 
594 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
595 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
596 
597 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
598 				    BUF_LARGE_POOL_SIZE,
599 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
600 				    cache_size,
601 				    SPDK_ENV_SOCKET_ID_ANY);
602 	if (!g_bdev_mgr.buf_large_pool) {
603 		SPDK_ERRLOG("could not allocate large buffer pool\n");
604 		spdk_bdev_init_complete(-1);
605 		return;
606 	}
607 
608 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
609 				 NULL);
610 	if (!g_bdev_mgr.zero_buffer) {
611 		SPDK_ERRLOG("could not allocate bdev zero buffer\n");
612 		spdk_bdev_init_complete(-1);
613 		return;
614 	}
615 
616 #ifdef SPDK_CONFIG_VTUNE
617 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
618 #endif
619 
620 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
621 				spdk_bdev_mgmt_channel_destroy,
622 				sizeof(struct spdk_bdev_mgmt_channel));
623 
624 	rc = spdk_bdev_modules_init();
625 	if (rc != 0) {
626 		SPDK_ERRLOG("bdev modules init failed\n");
627 		spdk_bdev_init_complete(-1);
628 		return;
629 	}
630 
631 	spdk_bdev_module_action_complete();
632 }
633 
634 static void
635 spdk_bdev_module_finish_cb(void *io_device)
636 {
637 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
638 
639 	cb_fn(g_fini_cb_arg);
640 	g_fini_cb_fn = NULL;
641 	g_fini_cb_arg = NULL;
642 }
643 
644 static void
645 spdk_bdev_module_finish_complete(struct spdk_io_channel_iter *i, int status)
646 {
647 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
648 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
649 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
650 			    SPDK_BDEV_IO_POOL_SIZE);
651 	}
652 
653 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
654 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
655 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
656 			    BUF_SMALL_POOL_SIZE);
657 		assert(false);
658 	}
659 
660 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
661 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
662 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
663 			    BUF_LARGE_POOL_SIZE);
664 		assert(false);
665 	}
666 
667 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
668 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
669 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
670 	spdk_dma_free(g_bdev_mgr.zero_buffer);
671 
672 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
673 }
674 
675 static void
676 mgmt_channel_free_resources(struct spdk_io_channel_iter *i)
677 {
678 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
679 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
680 
681 	spdk_bdev_mgmt_channel_free_resources(ch);
682 	spdk_for_each_channel_continue(i, 0);
683 }
684 
685 static void
686 spdk_bdev_module_finish_iter(void *arg)
687 {
688 	/* Notice that this variable is static. It is saved between calls to
689 	 * this function. */
690 	static struct spdk_bdev_module *resume_bdev_module = NULL;
691 	struct spdk_bdev_module *bdev_module;
692 
693 	/* Start iterating from the last touched module */
694 	if (!resume_bdev_module) {
695 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
696 	} else {
697 		bdev_module = TAILQ_NEXT(resume_bdev_module, tailq);
698 	}
699 
700 	while (bdev_module) {
701 		if (bdev_module->async_fini) {
702 			/* Save our place so we can resume later. We must
703 			 * save the variable here, before calling module_fini()
704 			 * below, because in some cases the module may immediately
705 			 * call spdk_bdev_module_finish_done() and re-enter
706 			 * this function to continue iterating. */
707 			resume_bdev_module = bdev_module;
708 		}
709 
710 		if (bdev_module->module_fini) {
711 			bdev_module->module_fini();
712 		}
713 
714 		if (bdev_module->async_fini) {
715 			return;
716 		}
717 
718 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
719 	}
720 
721 	resume_bdev_module = NULL;
722 	spdk_for_each_channel(&g_bdev_mgr, mgmt_channel_free_resources, NULL,
723 			      spdk_bdev_module_finish_complete);
724 }
725 
726 void
727 spdk_bdev_module_finish_done(void)
728 {
729 	if (spdk_get_thread() != g_fini_thread) {
730 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
731 	} else {
732 		spdk_bdev_module_finish_iter(NULL);
733 	}
734 }
735 
736 static void
737 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
738 {
739 	struct spdk_bdev *bdev = cb_arg;
740 
741 	if (bdeverrno && bdev) {
742 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
743 			     bdev->name);
744 
745 		/*
746 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
747 		 *  bdev; try to recover by manually removing this bdev from the list and continuing
748 		 *  with the next bdev in the list.
749 		 */
750 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
751 	}
752 
753 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
754 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
755 		spdk_bdev_module_finish_iter(NULL);
756 		return;
757 	}
758 
759 	/*
760 	 * Unregister the first bdev in the list.
761 	 *
762 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
763 	 *  calling the remove_cb of the descriptors first.
764 	 *
765 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
766 	 *  will be called again via the unregister completion callback to continue the cleanup
767 	 *  process with the next bdev.
768 	 */
769 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
770 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
771 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
772 }
773 
774 static void
775 _spdk_bdev_finish_unregister_bdevs(void)
776 {
777 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
778 }
779 
780 void
781 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
782 {
783 	assert(cb_fn != NULL);
784 
785 	g_fini_thread = spdk_get_thread();
786 
787 	g_fini_cb_fn = cb_fn;
788 	g_fini_cb_arg = cb_arg;
789 
790 	_spdk_bdev_finish_unregister_bdevs();
791 }
792 
793 static struct spdk_bdev_io *
794 spdk_bdev_get_io(struct spdk_io_channel *_ch)
795 {
796 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
797 	struct spdk_bdev_io *bdev_io;
798 
799 	if (ch->per_thread_cache_count > 0) {
800 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
801 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
802 		ch->per_thread_cache_count--;
803 	} else {
804 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
805 		if (!bdev_io) {
806 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
807 			return NULL;
808 		}
809 	}
810 
811 	bdev_io->mgmt_ch = ch;
812 
813 	return bdev_io;
814 }
815 
816 static void
817 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
818 {
819 	struct spdk_bdev_mgmt_channel *ch = bdev_io->mgmt_ch;
820 
821 	if (bdev_io->buf != NULL) {
822 		spdk_bdev_io_put_buf(bdev_io);
823 	}
824 
825 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
826 		ch->per_thread_cache_count++;
827 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
828 	} else {
829 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
830 	}
831 }
832 
833 static void
834 _spdk_bdev_qos_io_submit(void *ctx)
835 {
836 	struct spdk_bdev_channel	*ch = ctx;
837 	struct spdk_bdev_io		*bdev_io = NULL;
838 	struct spdk_bdev		*bdev = ch->bdev;
839 	struct spdk_bdev_module_channel *shared_ch = ch->module_ch;
840 
841 	while (!TAILQ_EMPTY(&ch->qos_io)) {
842 		if (ch->io_submitted_this_timeslice < ch->qos_max_ios_per_timeslice) {
843 			bdev_io = TAILQ_FIRST(&ch->qos_io);
844 			TAILQ_REMOVE(&ch->qos_io, bdev_io, link);
845 			ch->io_submitted_this_timeslice++;
846 			shared_ch->io_outstanding++;
847 			bdev->fn_table->submit_request(ch->channel, bdev_io);
848 		} else {
849 			break;
850 		}
851 	}
852 }
853 
854 static void
855 _spdk_bdev_io_submit(void *ctx)
856 {
857 	struct spdk_bdev_io *bdev_io = ctx;
858 	struct spdk_bdev *bdev = bdev_io->bdev;
859 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
860 	struct spdk_io_channel *ch = bdev_ch->channel;
861 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
862 
863 	bdev_io->submit_tsc = spdk_get_ticks();
864 	shared_ch->io_outstanding++;
865 	bdev_io->in_submit_request = true;
866 	if (spdk_likely(bdev_ch->flags == 0)) {
867 		if (spdk_likely(TAILQ_EMPTY(&shared_ch->nomem_io))) {
868 			bdev->fn_table->submit_request(ch, bdev_io);
869 		} else {
870 			shared_ch->io_outstanding--;
871 			TAILQ_INSERT_TAIL(&shared_ch->nomem_io, bdev_io, link);
872 		}
873 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
874 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
875 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
876 		shared_ch->io_outstanding--;
877 		TAILQ_INSERT_TAIL(&bdev_ch->qos_io, bdev_io, link);
878 		_spdk_bdev_qos_io_submit(bdev_ch);
879 	} else {
880 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
881 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
882 	}
883 	bdev_io->in_submit_request = false;
884 }
885 
886 static void
887 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
888 {
889 	struct spdk_bdev *bdev = bdev_io->bdev;
890 
891 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
892 
893 	/* QoS channel and thread have been properly configured */
894 	if (bdev->ios_per_sec > 0 && bdev->qos_channel && bdev->qos_thread) {
895 		bdev_io->io_submit_ch = bdev_io->ch;
896 		bdev_io->ch = bdev->qos_channel;
897 		spdk_thread_send_msg(bdev->qos_thread, _spdk_bdev_io_submit, bdev_io);
898 	} else {
899 		_spdk_bdev_io_submit(bdev_io);
900 	}
901 }
902 
903 static void
904 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
905 {
906 	struct spdk_bdev *bdev = bdev_io->bdev;
907 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
908 	struct spdk_io_channel *ch = bdev_ch->channel;
909 
910 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
911 
912 	bdev_io->in_submit_request = true;
913 	bdev->fn_table->submit_request(ch, bdev_io);
914 	bdev_io->in_submit_request = false;
915 }
916 
917 static void
918 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
919 		  struct spdk_bdev *bdev, void *cb_arg,
920 		  spdk_bdev_io_completion_cb cb)
921 {
922 	bdev_io->bdev = bdev;
923 	bdev_io->caller_ctx = cb_arg;
924 	bdev_io->cb = cb;
925 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
926 	bdev_io->in_submit_request = false;
927 	bdev_io->buf = NULL;
928 	bdev_io->io_submit_ch = NULL;
929 }
930 
931 bool
932 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
933 {
934 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
935 }
936 
937 int
938 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
939 {
940 	if (bdev->fn_table->dump_info_json) {
941 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
942 	}
943 
944 	return 0;
945 }
946 
947 static void
948 spdk_bdev_qos_get_max_ios_per_timeslice(struct spdk_bdev *bdev)
949 {
950 	uint64_t	qos_max_ios_per_timeslice = 0;
951 
952 	qos_max_ios_per_timeslice = bdev->ios_per_sec * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
953 				    SPDK_BDEV_SEC_TO_USEC;
954 	bdev->qos_channel->qos_max_ios_per_timeslice = spdk_max(qos_max_ios_per_timeslice,
955 			SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
956 }
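/*
 * Worked example (illustrative): with ios_per_sec = 10000 and a 1000 usec
 *  timeslice, qos_max_ios_per_timeslice = 10000 * 1000 / 1000000 = 10, i.e.
 *  at most 10 I/Os may be submitted per 1 ms timeslice.  Rates below 1000
 *  IOPS would compute to 0 and are clamped up to
 *  SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE (1).
 */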
957 
958 static int
959 spdk_bdev_channel_poll_qos(void *arg)
960 {
961 	struct spdk_bdev_channel	*ch = arg;
962 	struct spdk_bdev		*bdev = ch->bdev;
963 
964 	/* Reset for next round of rate limiting */
965 	ch->io_submitted_this_timeslice = 0;
966 	spdk_bdev_qos_get_max_ios_per_timeslice(bdev);
967 
968 	_spdk_bdev_qos_io_submit(ch);
969 
970 	return -1;
971 }
972 
973 static void
974 spdk_bdev_qos_register_poller(void *ctx)
975 {
976 	struct spdk_bdev_channel	*ch = ctx;
977 
978 	ch->qos_poller = spdk_poller_register(spdk_bdev_channel_poll_qos, ch,
979 					      SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
980 }
981 
982 static void
983 spdk_bdev_qos_unregister_poller(void *ctx)
984 {
985 	struct spdk_poller	*poller = ctx;
986 
987 	spdk_poller_unregister(&poller);
988 }
989 
990 static int
991 _spdk_bdev_channel_create(struct spdk_bdev_channel *ch, void *io_device)
992 {
993 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
994 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
995 	struct spdk_bdev_module_channel	*shared_ch;
996 
997 	ch->bdev = bdev;
998 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
999 	if (!ch->channel) {
1000 		return -1;
1001 	}
1002 
1003 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
1004 	if (!ch->mgmt_channel) {
1005 		return -1;
1006 	}
1007 
1008 	mgmt_ch = spdk_io_channel_get_ctx(ch->mgmt_channel);
1009 	TAILQ_FOREACH(shared_ch, &mgmt_ch->module_channels, link) {
1010 		if (shared_ch->module_ch == ch->channel) {
1011 			shared_ch->ref++;
1012 			break;
1013 		}
1014 	}
1015 
1016 	if (shared_ch == NULL) {
1017 		shared_ch = calloc(1, sizeof(*shared_ch));
1018 		if (!shared_ch) {
1019 			return -1;
1020 		}
1021 
1022 		shared_ch->io_outstanding = 0;
1023 		TAILQ_INIT(&shared_ch->nomem_io);
1024 		shared_ch->nomem_threshold = 0;
1025 		shared_ch->module_ch = ch->channel;
1026 		shared_ch->ref = 1;
1027 		TAILQ_INSERT_TAIL(&mgmt_ch->module_channels, shared_ch, link);
1028 	}
1029 
1030 	memset(&ch->stat, 0, sizeof(ch->stat));
1031 	TAILQ_INIT(&ch->queued_resets);
1032 	TAILQ_INIT(&ch->qos_io);
1033 	ch->qos_max_ios_per_timeslice = 0;
1034 	ch->io_submitted_this_timeslice = 0;
1035 	ch->qos_poller = NULL;
1036 	ch->flags = 0;
1037 	ch->module_ch = shared_ch;
1038 
1039 	return 0;
1040 }
1041 
1042 static void
1043 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1044 {
1045 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1046 	struct spdk_bdev_module_channel	*shared_ch = NULL;
1047 
1048 	if (!ch) {
1049 		return;
1050 	}
1051 
1052 	if (ch->channel) {
1053 		spdk_put_io_channel(ch->channel);
1054 	}
1055 
1056 	if (ch->mgmt_channel) {
1057 		shared_ch = ch->module_ch;
1058 		if (shared_ch) {
1059 			assert(shared_ch->ref > 0);
1060 			shared_ch->ref--;
1061 			if (shared_ch->ref == 0) {
1062 				mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1063 				assert(shared_ch->io_outstanding == 0);
1064 				TAILQ_REMOVE(&mgmt_channel->module_channels, shared_ch, link);
1065 				free(shared_ch);
1066 			}
1067 		}
1068 		spdk_put_io_channel(ch->mgmt_channel);
1069 	}
1070 }
1071 
1072 static int
1073 _spdk_bdev_qos_channel_create(struct spdk_bdev *bdev)
1074 {
1075 	bdev->qos_channel = calloc(1, sizeof(struct spdk_bdev_channel));
1076 	if (!bdev->qos_channel) {
1077 		return -1;
1078 	}
1079 
1080 	bdev->qos_thread = spdk_get_thread();
1081 	if (!bdev->qos_thread) {
1082 		return -1;
1083 	}
1084 
1085 	if (_spdk_bdev_channel_create(bdev->qos_channel, __bdev_to_io_dev(bdev)) != 0) {
1086 		return -1;
1087 	}
1088 
1089 	bdev->qos_channel->flags |= BDEV_CH_QOS_ENABLED;
1090 	spdk_bdev_qos_get_max_ios_per_timeslice(bdev);
1091 	spdk_bdev_qos_register_poller(bdev->qos_channel);
1092 
1093 	return 0;
1094 }
1095 
1096 static void
1097 _spdk_bdev_qos_channel_destroy(void *ctx)
1098 {
1099 	struct spdk_bdev_channel	*qos_channel = ctx;
1100 	struct spdk_bdev		*bdev = NULL;
1101 	struct spdk_poller		*poller = NULL;
1102 
1103 	if (!qos_channel) {
1104 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "QoS channel already NULL\n");
1105 		return;
1106 	}
1107 
1108 	bdev = qos_channel->bdev;
1109 	poller = qos_channel->qos_poller;
1110 
1111 	assert(bdev->qos_thread == spdk_get_thread());
1112 	assert(bdev->qos_channel == qos_channel);
1113 
1114 	free(bdev->qos_channel);
1115 	bdev->qos_channel = NULL;
1116 	bdev->qos_thread = NULL;
1117 
1118 	if (!poller) {
1119 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "QoS poller already NULL\n");
1120 	} else {
1121 		spdk_bdev_qos_unregister_poller(poller);
1122 	}
1123 }
1124 
1125 static void
1126 spdk_bdev_qos_channel_create_async(void *ctx)
1127 {
1128 	struct spdk_bdev	*bdev = ctx;
1129 
1130 	if (!bdev->qos_channel) {
1131 		if (_spdk_bdev_qos_channel_create(bdev) != 0) {
1132 			SPDK_ERRLOG("Failed to create QoS channel\n");
1133 			_spdk_bdev_channel_destroy_resource(bdev->qos_channel);
1134 			_spdk_bdev_qos_channel_destroy(bdev->qos_channel);
1135 		}
1136 	}
1137 }
1138 
1139 static int
1140 spdk_bdev_qos_channel_create(void *ctx)
1141 {
1142 	struct spdk_bdev	*bdev = ctx;
1143 	struct spdk_thread	*qos_thread = bdev->qos_thread;
1144 
1145 	/*
1146 	 * An asynchronous destroy is in progress.
1147 	 * Send a message to that thread to defer the creation.
1148 	 */
1149 	if (bdev->qos_channel_destroying == true) {
1150 		if (qos_thread) {
1151 			spdk_thread_send_msg(qos_thread,
1152 					     spdk_bdev_qos_channel_create_async, bdev);
1153 			return 0;
1154 		}
1155 	}
1156 
1157 	if (!bdev->qos_channel) {
1158 		return _spdk_bdev_qos_channel_create(bdev);
1159 	} else {
1160 		return 0;
1161 	}
1162 }
1163 
1164 static int
1165 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1166 {
1167 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1168 	struct spdk_bdev_channel	*ch = ctx_buf;
1169 
1170 	if (_spdk_bdev_channel_create(ch, io_device) != 0) {
1171 		_spdk_bdev_channel_destroy_resource(ch);
1172 		return -1;
1173 	}
1174 
1175 	/* Rate limiting on this bdev enabled */
1176 	if (bdev->ios_per_sec > 0) {
1177 		if (spdk_bdev_qos_channel_create(bdev) != 0) {
1178 			_spdk_bdev_channel_destroy_resource(ch);
1179 			_spdk_bdev_channel_destroy_resource(bdev->qos_channel);
1180 			_spdk_bdev_qos_channel_destroy(bdev->qos_channel);
1181 			return -1;
1182 		}
1183 	}
1184 
1185 	pthread_mutex_lock(&bdev->mutex);
1186 	bdev->channel_count++;
1187 	pthread_mutex_unlock(&bdev->mutex);
1188 
1189 #ifdef SPDK_CONFIG_VTUNE
1190 	{
1191 		char *name;
1192 		__itt_init_ittlib(NULL, 0);
1193 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1194 		if (!name) {
1195 			_spdk_bdev_channel_destroy_resource(ch);
1196 			_spdk_bdev_channel_destroy_resource(bdev->qos_channel);
1197 			_spdk_bdev_qos_channel_destroy(bdev->qos_channel);
1198 			return -1;
1199 		}
1200 		ch->handle = __itt_string_handle_create(name);
1201 		free(name);
1202 		ch->start_tsc = spdk_get_ticks();
1203 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1204 	}
1205 #endif
1206 
1207 	return 0;
1208 }
1209 
1210 /*
1211  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1212  *  linked using the spdk_bdev_io buf_link STAILQ_ENTRY.
1213  */
1214 static void
1215 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1216 {
1217 	bdev_io_stailq_t tmp;
1218 	struct spdk_bdev_io *bdev_io;
1219 
1220 	STAILQ_INIT(&tmp);
1221 
1222 	while (!STAILQ_EMPTY(queue)) {
1223 		bdev_io = STAILQ_FIRST(queue);
1224 		STAILQ_REMOVE_HEAD(queue, buf_link);
1225 		if (bdev_io->ch == ch) {
1226 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1227 		} else {
1228 			STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link);
1229 		}
1230 	}
1231 
1232 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1233 }
1234 
1235 /*
1236  * Abort I/O that are queued waiting for submission.  These types of I/O are
1237  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1238  */
1239 static void
1240 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1241 {
1242 	struct spdk_bdev_io *bdev_io, *tmp;
1243 
1244 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
1245 		if (bdev_io->ch == ch) {
1246 			TAILQ_REMOVE(queue, bdev_io, link);
1247 			/*
1248 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1249 			 *  been submitted to the bdev module.  Since in this case it
1250 			 *  hadn't, bump io_outstanding to account for the decrement
1251 			 *  that spdk_bdev_io_complete() will do.
1252 			 */
1253 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1254 				ch->module_ch->io_outstanding++;
1255 			}
1256 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1257 		}
1258 	}
1259 }
1260 
1261 static void
1262 _spdk_bdev_channel_destroy(struct spdk_bdev_channel *ch)
1263 {
1264 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1265 	struct spdk_bdev_module_channel	*shared_ch = ch->module_ch;
1266 
1267 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1268 
1269 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1270 	_spdk_bdev_abort_queued_io(&ch->qos_io, ch);
1271 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, ch);
1272 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
1273 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
1274 
1275 	_spdk_bdev_channel_destroy_resource(ch);
1276 }
1277 
1278 static void
1279 spdk_bdev_qos_channel_destroy(void *ctx)
1280 {
1281 	struct spdk_bdev	*bdev = ctx;
1282 
1283 	bdev->qos_channel_destroying = false;
1284 
1285 	_spdk_bdev_channel_destroy(bdev->qos_channel);
1286 	_spdk_bdev_qos_channel_destroy(bdev->qos_channel);
1287 }
1288 
1289 static void
1290 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1291 {
1292 	struct spdk_bdev_channel	*ch = ctx_buf;
1293 	struct spdk_bdev		*bdev = ch->bdev;
1294 	uint32_t			channel_count = 0;
1295 
1296 	_spdk_bdev_channel_destroy(ch);
1297 
1298 	pthread_mutex_lock(&bdev->mutex);
1299 	bdev->channel_count--;
1300 	channel_count = bdev->channel_count;
1301 	pthread_mutex_unlock(&bdev->mutex);
1302 
1303 	/* Destroy the QoS channel now that there are no active bdev channels left */
1304 	if (channel_count == 0 && bdev->ios_per_sec > 0 && bdev->qos_thread) {
1305 		if (bdev->qos_thread == spdk_get_thread()) {
1306 			spdk_bdev_qos_channel_destroy(bdev);
1307 		} else {
1308 			bdev->qos_channel_destroying = true;
1309 			spdk_thread_send_msg(bdev->qos_thread,
1310 					     spdk_bdev_qos_channel_destroy, bdev);
1311 		}
1312 	}
1313 }
1314 
1315 int
1316 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1317 {
1318 	struct spdk_bdev_alias *tmp;
1319 
1320 	if (alias == NULL) {
1321 		SPDK_ERRLOG("Empty alias passed\n");
1322 		return -EINVAL;
1323 	}
1324 
1325 	if (spdk_bdev_get_by_name(alias)) {
1326 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1327 		return -EEXIST;
1328 	}
1329 
1330 	tmp = calloc(1, sizeof(*tmp));
1331 	if (tmp == NULL) {
1332 		SPDK_ERRLOG("Unable to allocate alias\n");
1333 		return -ENOMEM;
1334 	}
1335 
1336 	tmp->alias = strdup(alias);
1337 	if (tmp->alias == NULL) {
1338 		free(tmp);
1339 		SPDK_ERRLOG("Unable to allocate alias\n");
1340 		return -ENOMEM;
1341 	}
1342 
1343 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1344 
1345 	return 0;
1346 }
1347 
1348 int
1349 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1350 {
1351 	struct spdk_bdev_alias *tmp;
1352 
1353 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1354 		if (strcmp(alias, tmp->alias) == 0) {
1355 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1356 			free(tmp->alias);
1357 			free(tmp);
1358 			return 0;
1359 		}
1360 	}
1361 
1362 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1363 
1364 	return -ENOENT;
1365 }
1366 
1367 struct spdk_io_channel *
1368 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1369 {
1370 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1371 }
1372 
1373 const char *
1374 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1375 {
1376 	return bdev->name;
1377 }
1378 
1379 const char *
1380 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1381 {
1382 	return bdev->product_name;
1383 }
1384 
1385 const struct spdk_bdev_aliases_list *
1386 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1387 {
1388 	return &bdev->aliases;
1389 }
1390 
1391 uint32_t
1392 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1393 {
1394 	return bdev->blocklen;
1395 }
1396 
1397 uint64_t
1398 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1399 {
1400 	return bdev->blockcnt;
1401 }
1402 
1403 size_t
1404 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1405 {
1406 	/* TODO: push this logic down to the bdev modules */
1407 	if (bdev->need_aligned_buffer) {
1408 		return bdev->blocklen;
1409 	}
1410 
1411 	return 1;
1412 }
1413 
1414 uint32_t
1415 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1416 {
1417 	return bdev->optimal_io_boundary;
1418 }
1419 
1420 bool
1421 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1422 {
1423 	return bdev->write_cache;
1424 }
1425 
1426 const struct spdk_uuid *
1427 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1428 {
1429 	return &bdev->uuid;
1430 }
1431 
1432 int
1433 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1434 {
1435 	int ret;
1436 
1437 	pthread_mutex_lock(&bdev->mutex);
1438 
1439 	/* bdev has open descriptors */
1440 	if (!TAILQ_EMPTY(&bdev->open_descs) &&
1441 	    bdev->blockcnt > size) {
1442 		ret = -EBUSY;
1443 	} else {
1444 		bdev->blockcnt = size;
1445 		ret = 0;
1446 	}
1447 
1448 	pthread_mutex_unlock(&bdev->mutex);
1449 
1450 	return ret;
1451 }
1452 
1453 /*
1454  * Convert I/O offset and length from bytes to blocks.
1455  *
1456  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1457  */
1458 static uint64_t
1459 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1460 			  uint64_t num_bytes, uint64_t *num_blocks)
1461 {
1462 	uint32_t block_size = bdev->blocklen;
1463 
1464 	*offset_blocks = offset_bytes / block_size;
1465 	*num_blocks = num_bytes / block_size;
1466 
1467 	return (offset_bytes % block_size) | (num_bytes % block_size);
1468 }
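/*
 * Example (illustrative): with a 512-byte block size, offset_bytes = 4096 and
 *  num_bytes = 8192 yield offset_blocks = 8, num_blocks = 16 and a return value
 *  of 0.  If either byte value is not a multiple of the block size, the OR of
 *  the two remainders is non-zero and callers treat the request as -EINVAL.
 */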
1469 
1470 static bool
1471 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1472 {
1473 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
1474 	 * has been an overflow and hence the offset has been wrapped around */
1475 	if (offset_blocks + num_blocks < offset_blocks) {
1476 		return false;
1477 	}
1478 
1479 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1480 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1481 		return false;
1482 	}
1483 
1484 	return true;
1485 }
1486 
1487 int
1488 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1489 	       void *buf, uint64_t offset, uint64_t nbytes,
1490 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1491 {
1492 	uint64_t offset_blocks, num_blocks;
1493 
1494 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1495 		return -EINVAL;
1496 	}
1497 
1498 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1499 }
1500 
1501 int
1502 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1503 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1504 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1505 {
1506 	struct spdk_bdev *bdev = desc->bdev;
1507 	struct spdk_bdev_io *bdev_io;
1508 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1509 
1510 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1511 		return -EINVAL;
1512 	}
1513 
1514 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1515 	if (!bdev_io) {
1516 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1517 		return -ENOMEM;
1518 	}
1519 
1520 	bdev_io->ch = channel;
1521 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1522 	bdev_io->u.bdev.iov.iov_base = buf;
1523 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1524 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1525 	bdev_io->u.bdev.iovcnt = 1;
1526 	bdev_io->u.bdev.num_blocks = num_blocks;
1527 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1528 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1529 
1530 	spdk_bdev_io_submit(bdev_io);
1531 	return 0;
1532 }
1533 
1534 int
1535 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1536 		struct iovec *iov, int iovcnt,
1537 		uint64_t offset, uint64_t nbytes,
1538 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1539 {
1540 	uint64_t offset_blocks, num_blocks;
1541 
1542 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1543 		return -EINVAL;
1544 	}
1545 
1546 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1547 }
1548 
1549 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1550 			   struct iovec *iov, int iovcnt,
1551 			   uint64_t offset_blocks, uint64_t num_blocks,
1552 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1553 {
1554 	struct spdk_bdev *bdev = desc->bdev;
1555 	struct spdk_bdev_io *bdev_io;
1556 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1557 
1558 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1559 		return -EINVAL;
1560 	}
1561 
1562 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1563 	if (!bdev_io) {
1564 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
1565 		return -ENOMEM;
1566 	}
1567 
1568 	bdev_io->ch = channel;
1569 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1570 	bdev_io->u.bdev.iovs = iov;
1571 	bdev_io->u.bdev.iovcnt = iovcnt;
1572 	bdev_io->u.bdev.num_blocks = num_blocks;
1573 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1574 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1575 
1576 	spdk_bdev_io_submit(bdev_io);
1577 	return 0;
1578 }
1579 
1580 int
1581 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1582 		void *buf, uint64_t offset, uint64_t nbytes,
1583 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1584 {
1585 	uint64_t offset_blocks, num_blocks;
1586 
1587 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1588 		return -EINVAL;
1589 	}
1590 
1591 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1592 }
1593 
1594 int
1595 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1596 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1597 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1598 {
1599 	struct spdk_bdev *bdev = desc->bdev;
1600 	struct spdk_bdev_io *bdev_io;
1601 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1602 
1603 	if (!desc->write) {
1604 		return -EBADF;
1605 	}
1606 
1607 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1608 		return -EINVAL;
1609 	}
1610 
1611 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1612 	if (!bdev_io) {
1613 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1614 		return -ENOMEM;
1615 	}
1616 
1617 	bdev_io->ch = channel;
1618 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1619 	bdev_io->u.bdev.iov.iov_base = buf;
1620 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1621 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1622 	bdev_io->u.bdev.iovcnt = 1;
1623 	bdev_io->u.bdev.num_blocks = num_blocks;
1624 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1625 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1626 
1627 	spdk_bdev_io_submit(bdev_io);
1628 	return 0;
1629 }
1630 
1631 int
1632 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1633 		 struct iovec *iov, int iovcnt,
1634 		 uint64_t offset, uint64_t len,
1635 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1636 {
1637 	uint64_t offset_blocks, num_blocks;
1638 
1639 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1640 		return -EINVAL;
1641 	}
1642 
1643 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1644 }
1645 
1646 int
1647 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1648 			struct iovec *iov, int iovcnt,
1649 			uint64_t offset_blocks, uint64_t num_blocks,
1650 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1651 {
1652 	struct spdk_bdev *bdev = desc->bdev;
1653 	struct spdk_bdev_io *bdev_io;
1654 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1655 
1656 	if (!desc->write) {
1657 		return -EBADF;
1658 	}
1659 
1660 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1661 		return -EINVAL;
1662 	}
1663 
1664 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1665 	if (!bdev_io) {
1666 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1667 		return -ENOMEM;
1668 	}
1669 
1670 	bdev_io->ch = channel;
1671 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1672 	bdev_io->u.bdev.iovs = iov;
1673 	bdev_io->u.bdev.iovcnt = iovcnt;
1674 	bdev_io->u.bdev.num_blocks = num_blocks;
1675 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1676 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1677 
1678 	spdk_bdev_io_submit(bdev_io);
1679 	return 0;
1680 }
1681 
1682 int
1683 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1684 		       uint64_t offset, uint64_t len,
1685 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1686 {
1687 	uint64_t offset_blocks, num_blocks;
1688 
1689 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1690 		return -EINVAL;
1691 	}
1692 
1693 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1694 }
1695 
1696 int
1697 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1698 			      uint64_t offset_blocks, uint64_t num_blocks,
1699 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1700 {
1701 	struct spdk_bdev *bdev = desc->bdev;
1702 	struct spdk_bdev_io *bdev_io;
1703 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1704 	uint64_t len;
1705 	bool split_request = false;
1706 
1707 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1708 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1709 		return -ERANGE;
1710 	}
1711 
1712 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1713 		return -EINVAL;
1714 	}
1715 
1716 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1717 
1718 	if (!bdev_io) {
1719 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1720 		return -ENOMEM;
1721 	}
1722 
1723 	bdev_io->ch = channel;
1724 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1725 
1726 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1727 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1728 		bdev_io->u.bdev.num_blocks = num_blocks;
1729 		bdev_io->u.bdev.iovs = NULL;
1730 		bdev_io->u.bdev.iovcnt = 0;
1731 
1732 	} else {
1733 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1734 
1735 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1736 
1737 		if (len > ZERO_BUFFER_SIZE) {
1738 			split_request = true;
1739 			len = ZERO_BUFFER_SIZE;
1740 		}
1741 
1742 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1743 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1744 		bdev_io->u.bdev.iov.iov_len = len;
1745 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1746 		bdev_io->u.bdev.iovcnt = 1;
1747 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1748 		bdev_io->split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1749 		bdev_io->split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1750 	}
1751 
1752 	if (split_request) {
1753 		bdev_io->stored_user_cb = cb;
1754 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1755 	} else {
1756 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1757 	}
1758 	spdk_bdev_io_submit(bdev_io);
1759 	return 0;
1760 }
1761 
1762 int
1763 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1764 		uint64_t offset, uint64_t nbytes,
1765 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1766 {
1767 	uint64_t offset_blocks, num_blocks;
1768 
1769 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1770 		return -EINVAL;
1771 	}
1772 
1773 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1774 }
1775 
1776 int
1777 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1778 		       uint64_t offset_blocks, uint64_t num_blocks,
1779 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1780 {
1781 	struct spdk_bdev *bdev = desc->bdev;
1782 	struct spdk_bdev_io *bdev_io;
1783 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1784 
1785 	if (!desc->write) {
1786 		return -EBADF;
1787 	}
1788 
1789 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1790 		return -EINVAL;
1791 	}
1792 
1793 	if (num_blocks == 0) {
1794 		SPDK_ERRLOG("Can't unmap 0 bytes\n");
1795 		return -EINVAL;
1796 	}
1797 
1798 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1799 	if (!bdev_io) {
1800 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1801 		return -ENOMEM;
1802 	}
1803 
1804 	bdev_io->ch = channel;
1805 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1806 	bdev_io->u.bdev.iov.iov_base = NULL;
1807 	bdev_io->u.bdev.iov.iov_len = 0;
1808 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1809 	bdev_io->u.bdev.iovcnt = 1;
1810 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1811 	bdev_io->u.bdev.num_blocks = num_blocks;
1812 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1813 
1814 	spdk_bdev_io_submit(bdev_io);
1815 	return 0;
1816 }
1817 
1818 int
1819 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1820 		uint64_t offset, uint64_t length,
1821 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1822 {
1823 	uint64_t offset_blocks, num_blocks;
1824 
1825 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1826 		return -EINVAL;
1827 	}
1828 
1829 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1830 }
1831 
1832 int
1833 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1834 		       uint64_t offset_blocks, uint64_t num_blocks,
1835 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1836 {
1837 	struct spdk_bdev *bdev = desc->bdev;
1838 	struct spdk_bdev_io *bdev_io;
1839 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1840 
1841 	if (!desc->write) {
1842 		return -EBADF;
1843 	}
1844 
1845 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1846 		return -EINVAL;
1847 	}
1848 
1849 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1850 	if (!bdev_io) {
1851 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1852 		return -ENOMEM;
1853 	}
1854 
1855 	bdev_io->ch = channel;
1856 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1857 	bdev_io->u.bdev.iovs = NULL;
1858 	bdev_io->u.bdev.iovcnt = 0;
1859 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1860 	bdev_io->u.bdev.num_blocks = num_blocks;
1861 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1862 
1863 	spdk_bdev_io_submit(bdev_io);
1864 	return 0;
1865 }
1866 
1867 static void
1868 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1869 {
1870 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1871 	struct spdk_bdev_io *bdev_io;
1872 
1873 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1874 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1875 	spdk_bdev_io_submit_reset(bdev_io);
1876 }
1877 
1878 static void
1879 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1880 {
1881 	struct spdk_io_channel		*ch;
1882 	struct spdk_bdev_channel	*channel;
1883 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1884 	struct spdk_bdev_module_channel	*shared_ch;
1885 
1886 	ch = spdk_io_channel_iter_get_channel(i);
1887 	channel = spdk_io_channel_get_ctx(ch);
1888 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1889 	shared_ch = channel->module_ch;
1890 
1891 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1892 
1893 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, channel);
1894 	_spdk_bdev_abort_queued_io(&channel->qos_io, channel);
1895 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1896 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1897 
1898 	spdk_for_each_channel_continue(i, 0);
1899 }
1900 
1901 static void
1902 _spdk_bdev_start_reset(void *ctx)
1903 {
1904 	struct spdk_bdev_channel *ch = ctx;
1905 
1906 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
1907 			      ch, _spdk_bdev_reset_dev);
1908 }
1909 
1910 static void
1911 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1912 {
1913 	struct spdk_bdev *bdev = ch->bdev;
1914 
1915 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1916 
1917 	pthread_mutex_lock(&bdev->mutex);
1918 	if (bdev->reset_in_progress == NULL) {
1919 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1920 		/*
1921 		 * Take a channel reference for the target bdev for the life of this
1922 		 *  reset.  This guards against the channel getting destroyed while
1923 		 *  spdk_for_each_channel() calls related to this reset IO are in
1924 		 *  progress.  We will release the reference when this reset is
1925 		 *  completed.
1926 		 */
1927 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1928 		_spdk_bdev_start_reset(ch);
1929 	}
1930 	pthread_mutex_unlock(&bdev->mutex);
1931 }
1932 
1933 int
1934 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1935 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1936 {
1937 	struct spdk_bdev *bdev = desc->bdev;
1938 	struct spdk_bdev_io *bdev_io;
1939 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1940 
1941 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1942 	if (!bdev_io) {
1943 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1944 		return -ENOMEM;
1945 	}
1946 
1947 	bdev_io->ch = channel;
1948 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1949 	bdev_io->u.reset.ch_ref = NULL;
1950 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1951 
1952 	pthread_mutex_lock(&bdev->mutex);
1953 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1954 	pthread_mutex_unlock(&bdev->mutex);
1955 
1956 	_spdk_bdev_channel_start_reset(channel);
1957 
1958 	return 0;
1959 }
1960 
1961 void
1962 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1963 		      struct spdk_bdev_io_stat *stat)
1964 {
1965 #ifdef SPDK_CONFIG_VTUNE
1966 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1967 	memset(stat, 0, sizeof(*stat));
1968 	return;
1969 #endif
1970 
1971 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1972 
1973 	channel->stat.ticks_rate = spdk_get_ticks_hz();
1974 	*stat = channel->stat;
1975 	memset(&channel->stat, 0, sizeof(channel->stat));
1976 }
1977 
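/*
 * Illustrative sketch (assumptions noted): the admin command is passed through
 *  untouched, so the caller builds the struct spdk_nvme_cmd itself.  An Identify
 *  Controller command could look roughly like the following; my_desc, my_ch, buf
 *  and admin_done are hypothetical, and the descriptor must have been opened for
 *  write.
 *
 *      struct spdk_nvme_cmd cmd = {};
 *
 *      cmd.opc = SPDK_NVME_OPC_IDENTIFY;
 *      cmd.cdw10 = SPDK_NVME_IDENTIFY_CTRLR;
 *      rc = spdk_bdev_nvme_admin_passthru(my_desc, my_ch, &cmd, buf, 4096, admin_done, NULL);
 */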
1978 int
1979 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1980 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1981 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1982 {
1983 	struct spdk_bdev *bdev = desc->bdev;
1984 	struct spdk_bdev_io *bdev_io;
1985 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1986 
1987 	if (!desc->write) {
1988 		return -EBADF;
1989 	}
1990 
1991 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1992 	if (!bdev_io) {
1993 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1994 		return -ENOMEM;
1995 	}
1996 
1997 	bdev_io->ch = channel;
1998 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1999 	bdev_io->u.nvme_passthru.cmd = *cmd;
2000 	bdev_io->u.nvme_passthru.buf = buf;
2001 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2002 	bdev_io->u.nvme_passthru.md_buf = NULL;
2003 	bdev_io->u.nvme_passthru.md_len = 0;
2004 
2005 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2006 
2007 	spdk_bdev_io_submit(bdev_io);
2008 	return 0;
2009 }
2010 
2011 int
2012 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2013 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2014 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2015 {
2016 	struct spdk_bdev *bdev = desc->bdev;
2017 	struct spdk_bdev_io *bdev_io;
2018 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2019 
2020 	if (!desc->write) {
2021 		/*
2022 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2023 		 *  to easily determine if the command is a read or write, but for now just
2024 		 *  do not allow io_passthru with a read-only descriptor.
2025 		 */
2026 		return -EBADF;
2027 	}
2028 
2029 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
2030 	if (!bdev_io) {
2031 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
2032 		return -ENOMEM;
2033 	}
2034 
2035 	bdev_io->ch = channel;
2036 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2037 	bdev_io->u.nvme_passthru.cmd = *cmd;
2038 	bdev_io->u.nvme_passthru.buf = buf;
2039 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2040 	bdev_io->u.nvme_passthru.md_buf = NULL;
2041 	bdev_io->u.nvme_passthru.md_len = 0;
2042 
2043 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2044 
2045 	spdk_bdev_io_submit(bdev_io);
2046 	return 0;
2047 }
2048 
2049 int
2050 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2051 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2052 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2053 {
2054 	struct spdk_bdev *bdev = desc->bdev;
2055 	struct spdk_bdev_io *bdev_io;
2056 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2057 
2058 	if (!desc->write) {
2059 		/*
2060 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2061 		 *  to easily determine if the command is a read or write, but for now just
2062 		 *  do not allow io_passthru with a read-only descriptor.
2063 		 */
2064 		return -EBADF;
2065 	}
2066 
2067 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
2068 	if (!bdev_io) {
2069 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
2070 		return -ENOMEM;
2071 	}
2072 
2073 	bdev_io->ch = channel;
2074 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2075 	bdev_io->u.nvme_passthru.cmd = *cmd;
2076 	bdev_io->u.nvme_passthru.buf = buf;
2077 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2078 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2079 	bdev_io->u.nvme_passthru.md_len = md_len;
2080 
2081 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2082 
2083 	spdk_bdev_io_submit(bdev_io);
2084 	return 0;
2085 }
2086 
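/*
 * Note on completion callbacks (illustrative, names hypothetical): the cb passed to
 *  the submit functions above receives the bdev_io, a success flag and cb_arg.  A
 *  typical callback inspects the status and then returns the bdev_io to the pool:
 *
 *      static void
 *      io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *      {
 *              int sct, sc;
 *
 *              if (!success) {
 *                      spdk_bdev_io_get_nvme_status(bdev_io, &sct, &sc);
 *                      ... handle the error ...
 *              }
 *              spdk_bdev_free_io(bdev_io);
 *      }
 */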
2087 int
2088 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
2089 {
2090 	if (!bdev_io) {
2091 		SPDK_ERRLOG("bdev_io is NULL\n");
2092 		return -1;
2093 	}
2094 
2095 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
2096 		SPDK_ERRLOG("bdev_io is in pending state\n");
2097 		assert(false);
2098 		return -1;
2099 	}
2100 
2101 	spdk_bdev_put_io(bdev_io);
2102 
2103 	return 0;
2104 }
2105 
2106 static void
2107 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2108 {
2109 	struct spdk_bdev *bdev = bdev_ch->bdev;
2110 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
2111 	struct spdk_bdev_io *bdev_io;
2112 
2113 	if (shared_ch->io_outstanding > shared_ch->nomem_threshold) {
2114 		/*
2115 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2116 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2117 		 *  the context of a completion, because the resources for the I/O are
2118 		 *  not released until control returns to the bdev poller.  Also, we
2119 		 *  may require several small I/O to complete before a larger I/O
2120 		 *  (that requires splitting) can be submitted.
2121 		 */
2122 		return;
2123 	}
2124 
2125 	while (!TAILQ_EMPTY(&shared_ch->nomem_io)) {
2126 		bdev_io = TAILQ_FIRST(&shared_ch->nomem_io);
2127 		TAILQ_REMOVE(&shared_ch->nomem_io, bdev_io, link);
2128 		shared_ch->io_outstanding++;
2129 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
2130 		bdev->fn_table->submit_request(bdev_io->ch->channel, bdev_io);
2131 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
2132 			break;
2133 		}
2134 	}
2135 }
2136 
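/*
 * Descriptive note: when an I/O was rerouted for QoS rate limiting, io_submit_ch
 *  records the channel it was originally submitted on.  _spdk_bdev_io_complete()
 *  below restores that channel and bounces the completion back to its thread via
 *  spdk_thread_send_msg(), so the user callback always runs on the thread that
 *  issued the I/O.
 */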
2137 static void
2138 _spdk_bdev_qos_io_complete(void *ctx)
2139 {
2140 	struct spdk_bdev_io *bdev_io = ctx;
2141 
2142 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
2143 }
2144 
2145 static void
2146 _spdk_bdev_io_complete(void *ctx)
2147 {
2148 	struct spdk_bdev_io *bdev_io = ctx;
2149 
2150 	assert(bdev_io->cb != NULL);
2151 
2152 	if (bdev_io->io_submit_ch) {
2153 		bdev_io->ch = bdev_io->io_submit_ch;
2154 		bdev_io->io_submit_ch = NULL;
2155 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
2156 				     _spdk_bdev_qos_io_complete, bdev_io);
2157 	} else {
2158 		bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS,
2159 			    bdev_io->caller_ctx);
2160 	}
2161 }
2162 
2163 static void
2164 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2165 {
2166 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2167 
2168 	if (bdev_io->u.reset.ch_ref != NULL) {
2169 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2170 		bdev_io->u.reset.ch_ref = NULL;
2171 	}
2172 
2173 	_spdk_bdev_io_complete(bdev_io);
2174 }
2175 
2176 static void
2177 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2178 {
2179 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2180 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2181 
2182 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2183 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2184 		_spdk_bdev_channel_start_reset(ch);
2185 	}
2186 
2187 	spdk_for_each_channel_continue(i, 0);
2188 }
2189 
2190 void
2191 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2192 {
2193 	struct spdk_bdev *bdev = bdev_io->bdev;
2194 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
2195 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
2196 
2197 	bdev_io->status = status;
2198 
2199 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2200 		bool unlock_channels = false;
2201 
2202 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2203 			SPDK_ERRLOG("NOMEM returned for reset\n");
2204 		}
2205 		pthread_mutex_lock(&bdev->mutex);
2206 		if (bdev_io == bdev->reset_in_progress) {
2207 			bdev->reset_in_progress = NULL;
2208 			unlock_channels = true;
2209 		}
2210 		pthread_mutex_unlock(&bdev->mutex);
2211 
2212 		if (unlock_channels) {
2213 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2214 					      bdev_io, _spdk_bdev_reset_complete);
2215 			return;
2216 		}
2217 	} else {
2218 		assert(shared_ch->io_outstanding > 0);
2219 		shared_ch->io_outstanding--;
2220 		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
2221 			if (spdk_unlikely(!TAILQ_EMPTY(&shared_ch->nomem_io))) {
2222 				_spdk_bdev_ch_retry_io(bdev_ch);
2223 			}
2224 		} else {
2225 			TAILQ_INSERT_HEAD(&shared_ch->nomem_io, bdev_io, link);
2226 			/*
2227 			 * Wait for some of the outstanding I/O to complete before we
2228 			 *  retry any of the nomem_io.  Normally we will wait for
2229 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2230 			 *  depth channels we will instead wait for half to complete.
2231 			 */
2232 			shared_ch->nomem_threshold = spdk_max((int64_t)shared_ch->io_outstanding / 2,
2233 							      (int64_t)shared_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
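			/*
			 * Worked example, with NOMEM_THRESHOLD_COUNT of 8: 100 I/O still
			 *  outstanding gives a threshold of 92; only 4 outstanding gives
			 *  max(2, -4) = 2, i.e. half of the remaining queue depth.
			 */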
2234 			return;
2235 		}
2236 	}
2237 
2238 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2239 		switch (bdev_io->type) {
2240 		case SPDK_BDEV_IO_TYPE_READ:
2241 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
2242 			bdev_ch->stat.num_read_ops++;
2243 			bdev_ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2244 			break;
2245 		case SPDK_BDEV_IO_TYPE_WRITE:
2246 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
2247 			bdev_ch->stat.num_write_ops++;
2248 			bdev_ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2249 			break;
2250 		default:
2251 			break;
2252 		}
2253 	}
2254 
2255 #ifdef SPDK_CONFIG_VTUNE
2256 	uint64_t now_tsc = spdk_get_ticks();
2257 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
2258 		uint64_t data[5];
2259 
2260 		data[0] = bdev_ch->stat.num_read_ops;
2261 		data[1] = bdev_ch->stat.bytes_read;
2262 		data[2] = bdev_ch->stat.num_write_ops;
2263 		data[3] = bdev_ch->stat.bytes_written;
2264 		data[4] = bdev->fn_table->get_spin_time ?
2265 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
2266 
2267 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
2268 				   __itt_metadata_u64, 5, data);
2269 
2270 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
2271 		bdev_ch->start_tsc = now_tsc;
2272 	}
2273 #endif
2274 
2275 	if (bdev_io->in_submit_request) {
2276 		/*
2277 		 * Defer completion to avoid potential infinite recursion if the
2278 		 * user's completion callback issues a new I/O.
2279 		 */
2280 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
2281 				     _spdk_bdev_io_complete, bdev_io);
2282 	} else {
2283 		_spdk_bdev_io_complete(bdev_io);
2284 	}
2285 }
2286 
2287 void
2288 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2289 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2290 {
2291 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2292 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2293 	} else {
2294 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2295 		bdev_io->error.scsi.sc = sc;
2296 		bdev_io->error.scsi.sk = sk;
2297 		bdev_io->error.scsi.asc = asc;
2298 		bdev_io->error.scsi.ascq = ascq;
2299 	}
2300 
2301 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2302 }
2303 
2304 void
2305 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2306 			     int *sc, int *sk, int *asc, int *ascq)
2307 {
2308 	assert(sc != NULL);
2309 	assert(sk != NULL);
2310 	assert(asc != NULL);
2311 	assert(ascq != NULL);
2312 
2313 	switch (bdev_io->status) {
2314 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2315 		*sc = SPDK_SCSI_STATUS_GOOD;
2316 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2317 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2318 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2319 		break;
2320 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2321 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2322 		break;
2323 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2324 		*sc = bdev_io->error.scsi.sc;
2325 		*sk = bdev_io->error.scsi.sk;
2326 		*asc = bdev_io->error.scsi.asc;
2327 		*ascq = bdev_io->error.scsi.ascq;
2328 		break;
2329 	default:
2330 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2331 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2332 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2333 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2334 		break;
2335 	}
2336 }
2337 
2338 void
2339 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2340 {
2341 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2342 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2343 	} else {
2344 		bdev_io->error.nvme.sct = sct;
2345 		bdev_io->error.nvme.sc = sc;
2346 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2347 	}
2348 
2349 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2350 }
2351 
2352 void
2353 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2354 {
2355 	assert(sct != NULL);
2356 	assert(sc != NULL);
2357 
2358 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2359 		*sct = bdev_io->error.nvme.sct;
2360 		*sc = bdev_io->error.nvme.sc;
2361 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2362 		*sct = SPDK_NVME_SCT_GENERIC;
2363 		*sc = SPDK_NVME_SC_SUCCESS;
2364 	} else {
2365 		*sct = SPDK_NVME_SCT_GENERIC;
2366 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2367 	}
2368 }
2369 
2370 struct spdk_thread *
2371 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2372 {
2373 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
2374 }
2375 
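/*
 * Descriptive note: registration publishes the bdev on g_bdev_mgr.bdevs, registers
 *  it as an io_device, and then notifies every module that supplies an examine()
 *  callback.  An examining module may claim the new bdev (spdk_bdev_module_claim_bdev())
 *  and layer a virtual bdev on top of it (spdk_vbdev_register()).
 */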
2376 static int
2377 _spdk_bdev_register(struct spdk_bdev *bdev)
2378 {
2379 	struct spdk_bdev_module *module;
2380 
2381 	assert(bdev->module != NULL);
2382 
2383 	if (!bdev->name) {
2384 		SPDK_ERRLOG("Bdev name is NULL\n");
2385 		return -EINVAL;
2386 	}
2387 
2388 	if (spdk_bdev_get_by_name(bdev->name)) {
2389 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2390 		return -EEXIST;
2391 	}
2392 
2393 	bdev->status = SPDK_BDEV_STATUS_READY;
2394 
2395 	TAILQ_INIT(&bdev->open_descs);
2396 
2397 	TAILQ_INIT(&bdev->vbdevs);
2398 	TAILQ_INIT(&bdev->base_bdevs);
2399 
2400 	TAILQ_INIT(&bdev->aliases);
2401 
2402 	bdev->reset_in_progress = NULL;
2403 
2404 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2405 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2406 				sizeof(struct spdk_bdev_channel));
2407 
2408 	pthread_mutex_init(&bdev->mutex, NULL);
2409 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2410 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2411 
2412 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2413 		if (module->examine) {
2414 			module->action_in_progress++;
2415 			module->examine(bdev);
2416 		}
2417 	}
2418 
2419 	return 0;
2420 }
2421 
2422 int
2423 spdk_bdev_register(struct spdk_bdev *bdev)
2424 {
2425 	return _spdk_bdev_register(bdev);
2426 }
2427 
2428 int
2429 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2430 {
2431 	int i, rc;
2432 
2433 	rc = _spdk_bdev_register(vbdev);
2434 	if (rc) {
2435 		return rc;
2436 	}
2437 
2438 	for (i = 0; i < base_bdev_count; i++) {
2439 		assert(base_bdevs[i] != NULL);
2440 		assert(base_bdevs[i]->claim_module != NULL);
2441 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
2442 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
2443 	}
2444 
2445 	return 0;
2446 }
2447 
2448 void
2449 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
2450 {
2451 	if (bdev->unregister_cb != NULL) {
2452 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2453 	}
2454 }
2455 
2456 static void
2457 _remove_notify(void *arg)
2458 {
2459 	struct spdk_bdev_desc *desc = arg;
2460 
2461 	desc->remove_cb(desc->remove_ctx);
2462 }
2463 
2464 void
2465 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2466 {
2467 	struct spdk_bdev_desc	*desc, *tmp;
2468 	int			rc;
2469 	bool			do_destruct = true;
2470 	struct spdk_bdev	*base_bdev;
2471 
2472 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2473 
2474 	pthread_mutex_lock(&bdev->mutex);
2475 
2476 	if (!TAILQ_EMPTY(&bdev->base_bdevs)) {
2477 		TAILQ_FOREACH(base_bdev, &bdev->base_bdevs, base_bdev_link) {
2478 			TAILQ_REMOVE(&base_bdev->vbdevs, bdev, vbdev_link);
2479 		}
2480 	}
2481 
2482 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2483 	bdev->unregister_cb = cb_fn;
2484 	bdev->unregister_ctx = cb_arg;
2485 
2486 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2487 		if (desc->remove_cb) {
2488 			do_destruct = false;
2489 			/*
2490 			 * Defer invocation of the remove_cb to a separate message that will
2491 			 *  run later on this thread.  This ensures this context unwinds and
2492 			 *  we don't recursively unregister this bdev again if the remove_cb
2493 			 *  immediately closes its descriptor.
2494 			 */
2495 			spdk_thread_send_msg(spdk_get_thread(), _remove_notify, desc);
2496 		}
2497 	}
2498 
2499 	if (!do_destruct) {
2500 		pthread_mutex_unlock(&bdev->mutex);
2501 		return;
2502 	}
2503 
2504 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2505 	pthread_mutex_unlock(&bdev->mutex);
2506 
2507 	pthread_mutex_destroy(&bdev->mutex);
2508 
2509 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), NULL);
2510 
2511 	rc = bdev->fn_table->destruct(bdev->ctxt);
2512 	if (rc < 0) {
2513 		SPDK_ERRLOG("destruct failed\n");
2514 	}
2515 	if (rc <= 0 && cb_fn != NULL) {
2516 		cb_fn(cb_arg, rc);
2517 	}
2518 }
2519 
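/*
 * Illustrative open/close sketch (names hypothetical): a consumer typically looks
 *  the bdev up by name, opens a descriptor, and closes it again when done.  The
 *  remove callback lets the consumer close the descriptor when the bdev is
 *  hot-removed.
 *
 *      struct spdk_bdev_desc *desc;
 *      struct spdk_bdev *bdev = spdk_bdev_get_by_name("Nvme0n1");
 *
 *      if (bdev == NULL || spdk_bdev_open(bdev, true, my_remove_cb, my_ctx, &desc) != 0) {
 *              ... handle error ...
 *      }
 *      ...
 *      spdk_bdev_close(desc);
 */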
2520 int
2521 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2522 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2523 {
2524 	struct spdk_bdev_desc *desc;
2525 
2526 	desc = calloc(1, sizeof(*desc));
2527 	if (desc == NULL) {
2528 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2529 		return -ENOMEM;
2530 	}
2531 
2532 	pthread_mutex_lock(&bdev->mutex);
2533 
2534 	if (write && bdev->claim_module) {
2535 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
2536 		free(desc);
2537 		pthread_mutex_unlock(&bdev->mutex);
2538 		return -EPERM;
2539 	}
2540 
2541 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2542 
2543 	desc->bdev = bdev;
2544 	desc->remove_cb = remove_cb;
2545 	desc->remove_ctx = remove_ctx;
2546 	desc->write = write;
2547 	*_desc = desc;
2548 
2549 	pthread_mutex_unlock(&bdev->mutex);
2550 
2551 	return 0;
2552 }
2553 
2554 void
2555 spdk_bdev_close(struct spdk_bdev_desc *desc)
2556 {
2557 	struct spdk_bdev *bdev = desc->bdev;
2558 	bool do_unregister = false;
2559 
2560 	pthread_mutex_lock(&bdev->mutex);
2561 
2562 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2563 	free(desc);
2564 
2565 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
2566 		do_unregister = true;
2567 	}
2568 	pthread_mutex_unlock(&bdev->mutex);
2569 
2570 	if (do_unregister == true) {
2571 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
2572 	}
2573 }
2574 
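/*
 * Descriptive note: a claim marks the bdev as owned by a module.  While claimed,
 *  spdk_bdev_open() above refuses new write descriptors (-EPERM), and if a
 *  descriptor is passed in here it is upgraded to allow writes.  The claim is
 *  dropped again with spdk_bdev_module_release_bdev().
 */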
2575 int
2576 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
2577 			    struct spdk_bdev_module *module)
2578 {
2579 	if (bdev->claim_module != NULL) {
2580 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
2581 			    bdev->claim_module->name);
2582 		return -EPERM;
2583 	}
2584 
2585 	if (desc && !desc->write) {
2586 		desc->write = true;
2587 	}
2588 
2589 	bdev->claim_module = module;
2590 	return 0;
2591 }
2592 
2593 void
2594 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2595 {
2596 	assert(bdev->claim_module != NULL);
2597 	bdev->claim_module = NULL;
2598 }
2599 
2600 struct spdk_bdev *
2601 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2602 {
2603 	return desc->bdev;
2604 }
2605 
2606 void
2607 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2608 {
2609 	struct iovec *iovs;
2610 	int iovcnt;
2611 
2612 	if (bdev_io == NULL) {
2613 		return;
2614 	}
2615 
2616 	switch (bdev_io->type) {
2617 	case SPDK_BDEV_IO_TYPE_READ:
2618 		iovs = bdev_io->u.bdev.iovs;
2619 		iovcnt = bdev_io->u.bdev.iovcnt;
2620 		break;
2621 	case SPDK_BDEV_IO_TYPE_WRITE:
2622 		iovs = bdev_io->u.bdev.iovs;
2623 		iovcnt = bdev_io->u.bdev.iovcnt;
2624 		break;
2625 	default:
2626 		iovs = NULL;
2627 		iovcnt = 0;
2628 		break;
2629 	}
2630 
2631 	if (iovp) {
2632 		*iovp = iovs;
2633 	}
2634 	if (iovcntp) {
2635 		*iovcntp = iovcnt;
2636 	}
2637 }
2638 
2639 void
2640 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
2641 {
2642 
2643 	if (spdk_bdev_module_list_find(bdev_module->name)) {
2644 		fprintf(stderr, "ERROR: module '%s' already registered.\n", bdev_module->name);
2645 		assert(false);
2646 	}
2647 
2648 	if (bdev_module->async_init) {
2649 		bdev_module->action_in_progress = 1;
2650 	}
2651 
2652 	/*
2653 	 * Modules with examine callbacks must be initialized first, so they are
2654 	 *  ready to handle examine callbacks from later modules that will
2655 	 *  register physical bdevs.
2656 	 */
2657 	if (bdev_module->examine != NULL) {
2658 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2659 	} else {
2660 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2661 	}
2662 }
2663 
2664 struct spdk_bdev_module *
2665 spdk_bdev_module_list_find(const char *name)
2666 {
2667 	struct spdk_bdev_module *bdev_module;
2668 
2669 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
2670 		if (strcmp(name, bdev_module->name) == 0) {
2671 			break;
2672 		}
2673 	}
2674 
2675 	return bdev_module;
2676 }
2677 
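/*
 * Worked note on the split path below (illustrative numbers): each round writes at
 *  most ZERO_BUFFER_SIZE bytes of zeroes, i.e. spdk_min(blocklen * remaining, 1 MiB).
 *  With a 512-byte block size that is 2048 blocks per round, so a 10,000-block
 *  request handled by this helper is issued as four 2048-block rounds followed by
 *  one 1808-block round.
 */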
2678 static void
2679 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2680 {
2681 	uint64_t len;
2682 
2683 	if (!success) {
2684 		bdev_io->cb = bdev_io->stored_user_cb;
2685 		_spdk_bdev_io_complete(bdev_io);
2686 		return;
2687 	}
2688 
2689 	/* No need to repeat the error checking from spdk_bdev_write_zeroes_blocks() - this request already passed those checks. */
2690 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->split_remaining_num_blocks,
2691 		       ZERO_BUFFER_SIZE);
2692 
2693 	bdev_io->u.bdev.offset_blocks = bdev_io->split_current_offset_blocks;
2694 	bdev_io->u.bdev.iov.iov_len = len;
2695 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2696 	bdev_io->split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2697 	bdev_io->split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2698 
2699 	/* If this round completes the I/O, restore the original user callback. */
2700 	if (bdev_io->split_remaining_num_blocks == 0) {
2701 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->stored_user_cb);
2702 	} else {
2703 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2704 	}
2705 	spdk_bdev_io_submit(bdev_io);
2706 }
2707 
2708 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
2709