xref: /spdk/lib/bdev/bdev.c (revision 2e1dbc458758ba3d2709299913bc199fa3926bda)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 #include "spdk/conf.h"
39 
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/io_channel.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 
49 #include "spdk_internal/bdev.h"
50 #include "spdk_internal/log.h"
51 #include "spdk/string.h"
52 
53 #ifdef SPDK_CONFIG_VTUNE
54 #include "ittnotify.h"
55 #include "ittnotify_types.h"
56 int __itt_init_ittlib(const char *, __itt_group_id);
57 #endif
58 
59 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
60 #define SPDK_BDEV_IO_CACHE_SIZE			256
61 #define BUF_SMALL_POOL_SIZE			8192
62 #define BUF_LARGE_POOL_SIZE			1024
63 #define NOMEM_THRESHOLD_COUNT			8
64 #define ZERO_BUFFER_SIZE			0x100000
65 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
66 #define SPDK_BDEV_SEC_TO_USEC			1000000ULL
67 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
68 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
69 
70 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
71 typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t;
72 
73 struct spdk_bdev_mgr {
74 	struct spdk_mempool *bdev_io_pool;
75 
76 	struct spdk_mempool *buf_small_pool;
77 	struct spdk_mempool *buf_large_pool;
78 
79 	void *zero_buffer;
80 
81 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
82 
83 	TAILQ_HEAD(, spdk_bdev) bdevs;
84 
85 	bool init_complete;
86 	bool module_init_complete;
87 
88 #ifdef SPDK_CONFIG_VTUNE
89 	__itt_domain	*domain;
90 #endif
91 };
92 
93 static struct spdk_bdev_mgr g_bdev_mgr = {
94 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
95 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
96 	.init_complete = false,
97 	.module_init_complete = false,
98 };
99 
100 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
101 static void			*g_init_cb_arg = NULL;
102 
103 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
104 static void			*g_fini_cb_arg = NULL;
105 static struct spdk_thread	*g_fini_thread = NULL;
106 
107 
108 struct spdk_bdev_mgmt_channel {
109 	bdev_io_stailq_t need_buf_small;
110 	bdev_io_stailq_t need_buf_large;
111 
112 	/*
113 	 * Each thread keeps a cache of bdev_io - this allows
114 	 *  bdev threads which are *not* DPDK threads to still
115 	 *  benefit from a per-thread bdev_io cache.  Without
116 	 *  this, non-DPDK threads fetching from the mempool
117 	 *  incur a cmpxchg on get and put.
118 	 */
119 	bdev_io_stailq_t per_thread_cache;
120 	uint32_t	per_thread_cache_count;
121 };
122 
123 /*
124  * Per-module (or per-io_device) channel. Multiple bdevs built on the same io_device
125  * will queue their I/O awaiting retry here. This makes it possible to retry sending
126  * I/O to one bdev after I/O from another bdev completes.
127  */
128 struct spdk_bdev_module_channel {
129 
130 	/* The bdev management channel */
131 	struct spdk_bdev_mgmt_channel *mgmt_ch;
132 
133 	/*
134 	 * Count of I/O submitted to bdev module and waiting for completion.
135 	 * Incremented before submit_request() is called on an spdk_bdev_io.
136 	 */
137 	uint64_t		io_outstanding;
138 
139 	/*
140 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
141 	 *  on this channel.
142 	 */
143 	bdev_io_tailq_t		nomem_io;
144 
145 	/*
146 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
147 	 */
148 	uint64_t		nomem_threshold;
149 
150 	TAILQ_ENTRY(spdk_bdev_module_channel) link;
151 };
152 
153 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
154 #define BDEV_CH_QOS_ENABLED		(1 << 1)
155 
156 struct spdk_bdev_channel {
157 	struct spdk_bdev	*bdev;
158 
159 	/* The channel for the underlying device */
160 	struct spdk_io_channel	*channel;
161 
162 	/* Channel for the bdev module */
163 	struct spdk_bdev_module_channel	*module_ch;
164 
165 	struct spdk_bdev_io_stat stat;
166 
167 	/*
168 	 * Count of I/O submitted through this channel and waiting for completion.
169 	 * Incremented before submit_request() is called on an spdk_bdev_io.
170 	 */
171 	uint64_t		io_outstanding;
172 
173 	bdev_io_tailq_t		queued_resets;
174 
175 	uint32_t		flags;
176 
177 	/*
178 	 * Rate limiting on this channel.
179 	 * Queue of I/O awaiting issue because QoS rate limiting is in effect
180 	 *  on this channel.
181 	 */
182 	bdev_io_tailq_t		qos_io;
183 
184 	/*
185 	 * Rate limiting on this channel.
186 	 * Maximum number of I/Os allowed to be issued in one timeslice (e.g., 1ms);
187 	 *  only valid for the master channel, which manages the outstanding I/Os.
188 	 */
189 	uint64_t		qos_max_ios_per_timeslice;
190 
191 	/*
192 	 * Rate limiting on this channel.
193 	 * Number of I/Os submitted in the current timeslice (e.g., 1ms)
194 	 */
195 	uint64_t		io_submitted_this_timeslice;
196 
197 	/*
198 	 * Rate limiting on this channel.
199 	 * Poller that runs once per timeslice (every millisecond) to enforce QoS.
200 	 */
201 	struct spdk_poller	*qos_poller;
202 
203 #ifdef SPDK_CONFIG_VTUNE
204 	uint64_t		start_tsc;
205 	uint64_t		interval_tsc;
206 	__itt_string_handle	*handle;
207 #endif
208 
209 };
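
/*
 * Channel hierarchy used by the bdev layer: the management channel (one per thread, for the
 *  g_bdev_mgr io_device) holds the buffer wait queues and the per-thread bdev_io cache; the
 *  module channel (one per bdev module per thread) tracks outstanding I/O and the NOMEM retry
 *  queue shared by all bdevs built on that module; the bdev channel (one per bdev per thread)
 *  wraps the module's underlying I/O channel and carries per-channel statistics, reset and
 *  QoS state.
 */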
210 
211 struct spdk_bdev_desc {
212 	struct spdk_bdev		*bdev;
213 	spdk_bdev_remove_cb_t		remove_cb;
214 	void				*remove_ctx;
215 	bool				write;
216 	TAILQ_ENTRY(spdk_bdev_desc)	link;
217 };
218 
219 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
220 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
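
/*
 * Note: the +1/-1 offset gives the bdev layer an io_device pointer that is distinct from the
 *  bdev pointer itself, presumably so it cannot collide with an io_device registration that a
 *  bdev module may have made using that same pointer.
 */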
221 
222 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
223 
224 struct spdk_bdev *
225 spdk_bdev_first(void)
226 {
227 	struct spdk_bdev *bdev;
228 
229 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
230 	if (bdev) {
231 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
232 	}
233 
234 	return bdev;
235 }
236 
237 struct spdk_bdev *
238 spdk_bdev_next(struct spdk_bdev *prev)
239 {
240 	struct spdk_bdev *bdev;
241 
242 	bdev = TAILQ_NEXT(prev, link);
243 	if (bdev) {
244 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
245 	}
246 
247 	return bdev;
248 }
249 
250 static struct spdk_bdev *
251 _bdev_next_leaf(struct spdk_bdev *bdev)
252 {
253 	while (bdev != NULL) {
254 		if (bdev->claim_module == NULL) {
255 			return bdev;
256 		} else {
257 			bdev = TAILQ_NEXT(bdev, link);
258 		}
259 	}
260 
261 	return bdev;
262 }
263 
264 struct spdk_bdev *
265 spdk_bdev_first_leaf(void)
266 {
267 	struct spdk_bdev *bdev;
268 
269 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
270 
271 	if (bdev) {
272 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
273 	}
274 
275 	return bdev;
276 }
277 
278 struct spdk_bdev *
279 spdk_bdev_next_leaf(struct spdk_bdev *prev)
280 {
281 	struct spdk_bdev *bdev;
282 
283 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
284 
285 	if (bdev) {
286 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
287 	}
288 
289 	return bdev;
290 }
291 
292 struct spdk_bdev *
293 spdk_bdev_get_by_name(const char *bdev_name)
294 {
295 	struct spdk_bdev_alias *tmp;
296 	struct spdk_bdev *bdev = spdk_bdev_first();
297 
298 	while (bdev != NULL) {
299 		if (strcmp(bdev_name, bdev->name) == 0) {
300 			return bdev;
301 		}
302 
303 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
304 			if (strcmp(bdev_name, tmp->alias) == 0) {
305 				return bdev;
306 			}
307 		}
308 
309 		bdev = spdk_bdev_next(bdev);
310 	}
311 
312 	return NULL;
313 }
314 
315 static void
316 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
317 {
318 	assert(bdev_io->get_buf_cb != NULL);
319 	assert(buf != NULL);
320 	assert(bdev_io->u.bdev.iovs != NULL);
321 
322 	bdev_io->buf = buf;
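	/*
	 * Align the I/O vector to a 512-byte boundary within the allocated buffer; the buffer
	 *  pools allocate an extra 512 bytes per element to leave room for this alignment.
	 */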
323 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
324 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
325 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
326 }
327 
328 static void
329 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
330 {
331 	struct spdk_mempool *pool;
332 	struct spdk_bdev_io *tmp;
333 	void *buf;
334 	bdev_io_stailq_t *stailq;
335 	struct spdk_bdev_mgmt_channel *ch;
336 
337 	assert(bdev_io->u.bdev.iovcnt == 1);
338 
339 	buf = bdev_io->buf;
340 	ch = bdev_io->ch->module_ch->mgmt_ch;
341 
342 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
343 		pool = g_bdev_mgr.buf_small_pool;
344 		stailq = &ch->need_buf_small;
345 	} else {
346 		pool = g_bdev_mgr.buf_large_pool;
347 		stailq = &ch->need_buf_large;
348 	}
349 
350 	if (STAILQ_EMPTY(stailq)) {
351 		spdk_mempool_put(pool, buf);
352 	} else {
353 		tmp = STAILQ_FIRST(stailq);
354 		STAILQ_REMOVE_HEAD(stailq, buf_link);
355 		spdk_bdev_io_set_buf(tmp, buf);
356 	}
357 }
358 
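/*
 * If the caller already attached a buffer, invoke the callback immediately.  Otherwise take a
 *  buffer from the small or large pool based on the requested length; if the pool is exhausted,
 *  park the bdev_io on the management channel's wait queue so it is handed a buffer when
 *  another I/O releases one in spdk_bdev_io_put_buf().
 */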
359 void
360 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
361 {
362 	struct spdk_mempool *pool;
363 	bdev_io_stailq_t *stailq;
364 	void *buf = NULL;
365 	struct spdk_bdev_mgmt_channel *mgmt_ch;
366 
367 	assert(cb != NULL);
368 	assert(bdev_io->u.bdev.iovs != NULL);
369 
370 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
371 		/* Buffer already present */
372 		cb(bdev_io->ch->channel, bdev_io);
373 		return;
374 	}
375 
376 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
377 	mgmt_ch = bdev_io->ch->module_ch->mgmt_ch;
378 
379 	bdev_io->buf_len = len;
380 	bdev_io->get_buf_cb = cb;
381 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
382 		pool = g_bdev_mgr.buf_small_pool;
383 		stailq = &mgmt_ch->need_buf_small;
384 	} else {
385 		pool = g_bdev_mgr.buf_large_pool;
386 		stailq = &mgmt_ch->need_buf_large;
387 	}
388 
389 	buf = spdk_mempool_get(pool);
390 
391 	if (!buf) {
392 		STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link);
393 	} else {
394 		spdk_bdev_io_set_buf(bdev_io, buf);
395 	}
396 }
397 
398 static int
399 spdk_bdev_module_get_max_ctx_size(void)
400 {
401 	struct spdk_bdev_module *bdev_module;
402 	int max_bdev_module_size = 0;
403 
404 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
405 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
406 			max_bdev_module_size = bdev_module->get_ctx_size();
407 		}
408 	}
409 
410 	return max_bdev_module_size;
411 }
412 
413 void
414 spdk_bdev_config_text(FILE *fp)
415 {
416 	struct spdk_bdev_module *bdev_module;
417 
418 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
419 		if (bdev_module->config_text) {
420 			bdev_module->config_text(fp);
421 		}
422 	}
423 }
424 
425 void
426 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
427 {
428 	struct spdk_bdev_module *bdev_module;
429 	struct spdk_bdev *bdev;
430 
431 	assert(w != NULL);
432 
433 	spdk_json_write_array_begin(w);
434 
435 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
436 		if (bdev_module->config_json) {
437 			bdev_module->config_json(w);
438 		}
439 	}
440 
441 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, link) {
442 		spdk_bdev_config_json(bdev, w);
443 	}
444 
445 	spdk_json_write_array_end(w);
446 }
447 
448 static int
449 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
450 {
451 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
452 
453 	STAILQ_INIT(&ch->need_buf_small);
454 	STAILQ_INIT(&ch->need_buf_large);
455 
456 	STAILQ_INIT(&ch->per_thread_cache);
457 	ch->per_thread_cache_count = 0;
458 
459 	return 0;
460 }
461 
462 static void
463 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
464 {
465 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
466 	struct spdk_bdev_io *bdev_io;
467 
468 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
469 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
470 	}
471 
472 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
473 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
474 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
475 		ch->per_thread_cache_count--;
476 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
477 	}
478 
479 	assert(ch->per_thread_cache_count == 0);
480 }
481 
482 static void
483 spdk_bdev_init_complete(int rc)
484 {
485 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
486 	void *cb_arg = g_init_cb_arg;
487 
488 	g_bdev_mgr.init_complete = true;
489 	g_init_cb_fn = NULL;
490 	g_init_cb_arg = NULL;
491 
492 	cb_fn(cb_arg, rc);
493 }
494 
495 static void
496 spdk_bdev_module_action_complete(void)
497 {
498 	struct spdk_bdev_module *m;
499 
500 	/*
501 	 * Don't finish bdev subsystem initialization if
502 	 * module pre-initialization is still in progress, or
503 	 * the subsystem has already been initialized.
504 	 */
505 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
506 		return;
507 	}
508 
509 	/*
510 	 * Check all bdev modules for inits/examinations in progress. If any
511 	 * exist, return immediately since we cannot finish bdev subsystem
512 	 * initialization until all are completed.
513 	 */
514 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
515 		if (m->action_in_progress > 0) {
516 			return;
517 		}
518 	}
519 
520 	/*
521 	 * For modules that need to know when subsystem init is complete,
522 	 * inform them now.
523 	 */
524 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
525 		if (m->init_complete) {
526 			m->init_complete();
527 		}
528 	}
529 
530 	/*
531 	 * Modules already finished initialization - now that all
532 	 * the bdev modules have finished their asynchronous I/O
533 	 * processing, the entire bdev layer can be marked as complete.
534 	 */
535 	spdk_bdev_init_complete(0);
536 }
537 
538 static void
539 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
540 {
541 	assert(module->action_in_progress > 0);
542 	module->action_in_progress--;
543 	spdk_bdev_module_action_complete();
544 }
545 
546 void
547 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
548 {
549 	spdk_bdev_module_action_done(module);
550 }
551 
552 void
553 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
554 {
555 	spdk_bdev_module_action_done(module);
556 }
557 
558 static int
559 spdk_bdev_module_channel_create(void *io_device, void *ctx_buf)
560 {
561 	struct spdk_bdev_module_channel *ch = ctx_buf;
562 	struct spdk_io_channel *mgmt_ch;
563 
564 	ch->io_outstanding = 0;
565 	TAILQ_INIT(&ch->nomem_io);
566 	ch->nomem_threshold = 0;
567 
568 	mgmt_ch = spdk_get_io_channel(&g_bdev_mgr);
569 	if (!mgmt_ch) {
570 		return -1;
571 	}
572 
573 	ch->mgmt_ch = spdk_io_channel_get_ctx(mgmt_ch);
574 
575 	return 0;
576 }
577 
578 static void
579 spdk_bdev_module_channel_destroy(void *io_device, void *ctx_buf)
580 {
581 	struct spdk_bdev_module_channel *ch = ctx_buf;
582 
583 	assert(ch->io_outstanding == 0);
584 	assert(TAILQ_EMPTY(&ch->nomem_io));
585 
586 	spdk_put_io_channel(spdk_io_channel_from_ctx(ch->mgmt_ch));
587 }
588 
589 static int
590 spdk_bdev_modules_init(void)
591 {
592 	struct spdk_bdev_module *module;
593 	int rc = 0;
594 
595 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
596 		spdk_io_device_register(module,
597 					spdk_bdev_module_channel_create,
598 					spdk_bdev_module_channel_destroy,
599 					sizeof(struct spdk_bdev_module_channel));
600 		rc = module->module_init();
601 		if (rc != 0) {
602 			break;
603 		}
604 	}
605 
606 	g_bdev_mgr.module_init_complete = true;
607 	return rc;
608 }
609 void
610 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
611 {
612 	int cache_size;
613 	int rc = 0;
614 	char mempool_name[32];
615 
616 	assert(cb_fn != NULL);
617 
618 	g_init_cb_fn = cb_fn;
619 	g_init_cb_arg = cb_arg;
620 
621 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
622 
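	/*
	 * Each pool element must be large enough to hold an spdk_bdev_io plus the largest
	 *  per-I/O driver context required by any registered bdev module.
	 */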
623 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
624 				  SPDK_BDEV_IO_POOL_SIZE,
625 				  sizeof(struct spdk_bdev_io) +
626 				  spdk_bdev_module_get_max_ctx_size(),
627 				  0,
628 				  SPDK_ENV_SOCKET_ID_ANY);
629 
630 	if (g_bdev_mgr.bdev_io_pool == NULL) {
631 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
632 		spdk_bdev_init_complete(-1);
633 		return;
634 	}
635 
636 	/**
637 	 * Ensure no more than half of the total buffers end up in local caches, by
638 	 *   using spdk_env_get_core_count() to determine how many local caches we need
639 	 *   to account for.
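	 *   For example, with BUF_SMALL_POOL_SIZE of 8192 and 4 cores, each per-core cache
	 *   holds at most 8192 / (2 * 4) = 1024 buffers.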
640 	 */
641 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
642 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
643 
644 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
645 				    BUF_SMALL_POOL_SIZE,
646 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
647 				    cache_size,
648 				    SPDK_ENV_SOCKET_ID_ANY);
649 	if (!g_bdev_mgr.buf_small_pool) {
650 		SPDK_ERRLOG("create buf small pool failed\n");
651 		spdk_bdev_init_complete(-1);
652 		return;
653 	}
654 
655 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
656 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
657 
658 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
659 				    BUF_LARGE_POOL_SIZE,
660 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
661 				    cache_size,
662 				    SPDK_ENV_SOCKET_ID_ANY);
663 	if (!g_bdev_mgr.buf_large_pool) {
664 		SPDK_ERRLOG("create buf large pool failed\n");
665 		spdk_bdev_init_complete(-1);
666 		return;
667 	}
668 
669 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
670 				 NULL);
671 	if (!g_bdev_mgr.zero_buffer) {
672 		SPDK_ERRLOG("create bdev zero buffer failed\n");
673 		spdk_bdev_init_complete(-1);
674 		return;
675 	}
676 
677 #ifdef SPDK_CONFIG_VTUNE
678 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
679 #endif
680 
681 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
682 				spdk_bdev_mgmt_channel_destroy,
683 				sizeof(struct spdk_bdev_mgmt_channel));
684 
685 	rc = spdk_bdev_modules_init();
686 	if (rc != 0) {
687 		SPDK_ERRLOG("bdev modules init failed\n");
688 		spdk_bdev_init_complete(-1);
689 		return;
690 	}
691 
692 	spdk_bdev_module_action_complete();
693 }
694 
695 static void
696 spdk_bdev_mgr_unregister_cb(void *io_device)
697 {
698 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
699 
700 	cb_fn(g_fini_cb_arg);
701 	g_fini_cb_fn = NULL;
702 	g_fini_cb_arg = NULL;
703 }
704 
705 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
706 
707 static void
708 spdk_bdev_module_finish_iter(void *arg)
709 {
710 	struct spdk_bdev_module *bdev_module;
711 
712 	/* Start iterating from the last touched module */
713 	if (!g_resume_bdev_module) {
714 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
715 	} else {
716 		bdev_module = TAILQ_NEXT(g_resume_bdev_module, tailq);
717 	}
718 
719 	if (bdev_module) {
720 		/* Save our place so we can resume later. We must
721 		 * save the variable here, before calling module_fini()
722 		 * below, because in some cases the module may immediately
723 		 * call spdk_bdev_module_finish_done() and re-enter
724 		 * this function to continue iterating. */
725 		g_resume_bdev_module = bdev_module;
726 
727 		if (bdev_module->module_fini) {
728 			bdev_module->module_fini();
729 		}
730 
731 		if (!bdev_module->async_fini) {
732 			spdk_bdev_module_finish_done();
733 		}
734 
735 		return;
736 	}
737 
738 	g_resume_bdev_module = NULL;
739 
740 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
741 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
742 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
743 			    SPDK_BDEV_IO_POOL_SIZE);
744 	}
745 
746 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
747 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
748 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
749 			    BUF_SMALL_POOL_SIZE);
750 		assert(false);
751 	}
752 
753 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
754 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
755 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
756 			    BUF_LARGE_POOL_SIZE);
757 		assert(false);
758 	}
759 
760 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
761 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
762 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
763 	spdk_dma_free(g_bdev_mgr.zero_buffer);
764 
765 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
766 }
767 
768 static void
769 spdk_bdev_module_unregister_cb(void *io_device)
770 {
771 	if (spdk_get_thread() != g_fini_thread) {
772 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
773 	} else {
774 		spdk_bdev_module_finish_iter(NULL);
775 	}
776 }
777 
778 void
779 spdk_bdev_module_finish_done(void)
780 {
781 	spdk_io_device_unregister(g_resume_bdev_module, spdk_bdev_module_unregister_cb);
782 }
783 
784 static void
785 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
786 {
787 	struct spdk_bdev *bdev = cb_arg;
788 
789 	if (bdeverrno && bdev) {
790 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
791 			     bdev->name);
792 
793 		/*
794 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
795 		 *  bdev; try to continue by manually removing this bdev from the list and continue
796 		 *  with the next bdev in the list.
797 		 */
798 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
799 	}
800 
801 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
802 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
803 		/*
804 		 * Bdev module finish needs to be deferred as we might be in the middle of some context
805 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
806 		 * after returning.
807 		 */
808 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
809 		return;
810 	}
811 
812 	/*
813 	 * Unregister the first bdev in the list.
814 	 *
815 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
816 	 *  calling the remove_cb of the descriptors first.
817 	 *
818 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
819 	 *  will be called again via the unregister completion callback to continue the cleanup
820 	 *  process with the next bdev.
821 	 */
822 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
823 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
824 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
825 }
826 
827 void
828 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
829 {
830 	assert(cb_fn != NULL);
831 
832 	g_fini_thread = spdk_get_thread();
833 
834 	g_fini_cb_fn = cb_fn;
835 	g_fini_cb_arg = cb_arg;
836 
837 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
838 }
839 
840 static struct spdk_bdev_io *
841 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
842 {
843 	struct spdk_bdev_mgmt_channel *ch = channel->module_ch->mgmt_ch;
844 	struct spdk_bdev_io *bdev_io;
845 
846 	if (ch->per_thread_cache_count > 0) {
847 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
848 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
849 		ch->per_thread_cache_count--;
850 	} else {
851 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
852 		if (!bdev_io) {
853 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
854 			return NULL;
855 		}
856 	}
857 
858 	return bdev_io;
859 }
860 
861 static void
862 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
863 {
864 	struct spdk_bdev_mgmt_channel *ch = bdev_io->ch->module_ch->mgmt_ch;
865 
866 	if (bdev_io->buf != NULL) {
867 		spdk_bdev_io_put_buf(bdev_io);
868 	}
869 
870 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
871 		ch->per_thread_cache_count++;
872 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
873 	} else {
874 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
875 	}
876 }
877 
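/*
 * Drain the channel's QoS queue, submitting I/O to the bdev module until the per-timeslice
 *  budget (qos_max_ios_per_timeslice) is used up.  Anything left over stays queued until the
 *  QoS poller opens the next timeslice.
 */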
878 static void
879 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
880 {
881 	struct spdk_bdev_io		*bdev_io = NULL;
882 	struct spdk_bdev		*bdev = ch->bdev;
883 	struct spdk_bdev_module_channel *module_ch = ch->module_ch;
884 
885 	while (!TAILQ_EMPTY(&ch->qos_io)) {
886 		if (ch->io_submitted_this_timeslice < ch->qos_max_ios_per_timeslice) {
887 			bdev_io = TAILQ_FIRST(&ch->qos_io);
888 			TAILQ_REMOVE(&ch->qos_io, bdev_io, link);
889 			ch->io_submitted_this_timeslice++;
890 			ch->io_outstanding++;
891 			module_ch->io_outstanding++;
892 			bdev->fn_table->submit_request(ch->channel, bdev_io);
893 		} else {
894 			break;
895 		}
896 	}
897 }
898 
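/*
 * Common submission path, always run on the thread that owns the target channel.  The fast
 *  path hands the I/O straight to the bdev module; otherwise the I/O is failed (reset in
 *  progress), queued behind earlier NOMEM I/O, or queued for QoS rate limiting.
 */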
899 static void
900 _spdk_bdev_io_submit(void *ctx)
901 {
902 	struct spdk_bdev_io *bdev_io = ctx;
903 	struct spdk_bdev *bdev = bdev_io->bdev;
904 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
905 	struct spdk_io_channel *ch = bdev_ch->channel;
906 	struct spdk_bdev_module_channel	*module_ch = bdev_ch->module_ch;
907 
908 	bdev_io->submit_tsc = spdk_get_ticks();
909 	bdev_ch->io_outstanding++;
910 	module_ch->io_outstanding++;
911 	bdev_io->in_submit_request = true;
912 	if (spdk_likely(bdev_ch->flags == 0)) {
913 		if (spdk_likely(TAILQ_EMPTY(&module_ch->nomem_io))) {
914 			bdev->fn_table->submit_request(ch, bdev_io);
915 		} else {
916 			bdev_ch->io_outstanding--;
917 			module_ch->io_outstanding--;
918 			TAILQ_INSERT_TAIL(&module_ch->nomem_io, bdev_io, link);
919 		}
920 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
921 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
922 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
923 		bdev_ch->io_outstanding--;
924 		module_ch->io_outstanding--;
925 		TAILQ_INSERT_TAIL(&bdev_ch->qos_io, bdev_io, link);
926 		_spdk_bdev_qos_io_submit(bdev_ch);
927 	} else {
928 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
929 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
930 	}
931 	bdev_io->in_submit_request = false;
932 }
933 
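/*
 * When QoS is enabled on the bdev, redirect the I/O to the single QoS channel on the QoS
 *  thread so that all rate-limited I/O is accounted for in one place; the submitting channel
 *  is saved in io_submit_ch for use at completion.
 */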
934 static void
935 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
936 {
937 	struct spdk_bdev *bdev = bdev_io->bdev;
938 
939 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
940 
941 	if (bdev_io->ch->flags & BDEV_CH_QOS_ENABLED) {
942 		bdev_io->io_submit_ch = bdev_io->ch;
943 		bdev_io->ch = bdev->qos_channel;
944 		spdk_thread_send_msg(bdev->qos_thread, _spdk_bdev_io_submit, bdev_io);
945 	} else {
946 		_spdk_bdev_io_submit(bdev_io);
947 	}
948 }
949 
950 static void
951 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
952 {
953 	struct spdk_bdev *bdev = bdev_io->bdev;
954 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
955 	struct spdk_io_channel *ch = bdev_ch->channel;
956 
957 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
958 
959 	bdev_io->in_submit_request = true;
960 	bdev->fn_table->submit_request(ch, bdev_io);
961 	bdev_io->in_submit_request = false;
962 }
963 
964 static void
965 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
966 		  struct spdk_bdev *bdev, void *cb_arg,
967 		  spdk_bdev_io_completion_cb cb)
968 {
969 	bdev_io->bdev = bdev;
970 	bdev_io->caller_ctx = cb_arg;
971 	bdev_io->cb = cb;
972 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
973 	bdev_io->in_submit_request = false;
974 	bdev_io->buf = NULL;
975 	bdev_io->io_submit_ch = NULL;
976 }
977 
978 bool
979 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
980 {
981 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
982 }
983 
984 int
985 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
986 {
987 	if (bdev->fn_table->dump_info_json) {
988 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
989 	}
990 
991 	return 0;
992 }
993 
994 void
995 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
996 {
997 	assert(bdev != NULL);
998 	assert(w != NULL);
999 
1000 	if (bdev->fn_table->write_config_json) {
1001 		bdev->fn_table->write_config_json(bdev, w);
1002 	} else {
1003 		spdk_json_write_object_begin(w);
1004 		spdk_json_write_named_string(w, "name", bdev->name);
1005 		spdk_json_write_object_end(w);
1006 	}
1007 }
1008 
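/*
 * Derive the per-timeslice I/O budget from the bdev's configured rate limit.  For example,
 *  ios_per_sec = 10000 with a 1000 us timeslice gives 10000 * 1000 / 1000000 = 10 I/O per
 *  timeslice; the result never falls below SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE.
 */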
1009 static void
1010 spdk_bdev_qos_get_max_ios_per_timeslice(struct spdk_bdev_channel *qos_ch)
1011 {
1012 	uint64_t		qos_max_ios_per_timeslice = 0;
1013 	struct spdk_bdev	*bdev = qos_ch->bdev;
1014 
1015 	qos_max_ios_per_timeslice = bdev->ios_per_sec * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1016 				    SPDK_BDEV_SEC_TO_USEC;
1017 	qos_ch->qos_max_ios_per_timeslice = spdk_max(qos_max_ios_per_timeslice,
1018 					    SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
1019 }
1020 
1021 static int
1022 spdk_bdev_channel_poll_qos(void *arg)
1023 {
1024 	struct spdk_bdev_channel	*ch = arg;
1025 
1026 	/* Reset for next round of rate limiting */
1027 	ch->io_submitted_this_timeslice = 0;
1028 	spdk_bdev_qos_get_max_ios_per_timeslice(ch);
1029 
1030 	_spdk_bdev_qos_io_submit(ch);
1031 
1032 	return -1;
1033 }
1034 
1035 static int
1036 _spdk_bdev_channel_create(struct spdk_bdev_channel *ch, void *io_device)
1037 {
1038 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1039 
1040 	ch->bdev = bdev;
1041 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1042 	if (!ch->channel) {
1043 		return -1;
1044 	}
1045 
1046 	ch->module_ch = spdk_io_channel_get_ctx(spdk_get_io_channel(bdev->module));
1047 
1048 	memset(&ch->stat, 0, sizeof(ch->stat));
1049 	ch->io_outstanding = 0;
1050 	TAILQ_INIT(&ch->queued_resets);
1051 	TAILQ_INIT(&ch->qos_io);
1052 	ch->qos_max_ios_per_timeslice = 0;
1053 	ch->io_submitted_this_timeslice = 0;
1054 	ch->qos_poller = NULL;
1055 	ch->flags = 0;
1056 
1057 	return 0;
1058 }
1059 
1060 static void
1061 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1062 {
1063 	if (!ch) {
1064 		return;
1065 	}
1066 
1067 	if (ch->channel) {
1068 		spdk_put_io_channel(ch->channel);
1069 	}
1070 
1071 	if (ch->module_ch) {
1072 		spdk_put_io_channel(spdk_io_channel_from_ctx(ch->module_ch));
1073 	}
1074 }
1075 
1076 /* Caller must hold bdev->mutex. */
1077 static int
1078 spdk_bdev_qos_channel_create(struct spdk_bdev *bdev)
1079 {
1080 	assert(bdev->qos_channel == NULL);
1081 	assert(bdev->qos_thread == NULL);
1082 
1083 	bdev->qos_channel = calloc(1, sizeof(struct spdk_bdev_channel));
1084 	if (!bdev->qos_channel) {
1085 		return -1;
1086 	}
1087 
1088 	bdev->qos_thread = spdk_get_thread();
1089 	if (!bdev->qos_thread) {
1090 		free(bdev->qos_channel);
1091 		bdev->qos_channel = NULL;
1092 		return -1;
1093 	}
1094 
1095 	if (_spdk_bdev_channel_create(bdev->qos_channel, __bdev_to_io_dev(bdev)) != 0) {
1096 		free(bdev->qos_channel);
1097 		bdev->qos_channel = NULL;
1098 		bdev->qos_thread = NULL;
1099 		return -1;
1100 	}
1101 
1102 	bdev->qos_channel->flags |= BDEV_CH_QOS_ENABLED;
1103 	spdk_bdev_qos_get_max_ios_per_timeslice(bdev->qos_channel);
1104 	bdev->qos_channel->qos_poller = spdk_poller_register(
1105 						spdk_bdev_channel_poll_qos,
1106 						bdev->qos_channel,
1107 						SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1108 
1109 	return 0;
1110 }
1111 
1112 static int
1113 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1114 {
1115 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1116 	struct spdk_bdev_channel	*ch = ctx_buf;
1117 
1118 	if (_spdk_bdev_channel_create(ch, io_device) != 0) {
1119 		_spdk_bdev_channel_destroy_resource(ch);
1120 		return -1;
1121 	}
1122 
1123 #ifdef SPDK_CONFIG_VTUNE
1124 	{
1125 		char *name;
1126 		__itt_init_ittlib(NULL, 0);
1127 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1128 		if (!name) {
1129 			_spdk_bdev_channel_destroy_resource(ch);
1130 			return -1;
1131 		}
1132 		ch->handle = __itt_string_handle_create(name);
1133 		free(name);
1134 		ch->start_tsc = spdk_get_ticks();
1135 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1136 	}
1137 #endif
1138 
1139 	pthread_mutex_lock(&bdev->mutex);
1140 
1141 	/* Rate limiting is enabled on this bdev */
1142 	if (bdev->ios_per_sec) {
1143 		if (bdev->qos_channel == NULL) {
1144 			if (spdk_bdev_qos_channel_create(bdev) != 0) {
1145 				_spdk_bdev_channel_destroy_resource(ch);
1146 				pthread_mutex_unlock(&bdev->mutex);
1147 				return -1;
1148 			}
1149 		}
1150 		ch->flags |= BDEV_CH_QOS_ENABLED;
1151 	}
1152 
1153 	bdev->channel_count++;
1154 
1155 	pthread_mutex_unlock(&bdev->mutex);
1156 
1157 	return 0;
1158 }
1159 
1160 /*
1161  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1162  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
1163  */
1164 static void
1165 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1166 {
1167 	bdev_io_stailq_t tmp;
1168 	struct spdk_bdev_io *bdev_io;
1169 
1170 	STAILQ_INIT(&tmp);
1171 
1172 	while (!STAILQ_EMPTY(queue)) {
1173 		bdev_io = STAILQ_FIRST(queue);
1174 		STAILQ_REMOVE_HEAD(queue, buf_link);
1175 		if (bdev_io->ch == ch) {
1176 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1177 		} else {
1178 			STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link);
1179 		}
1180 	}
1181 
1182 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1183 }
1184 
1185 /*
1186  * Abort I/O that are queued waiting for submission.  These types of I/O are
1187  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1188  */
1189 static void
1190 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1191 {
1192 	struct spdk_bdev_io *bdev_io, *tmp;
1193 
1194 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
1195 		if (bdev_io->ch == ch) {
1196 			TAILQ_REMOVE(queue, bdev_io, link);
1197 			/*
1198 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1199 			 *  been submitted to the bdev module.  Since in this case it
1200 			 *  hadn't, bump io_outstanding to account for the decrement
1201 			 *  that spdk_bdev_io_complete() will do.
1202 			 */
1203 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1204 				ch->io_outstanding++;
1205 				ch->module_ch->io_outstanding++;
1206 			}
1207 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1208 		}
1209 	}
1210 }
1211 
1212 static void
1213 _spdk_bdev_channel_destroy(struct spdk_bdev_channel *ch)
1214 {
1215 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1216 	struct spdk_bdev_module_channel	*module_ch = ch->module_ch;
1217 
1218 	mgmt_ch = module_ch->mgmt_ch;
1219 
1220 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1221 	_spdk_bdev_abort_queued_io(&ch->qos_io, ch);
1222 	_spdk_bdev_abort_queued_io(&module_ch->nomem_io, ch);
1223 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1224 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1225 
1226 	_spdk_bdev_channel_destroy_resource(ch);
1227 }
1228 
1229 static void
1230 spdk_bdev_qos_channel_destroy(void *ctx)
1231 {
1232 	struct spdk_bdev_channel *qos_channel = ctx;
1233 
1234 	_spdk_bdev_channel_destroy(qos_channel);
1235 
1236 	spdk_poller_unregister(&qos_channel->qos_poller);
1237 	free(qos_channel);
1238 }
1239 
1240 static void
1241 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1242 {
1243 	struct spdk_bdev_channel	*ch = ctx_buf;
1244 	struct spdk_bdev		*bdev = ch->bdev;
1245 
1246 	_spdk_bdev_channel_destroy(ch);
1247 
1248 	pthread_mutex_lock(&bdev->mutex);
1249 	bdev->channel_count--;
1250 	if (bdev->channel_count == 0 && bdev->qos_channel != NULL) {
1251 		/* All I/O channels for this bdev have been destroyed - destroy the QoS channel. */
1252 		spdk_thread_send_msg(bdev->qos_thread, spdk_bdev_qos_channel_destroy,
1253 				     bdev->qos_channel);
1254 
1255 		/*
1256 		 * Set qos_channel to NULL within the critical section so that
1257 		 * if another channel is created, it will see qos_channel == NULL and
1258 		 * re-create the QoS channel even if the asynchronous qos_channel_destroy
1259 		 * isn't finished yet.
1260 		 */
1261 		bdev->qos_channel = NULL;
1262 		bdev->qos_thread = NULL;
1263 	}
1264 	pthread_mutex_unlock(&bdev->mutex);
1265 }
1266 
1267 int
1268 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1269 {
1270 	struct spdk_bdev_alias *tmp;
1271 
1272 	if (alias == NULL) {
1273 		SPDK_ERRLOG("Empty alias passed\n");
1274 		return -EINVAL;
1275 	}
1276 
1277 	if (spdk_bdev_get_by_name(alias)) {
1278 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1279 		return -EEXIST;
1280 	}
1281 
1282 	tmp = calloc(1, sizeof(*tmp));
1283 	if (tmp == NULL) {
1284 		SPDK_ERRLOG("Unable to allocate alias\n");
1285 		return -ENOMEM;
1286 	}
1287 
1288 	tmp->alias = strdup(alias);
1289 	if (tmp->alias == NULL) {
1290 		free(tmp);
1291 		SPDK_ERRLOG("Unable to allocate alias\n");
1292 		return -ENOMEM;
1293 	}
1294 
1295 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1296 
1297 	return 0;
1298 }
1299 
1300 int
1301 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1302 {
1303 	struct spdk_bdev_alias *tmp;
1304 
1305 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1306 		if (strcmp(alias, tmp->alias) == 0) {
1307 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1308 			free(tmp->alias);
1309 			free(tmp);
1310 			return 0;
1311 		}
1312 	}
1313 
1314 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1315 
1316 	return -ENOENT;
1317 }
1318 
1319 struct spdk_io_channel *
1320 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1321 {
1322 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1323 }
1324 
1325 const char *
1326 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1327 {
1328 	return bdev->name;
1329 }
1330 
1331 const char *
1332 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1333 {
1334 	return bdev->product_name;
1335 }
1336 
1337 const struct spdk_bdev_aliases_list *
1338 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1339 {
1340 	return &bdev->aliases;
1341 }
1342 
1343 uint32_t
1344 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1345 {
1346 	return bdev->blocklen;
1347 }
1348 
1349 uint64_t
1350 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1351 {
1352 	return bdev->blockcnt;
1353 }
1354 
1355 size_t
1356 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1357 {
1358 	/* TODO: push this logic down to the bdev modules */
1359 	if (bdev->need_aligned_buffer) {
1360 		return bdev->blocklen;
1361 	}
1362 
1363 	return 1;
1364 }
1365 
1366 uint32_t
1367 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1368 {
1369 	return bdev->optimal_io_boundary;
1370 }
1371 
1372 bool
1373 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1374 {
1375 	return bdev->write_cache;
1376 }
1377 
1378 const struct spdk_uuid *
1379 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1380 {
1381 	return &bdev->uuid;
1382 }
1383 
1384 int
1385 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1386 {
1387 	int ret;
1388 
1389 	pthread_mutex_lock(&bdev->mutex);
1390 
1391 	/* bdev has open descriptors */
1392 	if (!TAILQ_EMPTY(&bdev->open_descs) &&
1393 	    bdev->blockcnt > size) {
1394 		ret = -EBUSY;
1395 	} else {
1396 		bdev->blockcnt = size;
1397 		ret = 0;
1398 	}
1399 
1400 	pthread_mutex_unlock(&bdev->mutex);
1401 
1402 	return ret;
1403 }
1404 
1405 /*
1406  * Convert I/O offset and length from bytes to blocks.
1407  *
1408  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
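 *
 * For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 1024 yield
 *  offset_blocks = 8, num_blocks = 2 and a return value of 0 (both remainders are zero).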
1409  */
1410 static uint64_t
1411 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1412 			  uint64_t num_bytes, uint64_t *num_blocks)
1413 {
1414 	uint32_t block_size = bdev->blocklen;
1415 
1416 	*offset_blocks = offset_bytes / block_size;
1417 	*num_blocks = num_bytes / block_size;
1418 
1419 	return (offset_bytes % block_size) | (num_bytes % block_size);
1420 }
1421 
1422 static bool
1423 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1424 {
1425 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
1426 	 * has been an overflow and hence the offset has been wrapped around */
1427 	if (offset_blocks + num_blocks < offset_blocks) {
1428 		return false;
1429 	}
1430 
1431 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1432 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1433 		return false;
1434 	}
1435 
1436 	return true;
1437 }
1438 
1439 int
1440 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1441 	       void *buf, uint64_t offset, uint64_t nbytes,
1442 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1443 {
1444 	uint64_t offset_blocks, num_blocks;
1445 
1446 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1447 		return -EINVAL;
1448 	}
1449 
1450 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1451 }
1452 
1453 int
1454 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1455 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1456 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1457 {
1458 	struct spdk_bdev *bdev = desc->bdev;
1459 	struct spdk_bdev_io *bdev_io;
1460 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1461 
1462 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1463 		return -EINVAL;
1464 	}
1465 
1466 	bdev_io = spdk_bdev_get_io(channel);
1467 	if (!bdev_io) {
1468 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1469 		return -ENOMEM;
1470 	}
1471 
1472 	bdev_io->ch = channel;
1473 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1474 	bdev_io->u.bdev.iov.iov_base = buf;
1475 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1476 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1477 	bdev_io->u.bdev.iovcnt = 1;
1478 	bdev_io->u.bdev.num_blocks = num_blocks;
1479 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1480 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1481 
1482 	spdk_bdev_io_submit(bdev_io);
1483 	return 0;
1484 }
1485 
1486 int
1487 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1488 		struct iovec *iov, int iovcnt,
1489 		uint64_t offset, uint64_t nbytes,
1490 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1491 {
1492 	uint64_t offset_blocks, num_blocks;
1493 
1494 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1495 		return -EINVAL;
1496 	}
1497 
1498 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1499 }
1500 
1501 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1502 			   struct iovec *iov, int iovcnt,
1503 			   uint64_t offset_blocks, uint64_t num_blocks,
1504 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1505 {
1506 	struct spdk_bdev *bdev = desc->bdev;
1507 	struct spdk_bdev_io *bdev_io;
1508 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1509 
1510 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1511 		return -EINVAL;
1512 	}
1513 
1514 	bdev_io = spdk_bdev_get_io(channel);
1515 	if (!bdev_io) {
1516 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
1517 		return -ENOMEM;
1518 	}
1519 
1520 	bdev_io->ch = channel;
1521 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1522 	bdev_io->u.bdev.iovs = iov;
1523 	bdev_io->u.bdev.iovcnt = iovcnt;
1524 	bdev_io->u.bdev.num_blocks = num_blocks;
1525 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1526 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1527 
1528 	spdk_bdev_io_submit(bdev_io);
1529 	return 0;
1530 }
1531 
1532 int
1533 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1534 		void *buf, uint64_t offset, uint64_t nbytes,
1535 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1536 {
1537 	uint64_t offset_blocks, num_blocks;
1538 
1539 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1540 		return -EINVAL;
1541 	}
1542 
1543 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1544 }
1545 
1546 int
1547 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1548 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1549 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1550 {
1551 	struct spdk_bdev *bdev = desc->bdev;
1552 	struct spdk_bdev_io *bdev_io;
1553 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1554 
1555 	if (!desc->write) {
1556 		return -EBADF;
1557 	}
1558 
1559 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1560 		return -EINVAL;
1561 	}
1562 
1563 	bdev_io = spdk_bdev_get_io(channel);
1564 	if (!bdev_io) {
1565 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1566 		return -ENOMEM;
1567 	}
1568 
1569 	bdev_io->ch = channel;
1570 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1571 	bdev_io->u.bdev.iov.iov_base = buf;
1572 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1573 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1574 	bdev_io->u.bdev.iovcnt = 1;
1575 	bdev_io->u.bdev.num_blocks = num_blocks;
1576 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1577 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1578 
1579 	spdk_bdev_io_submit(bdev_io);
1580 	return 0;
1581 }
1582 
1583 int
1584 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1585 		 struct iovec *iov, int iovcnt,
1586 		 uint64_t offset, uint64_t len,
1587 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1588 {
1589 	uint64_t offset_blocks, num_blocks;
1590 
1591 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1592 		return -EINVAL;
1593 	}
1594 
1595 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1596 }
1597 
1598 int
1599 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1600 			struct iovec *iov, int iovcnt,
1601 			uint64_t offset_blocks, uint64_t num_blocks,
1602 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1603 {
1604 	struct spdk_bdev *bdev = desc->bdev;
1605 	struct spdk_bdev_io *bdev_io;
1606 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1607 
1608 	if (!desc->write) {
1609 		return -EBADF;
1610 	}
1611 
1612 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1613 		return -EINVAL;
1614 	}
1615 
1616 	bdev_io = spdk_bdev_get_io(channel);
1617 	if (!bdev_io) {
1618 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1619 		return -ENOMEM;
1620 	}
1621 
1622 	bdev_io->ch = channel;
1623 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1624 	bdev_io->u.bdev.iovs = iov;
1625 	bdev_io->u.bdev.iovcnt = iovcnt;
1626 	bdev_io->u.bdev.num_blocks = num_blocks;
1627 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1628 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1629 
1630 	spdk_bdev_io_submit(bdev_io);
1631 	return 0;
1632 }
1633 
1634 int
1635 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1636 		       uint64_t offset, uint64_t len,
1637 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1638 {
1639 	uint64_t offset_blocks, num_blocks;
1640 
1641 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1642 		return -EINVAL;
1643 	}
1644 
1645 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1646 }
1647 
1648 int
1649 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1650 			      uint64_t offset_blocks, uint64_t num_blocks,
1651 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1652 {
1653 	struct spdk_bdev *bdev = desc->bdev;
1654 	struct spdk_bdev_io *bdev_io;
1655 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1656 	uint64_t len;
1657 	bool split_request = false;
1658 
1659 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1660 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1661 		return -ERANGE;
1662 	}
1663 
1664 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1665 		return -EINVAL;
1666 	}
1667 
1668 	bdev_io = spdk_bdev_get_io(channel);
1669 
1670 	if (!bdev_io) {
1671 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1672 		return -ENOMEM;
1673 	}
1674 
1675 	bdev_io->ch = channel;
1676 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1677 
1678 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1679 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1680 		bdev_io->u.bdev.num_blocks = num_blocks;
1681 		bdev_io->u.bdev.iovs = NULL;
1682 		bdev_io->u.bdev.iovcnt = 0;
1683 
1684 	} else {
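		/*
		 * The bdev module does not support WRITE_ZEROES natively, so emulate it with plain
		 *  writes of the shared zero buffer.  Requests larger than ZERO_BUFFER_SIZE are
		 *  split and continued by spdk_bdev_write_zeroes_split().
		 */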
1685 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1686 
1687 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1688 
1689 		if (len > ZERO_BUFFER_SIZE) {
1690 			split_request = true;
1691 			len = ZERO_BUFFER_SIZE;
1692 		}
1693 
1694 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1695 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1696 		bdev_io->u.bdev.iov.iov_len = len;
1697 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1698 		bdev_io->u.bdev.iovcnt = 1;
1699 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1700 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1701 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1702 	}
1703 
1704 	if (split_request) {
1705 		bdev_io->u.bdev.stored_user_cb = cb;
1706 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1707 	} else {
1708 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1709 	}
1710 	spdk_bdev_io_submit(bdev_io);
1711 	return 0;
1712 }
1713 
1714 int
1715 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1716 		uint64_t offset, uint64_t nbytes,
1717 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1718 {
1719 	uint64_t offset_blocks, num_blocks;
1720 
1721 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1722 		return -EINVAL;
1723 	}
1724 
1725 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1726 }
1727 
1728 int
1729 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1730 		       uint64_t offset_blocks, uint64_t num_blocks,
1731 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1732 {
1733 	struct spdk_bdev *bdev = desc->bdev;
1734 	struct spdk_bdev_io *bdev_io;
1735 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1736 
1737 	if (!desc->write) {
1738 		return -EBADF;
1739 	}
1740 
1741 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1742 		return -EINVAL;
1743 	}
1744 
1745 	if (num_blocks == 0) {
1746 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1747 		return -EINVAL;
1748 	}
1749 
1750 	bdev_io = spdk_bdev_get_io(channel);
1751 	if (!bdev_io) {
1752 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1753 		return -ENOMEM;
1754 	}
1755 
1756 	bdev_io->ch = channel;
1757 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1758 	bdev_io->u.bdev.iov.iov_base = NULL;
1759 	bdev_io->u.bdev.iov.iov_len = 0;
1760 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1761 	bdev_io->u.bdev.iovcnt = 1;
1762 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1763 	bdev_io->u.bdev.num_blocks = num_blocks;
1764 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1765 
1766 	spdk_bdev_io_submit(bdev_io);
1767 	return 0;
1768 }
1769 
1770 int
1771 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1772 		uint64_t offset, uint64_t length,
1773 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1774 {
1775 	uint64_t offset_blocks, num_blocks;
1776 
1777 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1778 		return -EINVAL;
1779 	}
1780 
1781 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1782 }
1783 
1784 int
1785 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1786 		       uint64_t offset_blocks, uint64_t num_blocks,
1787 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1788 {
1789 	struct spdk_bdev *bdev = desc->bdev;
1790 	struct spdk_bdev_io *bdev_io;
1791 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1792 
1793 	if (!desc->write) {
1794 		return -EBADF;
1795 	}
1796 
1797 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1798 		return -EINVAL;
1799 	}
1800 
1801 	bdev_io = spdk_bdev_get_io(channel);
1802 	if (!bdev_io) {
1803 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1804 		return -ENOMEM;
1805 	}
1806 
1807 	bdev_io->ch = channel;
1808 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1809 	bdev_io->u.bdev.iovs = NULL;
1810 	bdev_io->u.bdev.iovcnt = 0;
1811 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1812 	bdev_io->u.bdev.num_blocks = num_blocks;
1813 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1814 
1815 	spdk_bdev_io_submit(bdev_io);
1816 	return 0;
1817 }
1818 
1819 static void
1820 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1821 {
1822 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1823 	struct spdk_bdev_io *bdev_io;
1824 
1825 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1826 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1827 	spdk_bdev_io_submit_reset(bdev_io);
1828 }
1829 
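/*
 * Runs on each channel's thread via spdk_for_each_channel(): mark the channel as having a
 *  reset in progress and abort I/O still queued locally (NOMEM retries, QoS queue, buffer
 *  waits) so it completes with failure before the reset itself is submitted to the module.
 */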
1830 static void
1831 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1832 {
1833 	struct spdk_io_channel		*ch;
1834 	struct spdk_bdev_channel	*channel;
1835 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1836 	struct spdk_bdev_module_channel	*module_ch;
1837 
1838 	ch = spdk_io_channel_iter_get_channel(i);
1839 	channel = spdk_io_channel_get_ctx(ch);
1840 	module_ch = channel->module_ch;
1841 	mgmt_channel = module_ch->mgmt_ch;
1842 
1843 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1844 
1845 	_spdk_bdev_abort_queued_io(&module_ch->nomem_io, channel);
1846 	_spdk_bdev_abort_queued_io(&channel->qos_io, channel);
1847 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1848 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1849 
1850 	spdk_for_each_channel_continue(i, 0);
1851 }
1852 
1853 static void
1854 _spdk_bdev_reset_freeze_qos_channel(void *ctx)
1855 {
1856 	struct spdk_bdev		*bdev = ctx;
1857 	struct spdk_bdev_mgmt_channel	*mgmt_channel = NULL;
1858 	struct spdk_bdev_channel	*qos_channel = bdev->qos_channel;
1859 	struct spdk_bdev_module_channel	*module_ch = NULL;
1860 
1861 	if (qos_channel) {
1862 		module_ch = qos_channel->module_ch;
1863 		mgmt_channel = module_ch->mgmt_ch;
1864 
1865 		qos_channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1866 
1867 		_spdk_bdev_abort_queued_io(&module_ch->nomem_io, qos_channel);
1868 		_spdk_bdev_abort_queued_io(&qos_channel->qos_io, qos_channel);
1869 		_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, qos_channel);
1870 		_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, qos_channel);
1871 	}
1872 }
1873 
1874 static void
1875 _spdk_bdev_start_reset(void *ctx)
1876 {
1877 	struct spdk_bdev_channel *ch = ctx;
1878 
1879 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
1880 			      ch, _spdk_bdev_reset_dev);
1881 }
1882 
1883 static void
1884 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1885 {
1886 	struct spdk_bdev *bdev = ch->bdev;
1887 
1888 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1889 
1890 	pthread_mutex_lock(&bdev->mutex);
1891 	if (bdev->reset_in_progress == NULL) {
1892 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1893 		/*
1894 		 * Take a channel reference for the target bdev for the life of this
1895 		 *  reset.  This guards against the channel getting destroyed while
1896 		 *  spdk_for_each_channel() calls related to this reset IO are in
1897 		 *  progress.  We will release the reference when this reset is
1898 		 *  completed.
1899 		 */
1900 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1901 		_spdk_bdev_start_reset(ch);
1902 	}
1903 	pthread_mutex_unlock(&bdev->mutex);
1904 }
1905 
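/*
 * Reset flow: the reset bdev_io is queued on the submitting channel and, if
 *  no other reset is already in progress, _spdk_bdev_channel_start_reset()
 *  freezes every channel of the bdev (aborting I/O queued for memory,
 *  buffers or QoS) before the reset itself is submitted to the module.  The
 *  channels are unfrozen again when the reset completes in
 *  spdk_bdev_io_complete().
 */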
1906 int
1907 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1908 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1909 {
1910 	struct spdk_bdev *bdev = desc->bdev;
1911 	struct spdk_bdev_io *bdev_io;
1912 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1913 
1914 	bdev_io = spdk_bdev_get_io(channel);
1915 	if (!bdev_io) {
1916 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1917 		return -ENOMEM;
1918 	}
1919 
1920 	bdev_io->ch = channel;
1921 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1922 	bdev_io->u.reset.ch_ref = NULL;
1923 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1924 
1925 	pthread_mutex_lock(&bdev->mutex);
1926 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1927 	pthread_mutex_unlock(&bdev->mutex);
1928 
1929 	_spdk_bdev_channel_start_reset(channel);
1930 
1931 	/* Explicitly handle the QoS bdev channel since it has no associated IO channel */
1932 	if (bdev->qos_thread) {
1933 		spdk_thread_send_msg(bdev->qos_thread,
1934 				     _spdk_bdev_reset_freeze_qos_channel, bdev);
1935 	}
1936 
1937 	return 0;
1938 }
1939 
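/*
 * Return the accumulated per-channel I/O statistics and reset the counters.
 *  Not supported when VTune integration is enabled, because the counters are
 *  then consumed by the VTune collection code in spdk_bdev_io_complete().
 */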
1940 void
1941 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1942 		      struct spdk_bdev_io_stat *stat)
1943 {
1944 #ifdef SPDK_CONFIG_VTUNE
1945 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1946 	memset(stat, 0, sizeof(*stat));
1947 	return;
1948 #endif
1949 
1950 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1951 
1952 	channel->stat.ticks_rate = spdk_get_ticks_hz();
1953 	*stat = channel->stat;
1954 	memset(&channel->stat, 0, sizeof(channel->stat));
1955 }
1956 
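/*
 * Pass an NVMe admin command through to the bdev module unmodified.  The
 *  descriptor must be open for write; buf/nbytes describe the data buffer
 *  (if any) associated with the command.
 */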
1957 int
1958 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1959 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1960 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1961 {
1962 	struct spdk_bdev *bdev = desc->bdev;
1963 	struct spdk_bdev_io *bdev_io;
1964 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1965 
1966 	if (!desc->write) {
1967 		return -EBADF;
1968 	}
1969 
1970 	bdev_io = spdk_bdev_get_io(channel);
1971 	if (!bdev_io) {
1972 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1973 		return -ENOMEM;
1974 	}
1975 
1976 	bdev_io->ch = channel;
1977 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1978 	bdev_io->u.nvme_passthru.cmd = *cmd;
1979 	bdev_io->u.nvme_passthru.buf = buf;
1980 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1981 	bdev_io->u.nvme_passthru.md_buf = NULL;
1982 	bdev_io->u.nvme_passthru.md_len = 0;
1983 
1984 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1985 
1986 	spdk_bdev_io_submit(bdev_io);
1987 	return 0;
1988 }
1989 
1990 int
1991 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1992 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1993 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1994 {
1995 	struct spdk_bdev *bdev = desc->bdev;
1996 	struct spdk_bdev_io *bdev_io;
1997 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1998 
1999 	if (!desc->write) {
2000 		/*
2001 		 * Do not try to parse the NVMe command - we could inspect the opcode to
2002 		 *  determine whether the command is a read or a write, but for now simply
2003 		 *  do not allow io_passthru with a read-only descriptor.
2004 		 */
2005 		return -EBADF;
2006 	}
2007 
2008 	bdev_io = spdk_bdev_get_io(channel);
2009 	if (!bdev_io) {
2010 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
2011 		return -ENOMEM;
2012 	}
2013 
2014 	bdev_io->ch = channel;
2015 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2016 	bdev_io->u.nvme_passthru.cmd = *cmd;
2017 	bdev_io->u.nvme_passthru.buf = buf;
2018 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2019 	bdev_io->u.nvme_passthru.md_buf = NULL;
2020 	bdev_io->u.nvme_passthru.md_len = 0;
2021 
2022 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2023 
2024 	spdk_bdev_io_submit(bdev_io);
2025 	return 0;
2026 }
2027 
2028 int
2029 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2030 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2031 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2032 {
2033 	struct spdk_bdev *bdev = desc->bdev;
2034 	struct spdk_bdev_io *bdev_io;
2035 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2036 
2037 	if (!desc->write) {
2038 		/*
2039 		 * Do not try to parse the NVMe command - we could inspect the opcode to
2040 		 *  determine whether the command is a read or a write, but for now simply
2041 		 *  do not allow io_passthru with a read-only descriptor.
2042 		 */
2043 		return -EBADF;
2044 	}
2045 
2046 	bdev_io = spdk_bdev_get_io(channel);
2047 	if (!bdev_io) {
2048 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
2049 		return -ENOMEM;
2050 	}
2051 
2052 	bdev_io->ch = channel;
2053 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2054 	bdev_io->u.nvme_passthru.cmd = *cmd;
2055 	bdev_io->u.nvme_passthru.buf = buf;
2056 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2057 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2058 	bdev_io->u.nvme_passthru.md_len = md_len;
2059 
2060 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2061 
2062 	spdk_bdev_io_submit(bdev_io);
2063 	return 0;
2064 }
2065 
2066 int
2067 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
2068 {
2069 	if (!bdev_io) {
2070 		SPDK_ERRLOG("bdev_io is NULL\n");
2071 		return -1;
2072 	}
2073 
2074 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
2075 		SPDK_ERRLOG("bdev_io is in pending state\n");
2076 		assert(false);
2077 		return -1;
2078 	}
2079 
2080 	spdk_bdev_put_io(bdev_io);
2081 
2082 	return 0;
2083 }
2084 
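/*
 * Retry I/O that previously failed with NOMEM.  Called from the completion
 *  path once enough outstanding I/O on this module channel has drained (see
 *  nomem_threshold); resubmission stops again as soon as the module reports
 *  NOMEM for one of the retried requests.
 */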
2085 static void
2086 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2087 {
2088 	struct spdk_bdev *bdev = bdev_ch->bdev;
2089 	struct spdk_bdev_module_channel	*module_ch = bdev_ch->module_ch;
2090 	struct spdk_bdev_io *bdev_io;
2091 
2092 	if (module_ch->io_outstanding > module_ch->nomem_threshold) {
2093 		/*
2094 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2095 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2096 		 *  the context of a completion, because the resources for the I/O are
2097 		 *  not released until control returns to the bdev poller.  Also, we
2098 		 *  may require several small I/O to complete before a larger I/O
2099 		 *  (that requires splitting) can be submitted.
2100 		 */
2101 		return;
2102 	}
2103 
2104 	while (!TAILQ_EMPTY(&module_ch->nomem_io)) {
2105 		bdev_io = TAILQ_FIRST(&module_ch->nomem_io);
2106 		TAILQ_REMOVE(&module_ch->nomem_io, bdev_io, link);
2107 		bdev_io->ch->io_outstanding++;
2108 		module_ch->io_outstanding++;
2109 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
2110 		bdev->fn_table->submit_request(bdev_io->ch->channel, bdev_io);
2111 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
2112 			break;
2113 		}
2114 	}
2115 }
2116 
2117 static inline void
2118 _spdk_bdev_io_complete(void *ctx)
2119 {
2120 	struct spdk_bdev_io *bdev_io = ctx;
2121 
2122 	if (spdk_unlikely(bdev_io->in_submit_request || bdev_io->io_submit_ch)) {
2123 		/*
2124 		 * Send the completion to the thread that originally submitted the I/O,
2125 		 * which may not be the current thread in the case of QoS.
2126 		 */
2127 		if (bdev_io->io_submit_ch) {
2128 			bdev_io->ch = bdev_io->io_submit_ch;
2129 			bdev_io->io_submit_ch = NULL;
2130 		}
2131 
2132 		/*
2133 		 * Defer completion to avoid potential infinite recursion if the
2134 		 * user's completion callback issues a new I/O.
2135 		 */
2136 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
2137 				     _spdk_bdev_io_complete, bdev_io);
2138 		return;
2139 	}
2140 
2141 	assert(bdev_io->cb != NULL);
2142 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->ch->channel));
2143 
2144 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS,
2145 		    bdev_io->caller_ctx);
2146 }
2147 
2148 static void
2149 _spdk_bdev_unfreeze_qos_channel(void *ctx)
2150 {
2151 	struct spdk_bdev	*bdev = ctx;
2152 
2153 	if (bdev->qos_channel) {
2154 		bdev->qos_channel->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2155 		assert(TAILQ_EMPTY(&bdev->qos_channel->queued_resets));
2156 	}
2157 }
2158 
2159 static void
2160 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2161 {
2162 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2163 
2164 	if (bdev_io->u.reset.ch_ref != NULL) {
2165 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2166 		bdev_io->u.reset.ch_ref = NULL;
2167 	}
2168 
2169 	_spdk_bdev_io_complete(bdev_io);
2170 }
2171 
2172 static void
2173 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2174 {
2175 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2176 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2177 
2178 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2179 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2180 		_spdk_bdev_channel_start_reset(ch);
2181 	}
2182 
2183 	spdk_for_each_channel_continue(i, 0);
2184 }
2185 
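/*
 * Completion entry point called by bdev modules.  Reset completions unfreeze
 *  all channels of the bdev before completing; NOMEM completions are
 *  re-queued on the module channel instead of being reported to the caller;
 *  all other completions update the per-channel statistics and then invoke
 *  the user callback, deferring to the submitting thread when necessary.
 */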
2186 void
2187 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2188 {
2189 	struct spdk_bdev *bdev = bdev_io->bdev;
2190 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
2191 	struct spdk_bdev_module_channel	*module_ch = bdev_ch->module_ch;
2192 
2193 	bdev_io->status = status;
2194 
2195 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2196 		bool unlock_channels = false;
2197 
2198 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2199 			SPDK_ERRLOG("NOMEM returned for reset\n");
2200 		}
2201 		pthread_mutex_lock(&bdev->mutex);
2202 		if (bdev_io == bdev->reset_in_progress) {
2203 			bdev->reset_in_progress = NULL;
2204 			unlock_channels = true;
2205 		}
2206 		pthread_mutex_unlock(&bdev->mutex);
2207 
2208 		if (unlock_channels) {
2209 			/* Explicitly handle the QoS bdev channel since it has no associated IO channel */
2210 			if (bdev->qos_thread) {
2211 				spdk_thread_send_msg(bdev->qos_thread,
2212 						     _spdk_bdev_unfreeze_qos_channel, bdev);
2213 			}
2214 
2215 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2216 					      bdev_io, _spdk_bdev_reset_complete);
2217 			return;
2218 		}
2219 	} else {
2220 		assert(bdev_ch->io_outstanding > 0);
2221 		assert(module_ch->io_outstanding > 0);
2222 		bdev_ch->io_outstanding--;
2223 		module_ch->io_outstanding--;
2224 
2225 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2226 			TAILQ_INSERT_HEAD(&module_ch->nomem_io, bdev_io, link);
2227 			/*
2228 			 * Wait for some of the outstanding I/O to complete before we
2229 			 *  retry any of the nomem_io.  Normally we will wait for
2230 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2231 			 *  depth channels we will instead wait for half to complete.
2232 			 */
2233 			module_ch->nomem_threshold = spdk_max((int64_t)module_ch->io_outstanding / 2,
2234 							      (int64_t)module_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
2235 			return;
2236 		}
2237 
2238 		if (spdk_unlikely(!TAILQ_EMPTY(&module_ch->nomem_io))) {
2239 			_spdk_bdev_ch_retry_io(bdev_ch);
2240 		}
2241 	}
2242 
2243 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2244 		switch (bdev_io->type) {
2245 		case SPDK_BDEV_IO_TYPE_READ:
2246 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
2247 			bdev_ch->stat.num_read_ops++;
2248 			bdev_ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2249 			break;
2250 		case SPDK_BDEV_IO_TYPE_WRITE:
2251 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
2252 			bdev_ch->stat.num_write_ops++;
2253 			bdev_ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2254 			break;
2255 		default:
2256 			break;
2257 		}
2258 	}
2259 
2260 #ifdef SPDK_CONFIG_VTUNE
2261 	uint64_t now_tsc = spdk_get_ticks();
2262 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
2263 		uint64_t data[5];
2264 
2265 		data[0] = bdev_ch->stat.num_read_ops;
2266 		data[1] = bdev_ch->stat.bytes_read;
2267 		data[2] = bdev_ch->stat.num_write_ops;
2268 		data[3] = bdev_ch->stat.bytes_written;
2269 		data[4] = bdev->fn_table->get_spin_time ?
2270 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
2271 
2272 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
2273 				   __itt_metadata_u64, 5, data);
2274 
2275 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
2276 		bdev_ch->start_tsc = now_tsc;
2277 	}
2278 #endif
2279 
2280 	_spdk_bdev_io_complete(bdev_io);
2281 }
2282 
2283 void
2284 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2285 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2286 {
2287 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2288 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2289 	} else {
2290 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2291 		bdev_io->error.scsi.sc = sc;
2292 		bdev_io->error.scsi.sk = sk;
2293 		bdev_io->error.scsi.asc = asc;
2294 		bdev_io->error.scsi.ascq = ascq;
2295 	}
2296 
2297 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2298 }
2299 
2300 void
2301 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2302 			     int *sc, int *sk, int *asc, int *ascq)
2303 {
2304 	assert(sc != NULL);
2305 	assert(sk != NULL);
2306 	assert(asc != NULL);
2307 	assert(ascq != NULL);
2308 
2309 	switch (bdev_io->status) {
2310 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2311 		*sc = SPDK_SCSI_STATUS_GOOD;
2312 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2313 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2314 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2315 		break;
2316 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2317 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2318 		break;
2319 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2320 		*sc = bdev_io->error.scsi.sc;
2321 		*sk = bdev_io->error.scsi.sk;
2322 		*asc = bdev_io->error.scsi.asc;
2323 		*ascq = bdev_io->error.scsi.ascq;
2324 		break;
2325 	default:
2326 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2327 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2328 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2329 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2330 		break;
2331 	}
2332 }
2333 
2334 void
2335 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2336 {
2337 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2338 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2339 	} else {
2340 		bdev_io->error.nvme.sct = sct;
2341 		bdev_io->error.nvme.sc = sc;
2342 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2343 	}
2344 
2345 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2346 }
2347 
2348 void
2349 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2350 {
2351 	assert(sct != NULL);
2352 	assert(sc != NULL);
2353 
2354 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2355 		*sct = bdev_io->error.nvme.sct;
2356 		*sc = bdev_io->error.nvme.sc;
2357 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2358 		*sct = SPDK_NVME_SCT_GENERIC;
2359 		*sc = SPDK_NVME_SC_SUCCESS;
2360 	} else {
2361 		*sct = SPDK_NVME_SCT_GENERIC;
2362 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2363 	}
2364 }
2365 
2366 struct spdk_thread *
2367 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2368 {
2369 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
2370 }
2371 
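/*
 * Read the per-bdev rate limit from the "QoS" section of the configuration
 *  file.  A hypothetical entry matched by the loop below would look like:
 *
 *	[QoS]
 *	  Limit_IOPS Malloc0 20000
 *
 *  where the first value names the bdev and the second is the IOPS limit,
 *  which must be a positive multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC.
 */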
2372 static void
2373 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
2374 {
2375 	struct spdk_conf_section	*sp = NULL;
2376 	const char			*val = NULL;
2377 	int				ios_per_sec = 0;
2378 	int				i = 0;
2379 
2380 	sp = spdk_conf_find_section(NULL, "QoS");
2381 	if (!sp) {
2382 		return;
2383 	}
2384 
2385 	while (true) {
2386 		val = spdk_conf_section_get_nmval(sp, "Limit_IOPS", i, 0);
2387 		if (!val) {
2388 			break;
2389 		}
2390 
2391 		if (strcmp(bdev->name, val) != 0) {
2392 			i++;
2393 			continue;
2394 		}
2395 
2396 		val = spdk_conf_section_get_nmval(sp, "Limit_IOPS", i, 1);
2397 		if (!val) {
2398 			return;
2399 		}
2400 
2401 		ios_per_sec = (int)strtol(val, NULL, 10);
2402 		if (ios_per_sec > 0) {
2403 			if (ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) {
2404 				SPDK_ERRLOG("Assigned IOPS %u on bdev %s is not a multiple of %u\n",
2405 					    ios_per_sec, bdev->name, SPDK_BDEV_QOS_MIN_IOS_PER_SEC);
2406 				SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name);
2407 			} else {
2408 				bdev->ios_per_sec = (uint64_t)ios_per_sec;
2409 				SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS:%lu\n",
2410 					      bdev->name, bdev->ios_per_sec);
2411 			}
2412 		}
2413 
2414 		return;
2415 	}
2416 }
2417 
2418 static int
2419 spdk_bdev_init(struct spdk_bdev *bdev)
2420 {
2421 	assert(bdev->module != NULL);
2422 
2423 	if (!bdev->name) {
2424 		SPDK_ERRLOG("Bdev name is NULL\n");
2425 		return -EINVAL;
2426 	}
2427 
2428 	if (spdk_bdev_get_by_name(bdev->name)) {
2429 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2430 		return -EEXIST;
2431 	}
2432 
2433 	bdev->status = SPDK_BDEV_STATUS_READY;
2434 
2435 	TAILQ_INIT(&bdev->open_descs);
2436 
2437 	TAILQ_INIT(&bdev->aliases);
2438 
2439 	bdev->reset_in_progress = NULL;
2440 
2441 	_spdk_bdev_qos_config(bdev);
2442 
2443 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2444 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2445 				sizeof(struct spdk_bdev_channel));
2446 
2447 	pthread_mutex_init(&bdev->mutex, NULL);
2448 	return 0;
2449 }
2450 
2451 static void
2452 spdk_bdev_destroy_cb(void *io_device)
2453 {
2454 	int			rc;
2455 	struct spdk_bdev	*bdev;
2456 	spdk_bdev_unregister_cb	cb_fn;
2457 	void			*cb_arg;
2458 
2459 	bdev = __bdev_from_io_dev(io_device);
2460 	cb_fn = bdev->unregister_cb;
2461 	cb_arg = bdev->unregister_ctx;
2462 
2463 	rc = bdev->fn_table->destruct(bdev->ctxt);
2464 	if (rc < 0) {
2465 		SPDK_ERRLOG("destruct failed\n");
2466 	}
2467 	if (rc <= 0 && cb_fn != NULL) {
2468 		cb_fn(cb_arg, rc);
2469 	}
2470 }
2471 
2473 static void
2474 spdk_bdev_fini(struct spdk_bdev *bdev)
2475 {
2476 	pthread_mutex_destroy(&bdev->mutex);
2477 
2478 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
2479 }
2480 
2481 static void
2482 spdk_bdev_start(struct spdk_bdev *bdev)
2483 {
2484 	struct spdk_bdev_module *module;
2485 
2486 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2487 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2488 
2489 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2490 		if (module->examine) {
2491 			module->action_in_progress++;
2492 			module->examine(bdev);
2493 		}
2494 	}
2495 }
2496 
2497 int
2498 spdk_bdev_register(struct spdk_bdev *bdev)
2499 {
2500 	int rc = spdk_bdev_init(bdev);
2501 
2502 	if (rc == 0) {
2503 		spdk_bdev_start(bdev);
2504 	}
2505 
2506 	return rc;
2507 }
2508 
2509 static void
2510 spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev)
2511 {
2512 	struct spdk_bdev **bdevs;
2513 	struct spdk_bdev *base;
2514 	size_t i, j, k;
2515 	bool found;
2516 
2517 	/* Iterate over base bdevs to remove vbdev from them. */
2518 	for (i = 0; i < vbdev->base_bdevs_cnt; i++) {
2519 		found = false;
2520 		base = vbdev->base_bdevs[i];
2521 
2522 		for (j = 0; j < base->vbdevs_cnt; j++) {
2523 			if (base->vbdevs[j] != vbdev) {
2524 				continue;
2525 			}
2526 
2527 			for (k = j; k + 1 < base->vbdevs_cnt; k++) {
2528 				base->vbdevs[k] = base->vbdevs[k + 1];
2529 			}
2530 
2531 			base->vbdevs_cnt--;
2532 			if (base->vbdevs_cnt > 0) {
2533 				bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0]));
2534 				/* It would be odd if shrinking a memory block failed. */
2535 				assert(bdevs);
2536 				base->vbdevs = bdevs;
2537 			} else {
2538 				free(base->vbdevs);
2539 				base->vbdevs = NULL;
2540 			}
2541 
2542 			found = true;
2543 			break;
2544 		}
2545 
2546 		if (!found) {
2547 			SPDK_WARNLOG("Bdev '%s' is not a base bdev of '%s'.\n", base->name, vbdev->name);
2548 		}
2549 	}
2550 
2551 	free(vbdev->base_bdevs);
2552 	vbdev->base_bdevs = NULL;
2553 	vbdev->base_bdevs_cnt = 0;
2554 }
2555 
2556 static int
2557 spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt)
2558 {
2559 	struct spdk_bdev **vbdevs;
2560 	struct spdk_bdev *base;
2561 	size_t i;
2562 
2563 	/* Adding base bdevs isn't supported (yet?). */
2564 	assert(vbdev->base_bdevs_cnt == 0);
2565 
2566 	vbdev->base_bdevs = malloc(cnt * sizeof(vbdev->base_bdevs[0]));
2567 	if (!vbdev->base_bdevs) {
2568 		SPDK_ERRLOG("%s - malloc() failed\n", vbdev->name);
2569 		return -ENOMEM;
2570 	}
2571 
2572 	memcpy(vbdev->base_bdevs, base_bdevs, cnt * sizeof(vbdev->base_bdevs[0]));
2573 	vbdev->base_bdevs_cnt = cnt;
2574 
2575 	/* Iterate over base bdevs to add this vbdev to them. */
2576 	for (i = 0; i < cnt; i++) {
2577 		base = vbdev->base_bdevs[i];
2578 
2579 		assert(base != NULL);
2580 		assert(base->claim_module != NULL);
2581 
2582 		vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0]));
2583 		if (!vbdevs) {
2584 			SPDK_ERRLOG("%s - realloc() failed\n", base->name);
2585 			spdk_vbdev_remove_base_bdevs(vbdev);
2586 			return -ENOMEM;
2587 		}
2588 
2589 		vbdevs[base->vbdevs_cnt] = vbdev;
2590 		base->vbdevs = vbdevs;
2591 		base->vbdevs_cnt++;
2592 	}
2593 
2594 	return 0;
2595 }
2596 
2597 int
2598 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2599 {
2600 	int rc;
2601 
2602 	rc = spdk_bdev_init(vbdev);
2603 	if (rc) {
2604 		return rc;
2605 	}
2606 
2607 	if (base_bdev_count == 0) {
2608 		spdk_bdev_start(vbdev);
2609 		return 0;
2610 	}
2611 
2612 	rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count);
2613 	if (rc) {
2614 		spdk_bdev_fini(vbdev);
2615 		return rc;
2616 	}
2617 
2618 	spdk_bdev_start(vbdev);
2619 	return 0;
2621 }
2622 
2623 void
2624 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
2625 {
2626 	if (bdev->unregister_cb != NULL) {
2627 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2628 	}
2629 }
2630 
2631 static void
2632 _remove_notify(void *arg)
2633 {
2634 	struct spdk_bdev_desc *desc = arg;
2635 
2636 	desc->remove_cb(desc->remove_ctx);
2637 }
2638 
2639 void
2640 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2641 {
2642 	struct spdk_bdev_desc	*desc, *tmp;
2643 	bool			do_destruct = true;
2644 
2645 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2646 
2647 	pthread_mutex_lock(&bdev->mutex);
2648 
2649 	spdk_vbdev_remove_base_bdevs(bdev);
2650 
2651 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2652 	bdev->unregister_cb = cb_fn;
2653 	bdev->unregister_ctx = cb_arg;
2654 
2655 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2656 		if (desc->remove_cb) {
2657 			do_destruct = false;
2658 			/*
2659 			 * Defer invocation of the remove_cb to a separate message that will
2660 			 *  run later on this thread.  This ensures this context unwinds and
2661 			 *  we don't recursively unregister this bdev again if the remove_cb
2662 			 *  immediately closes its descriptor.
2663 			 */
2664 			spdk_thread_send_msg(spdk_get_thread(), _remove_notify, desc);
2665 		}
2666 	}
2667 
2668 	if (!do_destruct) {
2669 		pthread_mutex_unlock(&bdev->mutex);
2670 		return;
2671 	}
2672 
2673 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2674 	pthread_mutex_unlock(&bdev->mutex);
2675 
2676 	spdk_bdev_fini(bdev);
2677 }
2678 
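/*
 * Illustrative open/close sketch (the bdev name "Malloc0" and hotremove_cb
 *  are hypothetical):
 *
 *	struct spdk_bdev_desc *desc;
 *
 *	rc = spdk_bdev_open(spdk_bdev_get_by_name("Malloc0"), true,
 *			    hotremove_cb, NULL, &desc);
 *	if (rc == 0) {
 *		... submit I/O using desc ...
 *		spdk_bdev_close(desc);
 *	}
 */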
2679 int
2680 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2681 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2682 {
2683 	struct spdk_bdev_desc *desc;
2684 
2685 	desc = calloc(1, sizeof(*desc));
2686 	if (desc == NULL) {
2687 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2688 		return -ENOMEM;
2689 	}
2690 
2691 	pthread_mutex_lock(&bdev->mutex);
2692 
2693 	if (write && bdev->claim_module) {
2694 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
2695 		free(desc);
2696 		pthread_mutex_unlock(&bdev->mutex);
2697 		return -EPERM;
2698 	}
2699 
2700 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2701 
2702 	desc->bdev = bdev;
2703 	desc->remove_cb = remove_cb;
2704 	desc->remove_ctx = remove_ctx;
2705 	desc->write = write;
2706 	*_desc = desc;
2707 
2708 	pthread_mutex_unlock(&bdev->mutex);
2709 
2710 	return 0;
2711 }
2712 
2713 void
2714 spdk_bdev_close(struct spdk_bdev_desc *desc)
2715 {
2716 	struct spdk_bdev *bdev = desc->bdev;
2717 	bool do_unregister = false;
2718 
2719 	pthread_mutex_lock(&bdev->mutex);
2720 
2721 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2722 	free(desc);
2723 
2724 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
2725 		do_unregister = true;
2726 	}
2727 	pthread_mutex_unlock(&bdev->mutex);
2728 
2729 	if (do_unregister == true) {
2730 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
2731 	}
2732 }
2733 
2734 int
2735 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
2736 			    struct spdk_bdev_module *module)
2737 {
2738 	if (bdev->claim_module != NULL) {
2739 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
2740 			    bdev->claim_module->name);
2741 		return -EPERM;
2742 	}
2743 
2744 	if (desc && !desc->write) {
2745 		desc->write = true;
2746 	}
2747 
2748 	bdev->claim_module = module;
2749 	return 0;
2750 }
2751 
2752 void
2753 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2754 {
2755 	assert(bdev->claim_module != NULL);
2756 	bdev->claim_module = NULL;
2757 }
2758 
2759 struct spdk_bdev *
2760 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2761 {
2762 	return desc->bdev;
2763 }
2764 
2765 void
2766 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2767 {
2768 	struct iovec *iovs;
2769 	int iovcnt;
2770 
2771 	if (bdev_io == NULL) {
2772 		return;
2773 	}
2774 
2775 	switch (bdev_io->type) {
2776 	case SPDK_BDEV_IO_TYPE_READ:
2777 		iovs = bdev_io->u.bdev.iovs;
2778 		iovcnt = bdev_io->u.bdev.iovcnt;
2779 		break;
2780 	case SPDK_BDEV_IO_TYPE_WRITE:
2781 		iovs = bdev_io->u.bdev.iovs;
2782 		iovcnt = bdev_io->u.bdev.iovcnt;
2783 		break;
2784 	default:
2785 		iovs = NULL;
2786 		iovcnt = 0;
2787 		break;
2788 	}
2789 
2790 	if (iovp) {
2791 		*iovp = iovs;
2792 	}
2793 	if (iovcntp) {
2794 		*iovcntp = iovcnt;
2795 	}
2796 }
2797 
2798 void
2799 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
2800 {
2801 
2802 	if (spdk_bdev_module_list_find(bdev_module->name)) {
2803 		fprintf(stderr, "ERROR: module '%s' already registered.\n", bdev_module->name);
2804 		assert(false);
2805 	}
2806 
2807 	if (bdev_module->async_init) {
2808 		bdev_module->action_in_progress = 1;
2809 	}
2810 
2811 	/*
2812 	 * Modules with examine callbacks must be initialized first, so they are
2813 	 *  ready to handle examine callbacks from later modules that will
2814 	 *  register physical bdevs.
2815 	 */
2816 	if (bdev_module->examine != NULL) {
2817 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2818 	} else {
2819 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2820 	}
2821 }
2822 
2823 struct spdk_bdev_module *
2824 spdk_bdev_module_list_find(const char *name)
2825 {
2826 	struct spdk_bdev_module *bdev_module;
2827 
2828 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
2829 		if (strcmp(name, bdev_module->name) == 0) {
2830 			break;
2831 		}
2832 	}
2833 
2834 	return bdev_module;
2835 }
2836 
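/*
 * Completion callback used to split a large write_zeroes request into
 *  successive chunks of at most ZERO_BUFFER_SIZE bytes.  Each completion
 *  advances the offset and resubmits the next chunk; once
 *  split_remaining_num_blocks reaches zero the original user callback is
 *  restored so the final completion is reported to the caller.
 */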
2837 static void
2838 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2839 {
2840 	uint64_t len;
2841 
2842 	if (!success) {
2843 		bdev_io->cb = bdev_io->u.bdev.stored_user_cb;
2844 		_spdk_bdev_io_complete(bdev_io);
2845 		return;
2846 	}
2847 
2848 	/* No need to repeat the error checking from write_zeroes_blocks; this request already passed those checks. */
2849 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks,
2850 		       ZERO_BUFFER_SIZE);
2851 
2852 	bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks;
2853 	bdev_io->u.bdev.iov.iov_len = len;
2854 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2855 	bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2856 	bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2857 
2858 	/* If this round completes the I/O, restore the original user callback. */
2859 	if (bdev_io->u.bdev.split_remaining_num_blocks == 0) {
2860 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb);
2861 	} else {
2862 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2863 	}
2864 	spdk_bdev_io_submit(bdev_io);
2865 }
2866 
2867 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
2868