xref: /spdk/lib/bdev/bdev.c (revision c4fee1e970ab48f45848f3653a024382dfd9cdfa)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/bdev.h"
37 #include "spdk/conf.h"
38 
39 #include "spdk/config.h"
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/thread.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 #include "spdk/trace.h"
49 
50 #include "spdk/bdev_module.h"
51 #include "spdk_internal/log.h"
52 #include "spdk/string.h"
53 
54 #ifdef SPDK_CONFIG_VTUNE
55 #include "ittnotify.h"
56 #include "ittnotify_types.h"
57 int __itt_init_ittlib(const char *, __itt_group_id);
58 #endif
59 
60 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
61 #define SPDK_BDEV_IO_CACHE_SIZE			256
62 #define BUF_SMALL_POOL_SIZE			8192
63 #define BUF_LARGE_POOL_SIZE			1024
64 #define NOMEM_THRESHOLD_COUNT			8
65 #define ZERO_BUFFER_SIZE			0x100000
66 
67 #define OWNER_BDEV		0x2
68 
69 #define OBJECT_BDEV_IO		0x2
70 
71 #define TRACE_GROUP_BDEV	0x3
72 #define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
73 #define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)
74 
75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(10 * 1024 * 1024)
80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX
81 
82 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"};
83 static const char *qos_rpc_type[] = {"qos_ios_per_sec"};
84 
85 TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
86 
87 struct spdk_bdev_mgr {
88 	struct spdk_mempool *bdev_io_pool;
89 
90 	struct spdk_mempool *buf_small_pool;
91 	struct spdk_mempool *buf_large_pool;
92 
93 	void *zero_buffer;
94 
95 	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;
96 
97 	struct spdk_bdev_list bdevs;
98 
99 	bool init_complete;
100 	bool module_init_complete;
101 
102 #ifdef SPDK_CONFIG_VTUNE
103 	__itt_domain	*domain;
104 #endif
105 };
106 
107 static struct spdk_bdev_mgr g_bdev_mgr = {
108 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
109 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
110 	.init_complete = false,
111 	.module_init_complete = false,
112 };
113 
114 static struct spdk_bdev_opts	g_bdev_opts = {
115 	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
116 	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
117 };
118 
119 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
120 static void			*g_init_cb_arg = NULL;
121 
122 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
123 static void			*g_fini_cb_arg = NULL;
124 static struct spdk_thread	*g_fini_thread = NULL;
125 
126 struct spdk_bdev_qos_limit {
127 	/** IOs or bytes allowed per second (i.e., 1s). */
128 	uint64_t limit;
129 
130 	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
131 	 *  For bytes, this is allowed to go negative if an I/O is submitted while some
132 	 *  bytes remain but the I/O is larger than that amount. The
133 	 *  excess will be deducted from the next timeslice.
134 	 */
135 	int64_t remaining_this_timeslice;
136 
137 	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
138 	uint32_t min_per_timeslice;
139 
140 	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
141 	uint32_t max_per_timeslice;
142 };
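/*
 * Behavioral note (added commentary, not from the original source):
 * remaining_this_timeslice is signed so the byte budget can be driven
 * negative.  For example, if 4 KiB of budget remains and a 64 KiB write is
 * admitted, the field ends the timeslice at -60 KiB, and the refill in
 * spdk_bdev_channel_poll_qos() starts the next timeslice with its budget
 * reduced by that overrun.
 */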
143 
144 struct spdk_bdev_qos {
145 	/** Rate limits, one entry per rate limit type. */
146 	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
147 
148 	/** The channel that all I/O are funneled through. */
149 	struct spdk_bdev_channel *ch;
150 
151 	/** The thread on which the poller is running. */
152 	struct spdk_thread *thread;
153 
154 	/** Queue of I/O waiting to be issued. */
155 	bdev_io_tailq_t queued;
156 
157 	/** Size of a timeslice in tsc ticks. */
158 	uint64_t timeslice_size;
159 
160 	/** Timestamp of start of last timeslice. */
161 	uint64_t last_timeslice;
162 
163 	/** Poller that processes queued I/O commands each time slice. */
164 	struct spdk_poller *poller;
165 };
166 
167 struct spdk_bdev_mgmt_channel {
168 	bdev_io_stailq_t need_buf_small;
169 	bdev_io_stailq_t need_buf_large;
170 
171 	/*
172 	 * Each thread keeps a cache of bdev_io - this allows
173 	 *  bdev threads which are *not* DPDK threads to still
174 	 *  benefit from a per-thread bdev_io cache.  Without
175 	 *  this, non-DPDK threads fetching from the mempool
176 	 *  incur a cmpxchg on get and put.
177 	 */
178 	bdev_io_stailq_t per_thread_cache;
179 	uint32_t	per_thread_cache_count;
180 	uint32_t	bdev_io_cache_size;
181 
182 	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
183 	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
184 };
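/*
 * Sizing note (illustrative, based on the defaults above): with
 * SPDK_BDEV_IO_CACHE_SIZE of 256 and SPDK_BDEV_IO_POOL_SIZE of 64K, each
 * management channel pre-populates 256 bdev_io structures from the shared
 * mempool in spdk_bdev_mgmt_channel_create(), so a cache hit costs only an
 * STAILQ operation on the owning thread and the cmpxchg-based mempool path
 * is taken only when the cache is empty.
 */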
185 
186 /*
187  * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
188  * queue their I/O awaiting retry here. This makes it possible to retry sending
189  * I/O to one bdev after I/O from another bdev completes.
190  */
191 struct spdk_bdev_shared_resource {
192 	/* The bdev management channel */
193 	struct spdk_bdev_mgmt_channel *mgmt_ch;
194 
195 	/*
196 	 * Count of I/O submitted to bdev module and waiting for completion.
197 	 * Incremented before submit_request() is called on an spdk_bdev_io.
198 	 */
199 	uint64_t		io_outstanding;
200 
201 	/*
202 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
203 	 *  on this channel.
204 	 */
205 	bdev_io_tailq_t		nomem_io;
206 
207 	/*
208 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
209 	 */
210 	uint64_t		nomem_threshold;
211 
212 	/* I/O channel allocated by a bdev module */
213 	struct spdk_io_channel	*shared_ch;
214 
215 	/* Refcount of bdev channels using this resource */
216 	uint32_t		ref;
217 
218 	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
219 };
220 
221 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
222 #define BDEV_CH_QOS_ENABLED		(1 << 1)
223 
224 struct spdk_bdev_channel {
225 	struct spdk_bdev	*bdev;
226 
227 	/* The channel for the underlying device */
228 	struct spdk_io_channel	*channel;
229 
230 	/* Per io_device per thread data */
231 	struct spdk_bdev_shared_resource *shared_resource;
232 
233 	struct spdk_bdev_io_stat stat;
234 
235 	/*
236 	 * Count of I/O submitted through this channel and waiting for completion.
237 	 * Incremented before submit_request() is called on an spdk_bdev_io.
238 	 */
239 	uint64_t		io_outstanding;
240 
241 	bdev_io_tailq_t		queued_resets;
242 
243 	uint32_t		flags;
244 
245 #ifdef SPDK_CONFIG_VTUNE
246 	uint64_t		start_tsc;
247 	uint64_t		interval_tsc;
248 	__itt_string_handle	*handle;
249 	struct spdk_bdev_io_stat prev_stat;
250 #endif
251 
252 };
253 
254 struct spdk_bdev_desc {
255 	struct spdk_bdev		*bdev;
256 	struct spdk_thread		*thread;
257 	spdk_bdev_remove_cb_t		remove_cb;
258 	void				*remove_ctx;
259 	bool				remove_scheduled;
260 	bool				closed;
261 	bool				write;
262 	TAILQ_ENTRY(spdk_bdev_desc)	link;
263 };
264 
265 struct spdk_bdev_iostat_ctx {
266 	struct spdk_bdev_io_stat *stat;
267 	spdk_bdev_get_device_stat_cb cb;
268 	void *cb_arg;
269 };
270 
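/*
 * Added commentary: the bdev layer uses (bdev pointer + 1) as the handle it
 * passes to spdk_get_io_channel()/spdk_io_device_register(), presumably so
 * that this key cannot collide with an io_device the bdev module itself may
 * have registered (such as the bdev structure or its ctxt).
 * __bdev_from_io_dev() simply reverses the one-byte offset.
 */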
271 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
272 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
273 
274 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
275 		void *cb_arg);
276 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);
277 
278 void
279 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
280 {
281 	*opts = g_bdev_opts;
282 }
283 
284 int
285 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
286 {
287 	uint32_t min_pool_size;
288 
289 	/*
290 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
291 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
292 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
293 	 */
294 	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
295 	if (opts->bdev_io_pool_size < min_pool_size) {
296 		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
297 			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
298 			    spdk_thread_get_count());
299 		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
300 		return -1;
301 	}
302 
303 	g_bdev_opts = *opts;
304 	return 0;
305 }
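/*
 * Illustrative example (hypothetical numbers): with bdev_io_cache_size = 256
 * and 4 threads, min_pool_size = 256 * (4 + 1) = 1280, so any
 * bdev_io_pool_size below 1280 is rejected by spdk_bdev_set_opts().
 */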
306 
307 struct spdk_bdev *
308 spdk_bdev_first(void)
309 {
310 	struct spdk_bdev *bdev;
311 
312 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
313 	if (bdev) {
314 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
315 	}
316 
317 	return bdev;
318 }
319 
320 struct spdk_bdev *
321 spdk_bdev_next(struct spdk_bdev *prev)
322 {
323 	struct spdk_bdev *bdev;
324 
325 	bdev = TAILQ_NEXT(prev, internal.link);
326 	if (bdev) {
327 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
328 	}
329 
330 	return bdev;
331 }
332 
333 static struct spdk_bdev *
334 _bdev_next_leaf(struct spdk_bdev *bdev)
335 {
336 	while (bdev != NULL) {
337 		if (bdev->internal.claim_module == NULL) {
338 			return bdev;
339 		} else {
340 			bdev = TAILQ_NEXT(bdev, internal.link);
341 		}
342 	}
343 
344 	return bdev;
345 }
346 
347 struct spdk_bdev *
348 spdk_bdev_first_leaf(void)
349 {
350 	struct spdk_bdev *bdev;
351 
352 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
353 
354 	if (bdev) {
355 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
356 	}
357 
358 	return bdev;
359 }
360 
361 struct spdk_bdev *
362 spdk_bdev_next_leaf(struct spdk_bdev *prev)
363 {
364 	struct spdk_bdev *bdev;
365 
366 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));
367 
368 	if (bdev) {
369 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
370 	}
371 
372 	return bdev;
373 }
374 
375 struct spdk_bdev *
376 spdk_bdev_get_by_name(const char *bdev_name)
377 {
378 	struct spdk_bdev_alias *tmp;
379 	struct spdk_bdev *bdev = spdk_bdev_first();
380 
381 	while (bdev != NULL) {
382 		if (strcmp(bdev_name, bdev->name) == 0) {
383 			return bdev;
384 		}
385 
386 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
387 			if (strcmp(bdev_name, tmp->alias) == 0) {
388 				return bdev;
389 			}
390 		}
391 
392 		bdev = spdk_bdev_next(bdev);
393 	}
394 
395 	return NULL;
396 }
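/*
 * Usage sketch (illustrative only; "Malloc0" and the error handling are
 * placeholders).  The lookup also matches any alias registered with
 * spdk_bdev_alias_add(), and a real application would normally pass a
 * hot-remove callback to spdk_bdev_open() instead of NULL:
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Malloc0");
 *	struct spdk_bdev_desc *desc = NULL;
 *
 *	if (bdev == NULL ||
 *	    spdk_bdev_open(bdev, true, NULL, NULL, &desc) != 0) {
 *		// handle lookup or open failure
 *	}
 */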
397 
398 void
399 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
400 {
401 	struct iovec *iovs;
402 
403 	iovs = bdev_io->u.bdev.iovs;
404 
405 	assert(iovs != NULL);
406 	assert(bdev_io->u.bdev.iovcnt >= 1);
407 
408 	iovs[0].iov_base = buf;
409 	iovs[0].iov_len = len;
410 }
411 
412 static void
413 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
414 {
415 	struct spdk_mempool *pool;
416 	struct spdk_bdev_io *tmp;
417 	void *buf, *aligned_buf;
418 	bdev_io_stailq_t *stailq;
419 	struct spdk_bdev_mgmt_channel *ch;
420 
421 	assert(bdev_io->u.bdev.iovcnt == 1);
422 
423 	buf = bdev_io->internal.buf;
424 	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
425 
426 	bdev_io->internal.buf = NULL;
427 
428 	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
429 		pool = g_bdev_mgr.buf_small_pool;
430 		stailq = &ch->need_buf_small;
431 	} else {
432 		pool = g_bdev_mgr.buf_large_pool;
433 		stailq = &ch->need_buf_large;
434 	}
435 
436 	if (STAILQ_EMPTY(stailq)) {
437 		spdk_mempool_put(pool, buf);
438 	} else {
439 		tmp = STAILQ_FIRST(stailq);
440 
441 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
442 		spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
443 
444 		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
445 		tmp->internal.buf = buf;
446 		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
447 	}
448 }
449 
450 void
451 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
452 {
453 	struct spdk_mempool *pool;
454 	bdev_io_stailq_t *stailq;
455 	void *buf, *aligned_buf;
456 	struct spdk_bdev_mgmt_channel *mgmt_ch;
457 
458 	assert(cb != NULL);
459 	assert(bdev_io->u.bdev.iovs != NULL);
460 
461 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
462 		/* Buffer already present */
463 		cb(bdev_io->internal.ch->channel, bdev_io);
464 		return;
465 	}
466 
467 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
468 	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
469 
470 	bdev_io->internal.buf_len = len;
471 	bdev_io->internal.get_buf_cb = cb;
472 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
473 		pool = g_bdev_mgr.buf_small_pool;
474 		stailq = &mgmt_ch->need_buf_small;
475 	} else {
476 		pool = g_bdev_mgr.buf_large_pool;
477 		stailq = &mgmt_ch->need_buf_large;
478 	}
479 
480 	buf = spdk_mempool_get(pool);
481 
482 	if (!buf) {
483 		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
484 	} else {
485 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
486 		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
487 
488 		bdev_io->internal.buf = buf;
489 		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
490 	}
491 }
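/*
 * Alignment note (illustrative): both buffer pools are over-allocated by 512
 * bytes so the mask above can round the raw pointer up to the next 512-byte
 * boundary, e.g. a raw pointer of 0x10000204 becomes 0x10000400.  The raw
 * (unaligned) pointer is what is stored in internal.buf and later returned to
 * the pool by spdk_bdev_io_put_buf().
 */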
492 
493 static int
494 spdk_bdev_module_get_max_ctx_size(void)
495 {
496 	struct spdk_bdev_module *bdev_module;
497 	int max_bdev_module_size = 0;
498 
499 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
500 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
501 			max_bdev_module_size = bdev_module->get_ctx_size();
502 		}
503 	}
504 
505 	return max_bdev_module_size;
506 }
507 
508 void
509 spdk_bdev_config_text(FILE *fp)
510 {
511 	struct spdk_bdev_module *bdev_module;
512 
513 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
514 		if (bdev_module->config_text) {
515 			bdev_module->config_text(fp);
516 		}
517 	}
518 }
519 
520 void
521 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
522 {
523 	struct spdk_bdev_module *bdev_module;
524 	struct spdk_bdev *bdev;
525 
526 	assert(w != NULL);
527 
528 	spdk_json_write_array_begin(w);
529 
530 	spdk_json_write_object_begin(w);
531 	spdk_json_write_named_string(w, "method", "set_bdev_options");
532 	spdk_json_write_name(w, "params");
533 	spdk_json_write_object_begin(w);
534 	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
535 	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
536 	spdk_json_write_object_end(w);
537 	spdk_json_write_object_end(w);
538 
539 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
540 		if (bdev_module->config_json) {
541 			bdev_module->config_json(w);
542 		}
543 	}
544 
545 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
546 		if (bdev->fn_table->write_config_json) {
547 			bdev->fn_table->write_config_json(bdev, w);
548 		}
549 	}
550 
551 	spdk_json_write_array_end(w);
552 }
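/*
 * With the default options, the first entry emitted above would look roughly
 * like this (formatting illustrative):
 *
 *	{
 *	  "method": "set_bdev_options",
 *	  "params": { "bdev_io_pool_size": 65536, "bdev_io_cache_size": 256 }
 *	}
 *
 * followed by whatever each registered module and bdev writes through its own
 * config_json()/write_config_json() callback.
 */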
553 
554 static int
555 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
556 {
557 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
558 	struct spdk_bdev_io *bdev_io;
559 	uint32_t i;
560 
561 	STAILQ_INIT(&ch->need_buf_small);
562 	STAILQ_INIT(&ch->need_buf_large);
563 
564 	STAILQ_INIT(&ch->per_thread_cache);
565 	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
566 
567 	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
568 	ch->per_thread_cache_count = 0;
569 	for (i = 0; i < ch->bdev_io_cache_size; i++) {
570 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
571 		assert(bdev_io != NULL);
572 		ch->per_thread_cache_count++;
573 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
574 	}
575 
576 	TAILQ_INIT(&ch->shared_resources);
577 	TAILQ_INIT(&ch->io_wait_queue);
578 
579 	return 0;
580 }
581 
582 static void
583 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
584 {
585 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
586 	struct spdk_bdev_io *bdev_io;
587 
588 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
589 		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
590 	}
591 
592 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
593 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
594 	}
595 
596 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
597 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
598 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
599 		ch->per_thread_cache_count--;
600 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
601 	}
602 
603 	assert(ch->per_thread_cache_count == 0);
604 }
605 
606 static void
607 spdk_bdev_init_complete(int rc)
608 {
609 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
610 	void *cb_arg = g_init_cb_arg;
611 	struct spdk_bdev_module *m;
612 
613 	g_bdev_mgr.init_complete = true;
614 	g_init_cb_fn = NULL;
615 	g_init_cb_arg = NULL;
616 
617 	/*
618 	 * For modules that need to know when subsystem init is complete,
619 	 * inform them now.
620 	 */
621 	if (rc == 0) {
622 		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
623 			if (m->init_complete) {
624 				m->init_complete();
625 			}
626 		}
627 	}
628 
629 	cb_fn(cb_arg, rc);
630 }
631 
632 static void
633 spdk_bdev_module_action_complete(void)
634 {
635 	struct spdk_bdev_module *m;
636 
637 	/*
638 	 * Don't finish bdev subsystem initialization if
639 	 * module pre-initialization is still in progress, or
640 	 * the subsystem has already been initialized.
641 	 */
642 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
643 		return;
644 	}
645 
646 	/*
647 	 * Check all bdev modules for inits/examinations in progress. If any
648 	 * exist, return immediately since we cannot finish bdev subsystem
649 	 * initialization until all are completed.
650 	 */
651 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
652 		if (m->internal.action_in_progress > 0) {
653 			return;
654 		}
655 	}
656 
657 	/*
658 	 * Modules already finished initialization - now that all
659 	 * the bdev modules have finished their asynchronous I/O
660 	 * processing, the entire bdev layer can be marked as complete.
661 	 */
662 	spdk_bdev_init_complete(0);
663 }
664 
665 static void
666 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
667 {
668 	assert(module->internal.action_in_progress > 0);
669 	module->internal.action_in_progress--;
670 	spdk_bdev_module_action_complete();
671 }
672 
673 void
674 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
675 {
676 	spdk_bdev_module_action_done(module);
677 }
678 
679 void
680 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
681 {
682 	spdk_bdev_module_action_done(module);
683 }
684 
685 /** The last initialized bdev module */
686 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
687 
688 static int
689 spdk_bdev_modules_init(void)
690 {
691 	struct spdk_bdev_module *module;
692 	int rc = 0;
693 
694 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
695 		g_resume_bdev_module = module;
696 		rc = module->module_init();
697 		if (rc != 0) {
698 			return rc;
699 		}
700 	}
701 
702 	g_resume_bdev_module = NULL;
703 	return 0;
704 }
705 
706 
707 static void
708 spdk_bdev_init_failed_complete(void *cb_arg)
709 {
710 	spdk_bdev_init_complete(-1);
711 }
712 
713 static void
714 spdk_bdev_init_failed(void *cb_arg)
715 {
716 	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
717 }
718 
719 void
720 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
721 {
722 	struct spdk_conf_section *sp;
723 	struct spdk_bdev_opts bdev_opts;
724 	int32_t bdev_io_pool_size, bdev_io_cache_size;
725 	int cache_size;
726 	int rc = 0;
727 	char mempool_name[32];
728 
729 	assert(cb_fn != NULL);
730 
731 	sp = spdk_conf_find_section(NULL, "Bdev");
732 	if (sp != NULL) {
733 		spdk_bdev_get_opts(&bdev_opts);
734 
735 		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
736 		if (bdev_io_pool_size >= 0) {
737 			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
738 		}
739 
740 		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
741 		if (bdev_io_cache_size >= 0) {
742 			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
743 		}
744 
745 		if (spdk_bdev_set_opts(&bdev_opts)) {
746 			spdk_bdev_init_complete(-1);
747 			return;
748 		}
749 
750 		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
751 	}
752 
753 	g_init_cb_fn = cb_fn;
754 	g_init_cb_arg = cb_arg;
755 
756 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
757 
758 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
759 				  g_bdev_opts.bdev_io_pool_size,
760 				  sizeof(struct spdk_bdev_io) +
761 				  spdk_bdev_module_get_max_ctx_size(),
762 				  0,
763 				  SPDK_ENV_SOCKET_ID_ANY);
764 
765 	if (g_bdev_mgr.bdev_io_pool == NULL) {
766 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
767 		spdk_bdev_init_complete(-1);
768 		return;
769 	}
770 
771 	/**
772 	 * Ensure no more than half of the total buffers end up in local caches, by
773 	 *   using spdk_thread_get_count() to determine how many local caches we need
774 	 *   to account for.
775 	 */
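	/* Worked example (hypothetical thread count): with BUF_SMALL_POOL_SIZE of
	 * 8192 and 4 threads, cache_size below is 8192 / (2 * 4) = 1024 buffers
	 * per thread, so at most half of the pool can sit in per-thread caches.
	 */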
776 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
777 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
778 
779 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
780 				    BUF_SMALL_POOL_SIZE,
781 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
782 				    cache_size,
783 				    SPDK_ENV_SOCKET_ID_ANY);
784 	if (!g_bdev_mgr.buf_small_pool) {
785 		SPDK_ERRLOG("create rbuf small pool failed\n");
786 		spdk_bdev_init_complete(-1);
787 		return;
788 	}
789 
790 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
791 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
792 
793 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
794 				    BUF_LARGE_POOL_SIZE,
795 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
796 				    cache_size,
797 				    SPDK_ENV_SOCKET_ID_ANY);
798 	if (!g_bdev_mgr.buf_large_pool) {
799 		SPDK_ERRLOG("create rbuf large pool failed\n");
800 		spdk_bdev_init_complete(-1);
801 		return;
802 	}
803 
804 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
805 				 NULL);
806 	if (!g_bdev_mgr.zero_buffer) {
807 		SPDK_ERRLOG("create bdev zero buffer failed\n");
808 		spdk_bdev_init_complete(-1);
809 		return;
810 	}
811 
812 #ifdef SPDK_CONFIG_VTUNE
813 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
814 #endif
815 
816 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
817 				spdk_bdev_mgmt_channel_destroy,
818 				sizeof(struct spdk_bdev_mgmt_channel),
819 				"bdev_mgr");
820 
821 	rc = spdk_bdev_modules_init();
822 	g_bdev_mgr.module_init_complete = true;
823 	if (rc != 0) {
824 		SPDK_ERRLOG("bdev modules init failed\n");
825 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
826 		return;
827 	}
828 
829 	spdk_bdev_module_action_complete();
830 }
831 
832 static void
833 spdk_bdev_mgr_unregister_cb(void *io_device)
834 {
835 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
836 
837 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
838 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
839 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
840 			    g_bdev_opts.bdev_io_pool_size);
841 	}
842 
843 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
844 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
845 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
846 			    BUF_SMALL_POOL_SIZE);
847 		assert(false);
848 	}
849 
850 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
851 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
852 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
853 			    BUF_LARGE_POOL_SIZE);
854 		assert(false);
855 	}
856 
857 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
858 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
859 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
860 	spdk_dma_free(g_bdev_mgr.zero_buffer);
861 
862 	cb_fn(g_fini_cb_arg);
863 	g_fini_cb_fn = NULL;
864 	g_fini_cb_arg = NULL;
865 }
866 
867 static void
868 spdk_bdev_module_finish_iter(void *arg)
869 {
870 	struct spdk_bdev_module *bdev_module;
871 
872 	/* Start iterating from the last touched module */
873 	if (!g_resume_bdev_module) {
874 		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
875 	} else {
876 		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
877 					 internal.tailq);
878 	}
879 
880 	while (bdev_module) {
881 		if (bdev_module->async_fini) {
882 			/* Save our place so we can resume later. We must
883 			 * save the variable here, before calling module_fini()
884 			 * below, because in some cases the module may immediately
885 			 * call spdk_bdev_module_finish_done() and re-enter
886 			 * this function to continue iterating. */
887 			g_resume_bdev_module = bdev_module;
888 		}
889 
890 		if (bdev_module->module_fini) {
891 			bdev_module->module_fini();
892 		}
893 
894 		if (bdev_module->async_fini) {
895 			return;
896 		}
897 
898 		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
899 					 internal.tailq);
900 	}
901 
902 	g_resume_bdev_module = NULL;
903 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
904 }
905 
906 void
907 spdk_bdev_module_finish_done(void)
908 {
909 	if (spdk_get_thread() != g_fini_thread) {
910 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
911 	} else {
912 		spdk_bdev_module_finish_iter(NULL);
913 	}
914 }
915 
916 static void
917 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
918 {
919 	struct spdk_bdev *bdev = cb_arg;
920 
921 	if (bdeverrno && bdev) {
922 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
923 			     bdev->name);
924 
925 		/*
926 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
927 		 *  bdev; try to continue by manually removing this bdev from the list and moving
928 		 *  on to the next bdev in the list.
929 		 */
930 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
931 	}
932 
933 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
934 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
935 		/*
936 		 * Bdev module finish needs to be deferred, as we might be in the middle of some context
937 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
938 		 * after returning.
939 		 */
940 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
941 		return;
942 	}
943 
944 	/*
945 	 * Unregister the last bdev in the list.  The last bdev in the list should be a bdev
946 	 * that has no bdevs that depend on it.
947 	 */
948 	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
949 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
950 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
951 }
952 
953 void
954 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
955 {
956 	struct spdk_bdev_module *m;
957 
958 	assert(cb_fn != NULL);
959 
960 	g_fini_thread = spdk_get_thread();
961 
962 	g_fini_cb_fn = cb_fn;
963 	g_fini_cb_arg = cb_arg;
964 
965 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
966 		if (m->fini_start) {
967 			m->fini_start();
968 		}
969 	}
970 
971 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
972 }
973 
974 static struct spdk_bdev_io *
975 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
976 {
977 	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
978 	struct spdk_bdev_io *bdev_io;
979 
980 	if (ch->per_thread_cache_count > 0) {
981 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
982 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
983 		ch->per_thread_cache_count--;
984 	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
985 		/*
986 		 * Don't try to look for bdev_ios in the global pool if there are
987 		 * waiters on bdev_ios - we don't want this caller to jump the line.
988 		 */
989 		bdev_io = NULL;
990 	} else {
991 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
992 	}
993 
994 	return bdev_io;
995 }
996 
997 void
998 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
999 {
1000 	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
1001 
1002 	assert(bdev_io != NULL);
1003 	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
1004 
1005 	if (bdev_io->internal.buf != NULL) {
1006 		spdk_bdev_io_put_buf(bdev_io);
1007 	}
1008 
1009 	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
1010 		ch->per_thread_cache_count++;
1011 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
1012 		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
1013 			struct spdk_bdev_io_wait_entry *entry;
1014 
1015 			entry = TAILQ_FIRST(&ch->io_wait_queue);
1016 			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
1017 			entry->cb_fn(entry->cb_arg);
1018 		}
1019 	} else {
1020 		/* We should never have a full cache with entries on the io wait queue. */
1021 		assert(TAILQ_EMPTY(&ch->io_wait_queue));
1022 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
1023 	}
1024 }
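/*
 * Usage sketch (illustrative): when a submit call such as
 * spdk_bdev_read_blocks() fails with -ENOMEM because no bdev_io could be
 * allocated, the caller can park itself on the per-thread wait queue that is
 * drained above.  Here 'ctx' is a hypothetical per-request context that owns
 * the wait entry and must stay valid until cb_fn runs:
 *
 *	ctx->wait_entry.bdev = bdev;
 *	ctx->wait_entry.cb_fn = retry_read;   // hypothetical resubmit callback
 *	ctx->wait_entry.cb_arg = ctx;
 *	spdk_bdev_queue_io_wait(bdev, io_ch, &ctx->wait_entry);
 *
 * cb_fn is then invoked from spdk_bdev_free_io() once a bdev_io is returned
 * to this thread's cache.
 */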
1025 
1026 static bool
1027 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
1028 {
1029 	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
1030 
1031 	switch (limit) {
1032 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
1033 		return true;
1034 	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
1035 		return false;
1036 	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
1037 	default:
1038 		return false;
1039 	}
1040 }
1041 
1042 static bool
1043 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
1044 {
1045 	switch (bdev_io->type) {
1046 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1047 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1048 	case SPDK_BDEV_IO_TYPE_READ:
1049 	case SPDK_BDEV_IO_TYPE_WRITE:
1050 	case SPDK_BDEV_IO_TYPE_UNMAP:
1051 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1052 		return true;
1053 	default:
1054 		return false;
1055 	}
1056 }
1057 
1058 static uint64_t
1059 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
1060 {
1061 	struct spdk_bdev	*bdev = bdev_io->bdev;
1062 
1063 	switch (bdev_io->type) {
1064 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1065 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1066 		return bdev_io->u.nvme_passthru.nbytes;
1067 	case SPDK_BDEV_IO_TYPE_READ:
1068 	case SPDK_BDEV_IO_TYPE_WRITE:
1069 	case SPDK_BDEV_IO_TYPE_UNMAP:
1070 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1071 		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
1072 	default:
1073 		return 0;
1074 	}
1075 }
1076 
1077 static void
1078 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte)
1079 {
1080 	int i;
1081 
1082 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1083 		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1084 			continue;
1085 		}
1086 
1087 		switch (i) {
1088 		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
1089 			qos->rate_limits[i].remaining_this_timeslice--;
1090 			break;
1091 		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
1092 			qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte;
1093 			break;
1094 		case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
1095 		default:
1096 			break;
1097 		}
1098 	}
1099 }
1100 
1101 static void
1102 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
1103 {
1104 	struct spdk_bdev_io		*bdev_io = NULL;
1105 	struct spdk_bdev		*bdev = ch->bdev;
1106 	struct spdk_bdev_qos		*qos = bdev->internal.qos;
1107 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1108 	int				i;
1109 	bool				to_limit_io;
1110 	uint64_t			io_size_in_byte;
1111 
1112 	while (!TAILQ_EMPTY(&qos->queued)) {
1113 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1114 			if (qos->rate_limits[i].max_per_timeslice > 0 &&
1115 			    (qos->rate_limits[i].remaining_this_timeslice <= 0)) {
1116 				return;
1117 			}
1118 		}
1119 
1120 		bdev_io = TAILQ_FIRST(&qos->queued);
1121 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
1122 		ch->io_outstanding++;
1123 		shared_resource->io_outstanding++;
1124 		to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io);
1125 		if (to_limit_io == true) {
1126 			io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io);
1127 			_spdk_bdev_qos_update_per_io(qos, io_size_in_byte);
1128 		}
1129 		bdev->fn_table->submit_request(ch->channel, bdev_io);
1130 	}
1131 }
1132 
1133 static void
1134 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
1135 {
1136 	int rc;
1137 
1138 	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
1139 	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
1140 	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
1141 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
1142 				     &bdev_io->internal.waitq_entry);
1143 	if (rc != 0) {
1144 		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
1145 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1146 		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1147 	}
1148 }
1149 
1150 static bool
1151 _spdk_bdev_io_type_can_split(uint8_t type)
1152 {
1153 	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
1154 	assert(type < SPDK_BDEV_NUM_IO_TYPES);
1155 
1156 	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
1157 	 * UNMAP could be split, but these types of I/O are typically much larger
1158 	 * in size (sometimes the size of the entire block device), and the bdev
1159 	 * module can more efficiently split these types of I/O.  Plus those types
1160 	 * of I/O do not have a payload, which makes the splitting process simpler.
1161 	 */
1162 	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
1163 		return true;
1164 	} else {
1165 		return false;
1166 	}
1167 }
1168 
1169 static bool
1170 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
1171 {
1172 	uint64_t start_stripe, end_stripe;
1173 	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;
1174 
1175 	if (io_boundary == 0) {
1176 		return false;
1177 	}
1178 
1179 	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
1180 		return false;
1181 	}
1182 
1183 	start_stripe = bdev_io->u.bdev.offset_blocks;
1184 	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
1185 	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
1186 	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
1187 		start_stripe >>= spdk_u32log2(io_boundary);
1188 		end_stripe >>= spdk_u32log2(io_boundary);
1189 	} else {
1190 		start_stripe /= io_boundary;
1191 		end_stripe /= io_boundary;
1192 	}
1193 	return (start_stripe != end_stripe);
1194 }
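/*
 * Example (illustrative): with optimal_io_boundary = 8 blocks, an I/O covering
 * blocks 6..9 maps to start_stripe = 0 and end_stripe = 1, so it crosses a
 * boundary and will be split; an I/O covering blocks 8..15 stays within
 * stripe 1 and is submitted as-is.
 */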
1195 
1196 static uint32_t
1197 _to_next_boundary(uint64_t offset, uint32_t boundary)
1198 {
1199 	return (boundary - (offset % boundary));
1200 }
1201 
1202 static void
1203 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
1204 
1205 static void
1206 _spdk_bdev_io_split_with_payload(void *_bdev_io)
1207 {
1208 	struct spdk_bdev_io *bdev_io = _bdev_io;
1209 	uint64_t current_offset, remaining, bytes_handled;
1210 	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
1211 	struct iovec *parent_iov;
1212 	uint64_t parent_iov_offset, child_iov_len;
1213 	uint32_t child_iovcnt;
1214 	int rc;
1215 
1216 	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
1217 	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
1218 	blocklen = bdev_io->bdev->blocklen;
1219 	bytes_handled = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
1220 	parent_iov = &bdev_io->u.bdev.iovs[0];
1221 	parent_iov_offset = 0;
1222 
1223 	while (bytes_handled > 0) {
1224 		if (bytes_handled >= parent_iov->iov_len) {
1225 			bytes_handled -= parent_iov->iov_len;
1226 			parent_iov++;
1227 			continue;
1228 		}
1229 		parent_iov_offset += bytes_handled;
1230 		break;
1231 	}
1232 
1233 	to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
1234 	to_next_boundary = spdk_min(remaining, to_next_boundary);
1235 	to_next_boundary_bytes = to_next_boundary * blocklen;
1236 	child_iovcnt = 0;
1237 	while (to_next_boundary_bytes > 0 && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
1238 		child_iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
1239 		to_next_boundary_bytes -= child_iov_len;
1240 
1241 		bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
1242 		bdev_io->child_iov[child_iovcnt].iov_len = child_iov_len;
1243 
1244 		parent_iov++;
1245 		parent_iov_offset = 0;
1246 		child_iovcnt++;
1247 	}
1248 
1249 	if (to_next_boundary_bytes > 0) {
1250 		/* We had to stop this child I/O early because we ran out of
1251 		 *  child_iov space.  Make sure the iovs collected are valid and
1252 		 *  then adjust to_next_boundary before starting the child I/O.
1253 		 */
1254 		if ((to_next_boundary_bytes % blocklen) != 0) {
1255 			SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n",
1256 				    to_next_boundary_bytes, blocklen);
1257 			bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1258 			bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1259 			return;
1260 		}
1261 		to_next_boundary -= to_next_boundary_bytes / blocklen;
1262 	}
1263 
1264 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1265 		rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
1266 					    spdk_io_channel_from_ctx(bdev_io->internal.ch),
1267 					    bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
1268 					    _spdk_bdev_io_split_done, bdev_io);
1269 	} else {
1270 		rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
1271 					     spdk_io_channel_from_ctx(bdev_io->internal.ch),
1272 					     bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
1273 					     _spdk_bdev_io_split_done, bdev_io);
1274 	}
1275 
1276 	if (rc == 0) {
1277 		bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary;
1278 		bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary;
1279 	} else if (rc == -ENOMEM) {
1280 		_spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_io_split_with_payload);
1281 	} else {
1282 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1283 		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1284 	}
1285 }
1286 
1287 static void
1288 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1289 {
1290 	struct spdk_bdev_io *parent_io = cb_arg;
1291 
1292 	spdk_bdev_free_io(bdev_io);
1293 
1294 	if (!success) {
1295 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1296 		parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
1297 		return;
1298 	}
1299 
1300 	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
1301 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
1302 		parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
1303 		return;
1304 	}
1305 
1306 	/*
1307 	 * Continue with the splitting process.  This function will complete the parent I/O if the
1308 	 * splitting is done.
1309 	 */
1310 	_spdk_bdev_io_split_with_payload(parent_io);
1311 }
1312 
1313 static void
1314 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
1315 {
1316 	assert(_spdk_bdev_io_type_can_split(bdev_io->type));
1317 
1318 	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
1319 	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
1320 
1321 	_spdk_bdev_io_split_with_payload(bdev_io);
1322 }
1323 
1324 static void
1325 _spdk_bdev_io_submit(void *ctx)
1326 {
1327 	struct spdk_bdev_io *bdev_io = ctx;
1328 	struct spdk_bdev *bdev = bdev_io->bdev;
1329 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1330 	struct spdk_io_channel *ch = bdev_ch->channel;
1331 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1332 	uint64_t tsc;
1333 
1334 	tsc = spdk_get_ticks();
1335 	bdev_io->internal.submit_tsc = tsc;
1336 	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);
1337 	bdev_ch->io_outstanding++;
1338 	shared_resource->io_outstanding++;
1339 	bdev_io->internal.in_submit_request = true;
1340 	if (spdk_likely(bdev_ch->flags == 0)) {
1341 		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1342 			bdev->fn_table->submit_request(ch, bdev_io);
1343 		} else {
1344 			bdev_ch->io_outstanding--;
1345 			shared_resource->io_outstanding--;
1346 			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
1347 		}
1348 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1349 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1350 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1351 		bdev_ch->io_outstanding--;
1352 		shared_resource->io_outstanding--;
1353 		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
1354 		_spdk_bdev_qos_io_submit(bdev_ch);
1355 	} else {
1356 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1357 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1358 	}
1359 	bdev_io->internal.in_submit_request = false;
1360 }
1361 
1362 static void
1363 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1364 {
1365 	struct spdk_bdev *bdev = bdev_io->bdev;
1366 	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1367 
1368 	assert(thread != NULL);
1369 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1370 
1371 	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
1372 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1373 			spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split,
1374 					     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
1375 		} else {
1376 			_spdk_bdev_io_split(NULL, bdev_io);
1377 		}
1378 		return;
1379 	}
1380 
1381 	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1382 		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
1383 			_spdk_bdev_io_submit(bdev_io);
1384 		} else {
1385 			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1386 			bdev_io->internal.ch = bdev->internal.qos->ch;
1387 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
1388 		}
1389 	} else {
1390 		_spdk_bdev_io_submit(bdev_io);
1391 	}
1392 }
1393 
1394 static void
1395 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1396 {
1397 	struct spdk_bdev *bdev = bdev_io->bdev;
1398 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1399 	struct spdk_io_channel *ch = bdev_ch->channel;
1400 
1401 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1402 
1403 	bdev_io->internal.in_submit_request = true;
1404 	bdev->fn_table->submit_request(ch, bdev_io);
1405 	bdev_io->internal.in_submit_request = false;
1406 }
1407 
1408 static void
1409 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1410 		  struct spdk_bdev *bdev, void *cb_arg,
1411 		  spdk_bdev_io_completion_cb cb)
1412 {
1413 	bdev_io->bdev = bdev;
1414 	bdev_io->internal.caller_ctx = cb_arg;
1415 	bdev_io->internal.cb = cb;
1416 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1417 	bdev_io->internal.in_submit_request = false;
1418 	bdev_io->internal.buf = NULL;
1419 	bdev_io->internal.io_submit_ch = NULL;
1420 }
1421 
1422 static bool
1423 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1424 {
1425 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1426 }
1427 
1428 bool
1429 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1430 {
1431 	bool supported;
1432 
1433 	supported = _spdk_bdev_io_type_supported(bdev, io_type);
1434 
1435 	if (!supported) {
1436 		switch (io_type) {
1437 		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1438 			/* The bdev layer will emulate write zeroes as long as write is supported. */
1439 			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
1440 			break;
1441 		default:
1442 			break;
1443 		}
1444 	}
1445 
1446 	return supported;
1447 }
1448 
1449 int
1450 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1451 {
1452 	if (bdev->fn_table->dump_info_json) {
1453 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1454 	}
1455 
1456 	return 0;
1457 }
1458 
1459 static void
1460 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1461 {
1462 	uint32_t max_per_timeslice = 0;
1463 	int i;
1464 
1465 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1466 		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1467 			qos->rate_limits[i].max_per_timeslice = 0;
1468 			continue;
1469 		}
1470 
1471 		max_per_timeslice = qos->rate_limits[i].limit *
1472 				    SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC;
1473 
1474 		qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice,
1475 							qos->rate_limits[i].min_per_timeslice);
1476 
1477 		qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice;
1478 	}
1479 }
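/*
 * Worked example (hypothetical limits): with a 1000 usec timeslice, an IOPS
 * limit of 10000 yields max_per_timeslice = 10000 * 1000 / 1000000 = 10 I/O,
 * and a bandwidth limit of 10 MiB/s yields 10485760 * 1000 / 1000000 = 10485
 * bytes.  Either value is raised to the corresponding minimum
 * (SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE or SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE)
 * if it would otherwise fall below it.
 */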
1480 
1481 static int
1482 spdk_bdev_channel_poll_qos(void *arg)
1483 {
1484 	struct spdk_bdev_qos *qos = arg;
1485 	uint64_t now = spdk_get_ticks();
1486 	int i;
1487 
1488 	if (now < (qos->last_timeslice + qos->timeslice_size)) {
1489 		/* We received our callback earlier than expected - return
1490 		 *  immediately and wait to do accounting until at least one
1491 		 *  timeslice has actually expired.  This should never happen
1492 		 *  with a well-behaved timer implementation.
1493 		 */
1494 		return 0;
1495 	}
1496 
1497 	/* Reset for next round of rate limiting */
1498 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1499 		/* We may have allowed the IOs or bytes to slightly overrun in the last
1500 		 * timeslice. remaining_this_timeslice is signed, so if it's negative
1501 		 * here, we'll account for the overrun so that the next timeslice will
1502 		 * be appropriately reduced.
1503 		 */
1504 		if (qos->rate_limits[i].remaining_this_timeslice > 0) {
1505 			qos->rate_limits[i].remaining_this_timeslice = 0;
1506 		}
1507 	}
1508 
1509 	while (now >= (qos->last_timeslice + qos->timeslice_size)) {
1510 		qos->last_timeslice += qos->timeslice_size;
1511 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1512 			qos->rate_limits[i].remaining_this_timeslice +=
1513 				qos->rate_limits[i].max_per_timeslice;
1514 		}
1515 	}
1516 
1517 	_spdk_bdev_qos_io_submit(qos->ch);
1518 
1519 	return -1;
1520 }
1521 
1522 static void
1523 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1524 {
1525 	struct spdk_bdev_shared_resource *shared_resource;
1526 
1527 	if (!ch) {
1528 		return;
1529 	}
1530 
1531 	if (ch->channel) {
1532 		spdk_put_io_channel(ch->channel);
1533 	}
1534 
1535 	assert(ch->io_outstanding == 0);
1536 
1537 	shared_resource = ch->shared_resource;
1538 	if (shared_resource) {
1539 		assert(ch->io_outstanding == 0);
1540 		assert(shared_resource->ref > 0);
1541 		shared_resource->ref--;
1542 		if (shared_resource->ref == 0) {
1543 			assert(shared_resource->io_outstanding == 0);
1544 			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1545 			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1546 			free(shared_resource);
1547 		}
1548 	}
1549 }
1550 
1551 /* Caller must hold bdev->internal.mutex. */
1552 static void
1553 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1554 {
1555 	struct spdk_bdev_qos	*qos = bdev->internal.qos;
1556 	int			i;
1557 
1558 	/* Rate limiting is enabled on this bdev */
1559 	if (qos) {
1560 		if (qos->ch == NULL) {
1561 			struct spdk_io_channel *io_ch;
1562 
1563 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1564 				      bdev->name, spdk_get_thread());
1565 
1566 			/* No qos channel has been selected, so set one up */
1567 
1568 			/* Take another reference to ch */
1569 			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1570 			qos->ch = ch;
1571 
1572 			qos->thread = spdk_io_channel_get_thread(io_ch);
1573 
1574 			TAILQ_INIT(&qos->queued);
1575 
1576 			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1577 				if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
1578 					qos->rate_limits[i].min_per_timeslice =
1579 						SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE;
1580 				} else {
1581 					qos->rate_limits[i].min_per_timeslice =
1582 						SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE;
1583 				}
1584 
1585 				if (qos->rate_limits[i].limit == 0) {
1586 					qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
1587 				}
1588 			}
1589 			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1590 			qos->timeslice_size =
1591 				SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
1592 			qos->last_timeslice = spdk_get_ticks();
1593 			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1594 							   qos,
1595 							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1596 		}
1597 
1598 		ch->flags |= BDEV_CH_QOS_ENABLED;
1599 	}
1600 }
1601 
1602 static int
1603 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1604 {
1605 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1606 	struct spdk_bdev_channel	*ch = ctx_buf;
1607 	struct spdk_io_channel		*mgmt_io_ch;
1608 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1609 	struct spdk_bdev_shared_resource *shared_resource;
1610 
1611 	ch->bdev = bdev;
1612 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1613 	if (!ch->channel) {
1614 		return -1;
1615 	}
1616 
1617 	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1618 	if (!mgmt_io_ch) {
1619 		return -1;
1620 	}
1621 
1622 	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1623 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1624 		if (shared_resource->shared_ch == ch->channel) {
1625 			spdk_put_io_channel(mgmt_io_ch);
1626 			shared_resource->ref++;
1627 			break;
1628 		}
1629 	}
1630 
1631 	if (shared_resource == NULL) {
1632 		shared_resource = calloc(1, sizeof(*shared_resource));
1633 		if (shared_resource == NULL) {
1634 			spdk_put_io_channel(mgmt_io_ch);
1635 			return -1;
1636 		}
1637 
1638 		shared_resource->mgmt_ch = mgmt_ch;
1639 		shared_resource->io_outstanding = 0;
1640 		TAILQ_INIT(&shared_resource->nomem_io);
1641 		shared_resource->nomem_threshold = 0;
1642 		shared_resource->shared_ch = ch->channel;
1643 		shared_resource->ref = 1;
1644 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1645 	}
1646 
1647 	memset(&ch->stat, 0, sizeof(ch->stat));
1648 	ch->stat.ticks_rate = spdk_get_ticks_hz();
1649 	ch->io_outstanding = 0;
1650 	TAILQ_INIT(&ch->queued_resets);
1651 	ch->flags = 0;
1652 	ch->shared_resource = shared_resource;
1653 
1654 #ifdef SPDK_CONFIG_VTUNE
1655 	{
1656 		char *name;
1657 		__itt_init_ittlib(NULL, 0);
1658 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1659 		if (!name) {
1660 			_spdk_bdev_channel_destroy_resource(ch);
1661 			return -1;
1662 		}
1663 		ch->handle = __itt_string_handle_create(name);
1664 		free(name);
1665 		ch->start_tsc = spdk_get_ticks();
1666 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1667 		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
1668 	}
1669 #endif
1670 
1671 	pthread_mutex_lock(&bdev->internal.mutex);
1672 	_spdk_bdev_enable_qos(bdev, ch);
1673 	pthread_mutex_unlock(&bdev->internal.mutex);
1674 
1675 	return 0;
1676 }
1677 
1678 /*
1679  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1680  *  linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
1681  */
1682 static void
1683 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1684 {
1685 	bdev_io_stailq_t tmp;
1686 	struct spdk_bdev_io *bdev_io;
1687 
1688 	STAILQ_INIT(&tmp);
1689 
1690 	while (!STAILQ_EMPTY(queue)) {
1691 		bdev_io = STAILQ_FIRST(queue);
1692 		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
1693 		if (bdev_io->internal.ch == ch) {
1694 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1695 		} else {
1696 			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
1697 		}
1698 	}
1699 
1700 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1701 }
1702 
1703 /*
1704  * Abort I/O that are queued waiting for submission.  These types of I/O are
1705  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1706  */
1707 static void
1708 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1709 {
1710 	struct spdk_bdev_io *bdev_io, *tmp;
1711 
1712 	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
1713 		if (bdev_io->internal.ch == ch) {
1714 			TAILQ_REMOVE(queue, bdev_io, internal.link);
1715 			/*
1716 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1717 			 *  been submitted to the bdev module.  Since in this case it
1718 			 *  hadn't, bump io_outstanding to account for the decrement
1719 			 *  that spdk_bdev_io_complete() will do.
1720 			 */
1721 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1722 				ch->io_outstanding++;
1723 				ch->shared_resource->io_outstanding++;
1724 			}
1725 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1726 		}
1727 	}
1728 }
1729 
1730 static void
1731 spdk_bdev_qos_channel_destroy(void *cb_arg)
1732 {
1733 	struct spdk_bdev_qos *qos = cb_arg;
1734 
1735 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
1736 	spdk_poller_unregister(&qos->poller);
1737 
1738 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
1739 
1740 	free(qos);
1741 }
1742 
1743 static int
1744 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
1745 {
1746 	int i;
1747 
1748 	/*
1749 	 * Cleanly shutting down the QoS poller is tricky, because
1750 	 * during the asynchronous operation the user could open
1751 	 * a new descriptor and create a new channel, spawning
1752 	 * a new QoS poller.
1753 	 *
1754 	 * The strategy is to create a new QoS structure here and swap it
1755 	 * in. The shutdown path then continues to refer to the old one
1756 	 * until it completes and then releases it.
1757 	 */
1758 	struct spdk_bdev_qos *new_qos, *old_qos;
1759 
1760 	old_qos = bdev->internal.qos;
1761 
1762 	new_qos = calloc(1, sizeof(*new_qos));
1763 	if (!new_qos) {
1764 		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
1765 		return -ENOMEM;
1766 	}
1767 
1768 	/* Copy the old QoS data into the newly allocated structure */
1769 	memcpy(new_qos, old_qos, sizeof(*new_qos));
1770 
1771 	/* Zero out the key parts of the QoS structure */
1772 	new_qos->ch = NULL;
1773 	new_qos->thread = NULL;
1774 	new_qos->poller = NULL;
1775 	TAILQ_INIT(&new_qos->queued);
1776 	/*
1777 	 * The limit member of the spdk_bdev_qos_limit structure is not zeroed;
1778 	 * the copied value is reused by the new QoS structure.
1779 	 */
1780 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1781 		new_qos->rate_limits[i].remaining_this_timeslice = 0;
1782 		new_qos->rate_limits[i].min_per_timeslice = 0;
1783 		new_qos->rate_limits[i].max_per_timeslice = 0;
1784 	}
1785 
1786 	bdev->internal.qos = new_qos;
1787 
1788 	if (old_qos->thread == NULL) {
1789 		free(old_qos);
1790 	} else {
1791 		spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
1792 				     old_qos);
1793 	}
1794 
1795 	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
1796 	 * been destroyed yet. The destruction path will end up waiting for the final
1797 	 * channel to be put before it releases resources. */
1798 
1799 	return 0;
1800 }
1801 
1802 static void
1803 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
1804 {
1805 	total->bytes_read += add->bytes_read;
1806 	total->num_read_ops += add->num_read_ops;
1807 	total->bytes_written += add->bytes_written;
1808 	total->num_write_ops += add->num_write_ops;
1809 	total->read_latency_ticks += add->read_latency_ticks;
1810 	total->write_latency_ticks += add->write_latency_ticks;
1811 }
1812 
1813 static void
1814 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1815 {
1816 	struct spdk_bdev_channel	*ch = ctx_buf;
1817 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1818 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1819 
1820 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
1821 		      spdk_get_thread());
1822 
1823 	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
1824 	pthread_mutex_lock(&ch->bdev->internal.mutex);
1825 	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
1826 	pthread_mutex_unlock(&ch->bdev->internal.mutex);
1827 
1828 	mgmt_ch = shared_resource->mgmt_ch;
1829 
1830 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1831 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1832 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1833 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1834 
1835 	_spdk_bdev_channel_destroy_resource(ch);
1836 }
1837 
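/*
 * Add an alternate name for the bdev.  The alias must be unique across all
 *  registered bdev names and aliases: -EINVAL is returned for a NULL alias,
 *  -EEXIST if the name is already in use, and -ENOMEM on allocation failure.
 */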
1838 int
1839 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1840 {
1841 	struct spdk_bdev_alias *tmp;
1842 
1843 	if (alias == NULL) {
1844 		SPDK_ERRLOG("Empty alias passed\n");
1845 		return -EINVAL;
1846 	}
1847 
1848 	if (spdk_bdev_get_by_name(alias)) {
1849 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1850 		return -EEXIST;
1851 	}
1852 
1853 	tmp = calloc(1, sizeof(*tmp));
1854 	if (tmp == NULL) {
1855 		SPDK_ERRLOG("Unable to allocate alias\n");
1856 		return -ENOMEM;
1857 	}
1858 
1859 	tmp->alias = strdup(alias);
1860 	if (tmp->alias == NULL) {
1861 		free(tmp);
1862 		SPDK_ERRLOG("Unable to allocate alias\n");
1863 		return -ENOMEM;
1864 	}
1865 
1866 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1867 
1868 	return 0;
1869 }
1870 
1871 int
1872 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1873 {
1874 	struct spdk_bdev_alias *tmp;
1875 
1876 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1877 		if (strcmp(alias, tmp->alias) == 0) {
1878 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1879 			free(tmp->alias);
1880 			free(tmp);
1881 			return 0;
1882 		}
1883 	}
1884 
1885 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1886 
1887 	return -ENOENT;
1888 }
1889 
1890 void
1891 spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
1892 {
1893 	struct spdk_bdev_alias *p, *tmp;
1894 
1895 	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
1896 		TAILQ_REMOVE(&bdev->aliases, p, tailq);
1897 		free(p->alias);
1898 		free(p);
1899 	}
1900 }
1901 
1902 struct spdk_io_channel *
1903 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1904 {
1905 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1906 }
1907 
1908 const char *
1909 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1910 {
1911 	return bdev->name;
1912 }
1913 
1914 const char *
1915 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1916 {
1917 	return bdev->product_name;
1918 }
1919 
1920 const struct spdk_bdev_aliases_list *
1921 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1922 {
1923 	return &bdev->aliases;
1924 }
1925 
1926 uint32_t
1927 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1928 {
1929 	return bdev->blocklen;
1930 }
1931 
1932 uint64_t
1933 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1934 {
1935 	return bdev->blockcnt;
1936 }
1937 
1938 const char *
1939 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type)
1940 {
1941 	return qos_rpc_type[type];
1942 }
1943 
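/*
 * Fill the caller-provided array (one entry per rate limit type, i.e.
 *  SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES elements) with the configured QoS
 *  limits.  Entries left at zero mean no limit is defined for that type.
 */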
1944 void
1945 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
1946 {
1947 	int i;
1948 
1949 	memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
1950 
1951 	pthread_mutex_lock(&bdev->internal.mutex);
1952 	if (bdev->internal.qos) {
1953 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1954 			if (bdev->internal.qos->rate_limits[i].limit !=
1955 			    SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1956 				limits[i] = bdev->internal.qos->rate_limits[i].limit;
1957 			}
1958 		}
1959 	}
1960 	pthread_mutex_unlock(&bdev->internal.mutex);
1961 }
1962 
1963 size_t
1964 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1965 {
1966 	/* TODO: push this logic down to the bdev modules */
1967 	if (bdev->need_aligned_buffer) {
1968 		return bdev->blocklen;
1969 	}
1970 
1971 	return 1;
1972 }
1973 
1974 uint32_t
1975 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1976 {
1977 	return bdev->optimal_io_boundary;
1978 }
1979 
1980 bool
1981 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1982 {
1983 	return bdev->write_cache;
1984 }
1985 
1986 const struct spdk_uuid *
1987 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1988 {
1989 	return &bdev->uuid;
1990 }
1991 
1992 uint64_t
1993 spdk_bdev_get_qd(const struct spdk_bdev *bdev)
1994 {
1995 	return bdev->internal.measured_queue_depth;
1996 }
1997 
1998 uint64_t
1999 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev)
2000 {
2001 	return bdev->internal.period;
2002 }
2003 
2004 uint64_t
2005 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev)
2006 {
2007 	return bdev->internal.weighted_io_time;
2008 }
2009 
2010 uint64_t
2011 spdk_bdev_get_io_time(const struct spdk_bdev *bdev)
2012 {
2013 	return bdev->internal.io_time;
2014 }
2015 
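/*
 * Queue depth sampling: spdk_bdev_set_qd_sampling_period() registers a poller
 *  that periodically sums io_outstanding across all channels of the bdev
 *  (_calculate_measured_qd) and then publishes the total as
 *  measured_queue_depth, accumulating io_time and weighted_io_time along the
 *  way (_calculate_measured_qd_cpl).
 */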
2016 static void
2017 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status)
2018 {
2019 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
2020 
2021 	bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth;
2022 
2023 	if (bdev->internal.measured_queue_depth) {
2024 		bdev->internal.io_time += bdev->internal.period;
2025 		bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth;
2026 	}
2027 }
2028 
2029 static void
2030 _calculate_measured_qd(struct spdk_io_channel_iter *i)
2031 {
2032 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
2033 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
2034 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch);
2035 
2036 	bdev->internal.temporary_queue_depth += ch->io_outstanding;
2037 	spdk_for_each_channel_continue(i, 0);
2038 }
2039 
2040 static int
2041 spdk_bdev_calculate_measured_queue_depth(void *ctx)
2042 {
2043 	struct spdk_bdev *bdev = ctx;
2044 	bdev->internal.temporary_queue_depth = 0;
2045 	spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev,
2046 			      _calculate_measured_qd_cpl);
2047 	return 0;
2048 }
2049 
2050 void
2051 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period)
2052 {
2053 	bdev->internal.period = period;
2054 
2055 	if (bdev->internal.qd_poller != NULL) {
2056 		spdk_poller_unregister(&bdev->internal.qd_poller);
2057 		bdev->internal.measured_queue_depth = UINT64_MAX;
2058 	}
2059 
2060 	if (period != 0) {
2061 		bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev,
2062 					   period);
2063 	}
2064 }
2065 
2066 int
2067 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
2068 {
2069 	int ret;
2070 
2071 	pthread_mutex_lock(&bdev->internal.mutex);
2072 
2073 	/* bdev has open descriptors */
2074 	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
2075 	    bdev->blockcnt > size) {
2076 		ret = -EBUSY;
2077 	} else {
2078 		bdev->blockcnt = size;
2079 		ret = 0;
2080 	}
2081 
2082 	pthread_mutex_unlock(&bdev->internal.mutex);
2083 
2084 	return ret;
2085 }
2086 
2087 /*
2088  * Convert I/O offset and length from bytes to blocks.
2089  *
2090  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
2091  */
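/*
 * Illustrative example: with a 512-byte block size, offset_bytes = 4096 and
 *  num_bytes = 8192 yield offset_blocks = 8, num_blocks = 16 and a zero return
 *  value, while num_bytes = 100 would make the return value non-zero.
 */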
2092 static uint64_t
2093 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
2094 			  uint64_t num_bytes, uint64_t *num_blocks)
2095 {
2096 	uint32_t block_size = bdev->blocklen;
2097 
2098 	*offset_blocks = offset_bytes / block_size;
2099 	*num_blocks = num_bytes / block_size;
2100 
2101 	return (offset_bytes % block_size) | (num_bytes % block_size);
2102 }
2103 
2104 static bool
2105 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
2106 {
2107 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
2108 	 * that the sum has overflowed and wrapped around. */
2109 	if (offset_blocks + num_blocks < offset_blocks) {
2110 		return false;
2111 	}
2112 
2113 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
2114 	if (offset_blocks + num_blocks > bdev->blockcnt) {
2115 		return false;
2116 	}
2117 
2118 	return true;
2119 }
2120 
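/*
 * Usage sketch (illustrative, not part of this file): read 4 KiB from byte
 *  offset 0, assuming an already-opened descriptor 'desc', an I/O channel
 *  'io_ch' obtained on the calling thread, and a DMA-safe buffer 'buf' that
 *  honors spdk_bdev_get_buf_align() (e.g. from spdk_dma_zmalloc()).  The
 *  completion callback must release the I/O with spdk_bdev_free_io().
 *
 *	static void
 *	read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	rc = spdk_bdev_read(desc, io_ch, buf, 0, 4096, read_done, NULL);
 *	if (rc == -ENOMEM) {
 *		// Retry later, e.g. via spdk_bdev_queue_io_wait().
 *	}
 */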
2121 int
2122 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2123 	       void *buf, uint64_t offset, uint64_t nbytes,
2124 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
2125 {
2126 	uint64_t offset_blocks, num_blocks;
2127 
2128 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2129 		return -EINVAL;
2130 	}
2131 
2132 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2133 }
2134 
2135 int
2136 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2137 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2138 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
2139 {
2140 	struct spdk_bdev *bdev = desc->bdev;
2141 	struct spdk_bdev_io *bdev_io;
2142 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2143 
2144 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2145 		return -EINVAL;
2146 	}
2147 
2148 	bdev_io = spdk_bdev_get_io(channel);
2149 	if (!bdev_io) {
2150 		return -ENOMEM;
2151 	}
2152 
2153 	bdev_io->internal.ch = channel;
2154 	bdev_io->internal.desc = desc;
2155 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2156 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2157 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2158 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2159 	bdev_io->u.bdev.iovcnt = 1;
2160 	bdev_io->u.bdev.num_blocks = num_blocks;
2161 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2162 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2163 
2164 	spdk_bdev_io_submit(bdev_io);
2165 	return 0;
2166 }
2167 
2168 int
2169 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2170 		struct iovec *iov, int iovcnt,
2171 		uint64_t offset, uint64_t nbytes,
2172 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2173 {
2174 	uint64_t offset_blocks, num_blocks;
2175 
2176 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2177 		return -EINVAL;
2178 	}
2179 
2180 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2181 }
2182 
2183 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2184 			   struct iovec *iov, int iovcnt,
2185 			   uint64_t offset_blocks, uint64_t num_blocks,
2186 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2187 {
2188 	struct spdk_bdev *bdev = desc->bdev;
2189 	struct spdk_bdev_io *bdev_io;
2190 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2191 
2192 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2193 		return -EINVAL;
2194 	}
2195 
2196 	bdev_io = spdk_bdev_get_io(channel);
2197 	if (!bdev_io) {
2198 		return -ENOMEM;
2199 	}
2200 
2201 	bdev_io->internal.ch = channel;
2202 	bdev_io->internal.desc = desc;
2203 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2204 	bdev_io->u.bdev.iovs = iov;
2205 	bdev_io->u.bdev.iovcnt = iovcnt;
2206 	bdev_io->u.bdev.num_blocks = num_blocks;
2207 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2208 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2209 
2210 	spdk_bdev_io_submit(bdev_io);
2211 	return 0;
2212 }
2213 
2214 int
2215 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2216 		void *buf, uint64_t offset, uint64_t nbytes,
2217 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2218 {
2219 	uint64_t offset_blocks, num_blocks;
2220 
2221 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2222 		return -EINVAL;
2223 	}
2224 
2225 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2226 }
2227 
2228 int
2229 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2230 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2231 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2232 {
2233 	struct spdk_bdev *bdev = desc->bdev;
2234 	struct spdk_bdev_io *bdev_io;
2235 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2236 
2237 	if (!desc->write) {
2238 		return -EBADF;
2239 	}
2240 
2241 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2242 		return -EINVAL;
2243 	}
2244 
2245 	bdev_io = spdk_bdev_get_io(channel);
2246 	if (!bdev_io) {
2247 		return -ENOMEM;
2248 	}
2249 
2250 	bdev_io->internal.ch = channel;
2251 	bdev_io->internal.desc = desc;
2252 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2253 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2254 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2255 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2256 	bdev_io->u.bdev.iovcnt = 1;
2257 	bdev_io->u.bdev.num_blocks = num_blocks;
2258 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2259 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2260 
2261 	spdk_bdev_io_submit(bdev_io);
2262 	return 0;
2263 }
2264 
2265 int
2266 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2267 		 struct iovec *iov, int iovcnt,
2268 		 uint64_t offset, uint64_t len,
2269 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
2270 {
2271 	uint64_t offset_blocks, num_blocks;
2272 
2273 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2274 		return -EINVAL;
2275 	}
2276 
2277 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2278 }
2279 
2280 int
2281 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2282 			struct iovec *iov, int iovcnt,
2283 			uint64_t offset_blocks, uint64_t num_blocks,
2284 			spdk_bdev_io_completion_cb cb, void *cb_arg)
2285 {
2286 	struct spdk_bdev *bdev = desc->bdev;
2287 	struct spdk_bdev_io *bdev_io;
2288 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2289 
2290 	if (!desc->write) {
2291 		return -EBADF;
2292 	}
2293 
2294 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2295 		return -EINVAL;
2296 	}
2297 
2298 	bdev_io = spdk_bdev_get_io(channel);
2299 	if (!bdev_io) {
2300 		return -ENOMEM;
2301 	}
2302 
2303 	bdev_io->internal.ch = channel;
2304 	bdev_io->internal.desc = desc;
2305 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2306 	bdev_io->u.bdev.iovs = iov;
2307 	bdev_io->u.bdev.iovcnt = iovcnt;
2308 	bdev_io->u.bdev.num_blocks = num_blocks;
2309 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2310 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2311 
2312 	spdk_bdev_io_submit(bdev_io);
2313 	return 0;
2314 }
2315 
2316 int
2317 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2318 		       uint64_t offset, uint64_t len,
2319 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2320 {
2321 	uint64_t offset_blocks, num_blocks;
2322 
2323 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2324 		return -EINVAL;
2325 	}
2326 
2327 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2328 }
2329 
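/*
 * If the underlying module supports WRITE_ZEROES the request is submitted
 *  directly; otherwise it is emulated with regular writes from the shared
 *  zero buffer in chunks of up to ZERO_BUFFER_SIZE bytes (see
 *  _spdk_bdev_write_zero_buffer_next()).  Modules that support neither cause
 *  -ENOTSUP to be returned.
 */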
2330 int
2331 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2332 			      uint64_t offset_blocks, uint64_t num_blocks,
2333 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2334 {
2335 	struct spdk_bdev *bdev = desc->bdev;
2336 	struct spdk_bdev_io *bdev_io;
2337 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2338 
2339 	if (!desc->write) {
2340 		return -EBADF;
2341 	}
2342 
2343 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2344 		return -EINVAL;
2345 	}
2346 
2347 	bdev_io = spdk_bdev_get_io(channel);
2348 
2349 	if (!bdev_io) {
2350 		return -ENOMEM;
2351 	}
2352 
2353 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
2354 	bdev_io->internal.ch = channel;
2355 	bdev_io->internal.desc = desc;
2356 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2357 	bdev_io->u.bdev.num_blocks = num_blocks;
2358 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2359 
2360 	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
2361 		spdk_bdev_io_submit(bdev_io);
2362 		return 0;
2363 	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
2364 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
2365 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks;
2366 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks;
2367 		_spdk_bdev_write_zero_buffer_next(bdev_io);
2368 		return 0;
2369 	} else {
2370 		spdk_bdev_free_io(bdev_io);
2371 		return -ENOTSUP;
2372 	}
2373 }
2374 
2375 int
2376 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2377 		uint64_t offset, uint64_t nbytes,
2378 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2379 {
2380 	uint64_t offset_blocks, num_blocks;
2381 
2382 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2383 		return -EINVAL;
2384 	}
2385 
2386 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2387 }
2388 
2389 int
2390 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2391 		       uint64_t offset_blocks, uint64_t num_blocks,
2392 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2393 {
2394 	struct spdk_bdev *bdev = desc->bdev;
2395 	struct spdk_bdev_io *bdev_io;
2396 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2397 
2398 	if (!desc->write) {
2399 		return -EBADF;
2400 	}
2401 
2402 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2403 		return -EINVAL;
2404 	}
2405 
2406 	if (num_blocks == 0) {
2407 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
2408 		return -EINVAL;
2409 	}
2410 
2411 	bdev_io = spdk_bdev_get_io(channel);
2412 	if (!bdev_io) {
2413 		return -ENOMEM;
2414 	}
2415 
2416 	bdev_io->internal.ch = channel;
2417 	bdev_io->internal.desc = desc;
2418 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
2419 
2420 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2421 	bdev_io->u.bdev.iovs[0].iov_base = NULL;
2422 	bdev_io->u.bdev.iovs[0].iov_len = 0;
2423 	bdev_io->u.bdev.iovcnt = 1;
2424 
2425 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2426 	bdev_io->u.bdev.num_blocks = num_blocks;
2427 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2428 
2429 	spdk_bdev_io_submit(bdev_io);
2430 	return 0;
2431 }
2432 
2433 int
2434 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2435 		uint64_t offset, uint64_t length,
2436 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2437 {
2438 	uint64_t offset_blocks, num_blocks;
2439 
2440 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
2441 		return -EINVAL;
2442 	}
2443 
2444 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2445 }
2446 
2447 int
2448 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2449 		       uint64_t offset_blocks, uint64_t num_blocks,
2450 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2451 {
2452 	struct spdk_bdev *bdev = desc->bdev;
2453 	struct spdk_bdev_io *bdev_io;
2454 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2455 
2456 	if (!desc->write) {
2457 		return -EBADF;
2458 	}
2459 
2460 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2461 		return -EINVAL;
2462 	}
2463 
2464 	bdev_io = spdk_bdev_get_io(channel);
2465 	if (!bdev_io) {
2466 		return -ENOMEM;
2467 	}
2468 
2469 	bdev_io->internal.ch = channel;
2470 	bdev_io->internal.desc = desc;
2471 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
2472 	bdev_io->u.bdev.iovs = NULL;
2473 	bdev_io->u.bdev.iovcnt = 0;
2474 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2475 	bdev_io->u.bdev.num_blocks = num_blocks;
2476 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2477 
2478 	spdk_bdev_io_submit(bdev_io);
2479 	return 0;
2480 }
2481 
2482 static void
2483 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2484 {
2485 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2486 	struct spdk_bdev_io *bdev_io;
2487 
2488 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
2489 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link);
2490 	spdk_bdev_io_submit_reset(bdev_io);
2491 }
2492 
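/*
 * Freeze one channel as part of a reset: mark it with
 *  BDEV_CH_RESET_IN_PROGRESS, fail any of its I/O still queued for memory,
 *  buffers or QoS, and continue the spdk_for_each_channel() iteration.  Once
 *  every channel is frozen, _spdk_bdev_reset_dev() submits the reset itself.
 */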
2493 static void
2494 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2495 {
2496 	struct spdk_io_channel		*ch;
2497 	struct spdk_bdev_channel	*channel;
2498 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
2499 	struct spdk_bdev_shared_resource *shared_resource;
2500 	bdev_io_tailq_t			tmp_queued;
2501 
2502 	TAILQ_INIT(&tmp_queued);
2503 
2504 	ch = spdk_io_channel_iter_get_channel(i);
2505 	channel = spdk_io_channel_get_ctx(ch);
2506 	shared_resource = channel->shared_resource;
2507 	mgmt_channel = shared_resource->mgmt_ch;
2508 
2509 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2510 
2511 	if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2512 		/* The QoS object is always valid and readable while
2513 		 * the channel flag is set, so the lock here should not
2514 		 * be necessary. We're not in the fast path though, so
2515 		 * just take it anyway. */
2516 		pthread_mutex_lock(&channel->bdev->internal.mutex);
2517 		if (channel->bdev->internal.qos->ch == channel) {
2518 			TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link);
2519 		}
2520 		pthread_mutex_unlock(&channel->bdev->internal.mutex);
2521 	}
2522 
2523 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2524 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2525 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2526 	_spdk_bdev_abort_queued_io(&tmp_queued, channel);
2527 
2528 	spdk_for_each_channel_continue(i, 0);
2529 }
2530 
2531 static void
2532 _spdk_bdev_start_reset(void *ctx)
2533 {
2534 	struct spdk_bdev_channel *ch = ctx;
2535 
2536 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2537 			      ch, _spdk_bdev_reset_dev);
2538 }
2539 
2540 static void
2541 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2542 {
2543 	struct spdk_bdev *bdev = ch->bdev;
2544 
2545 	assert(!TAILQ_EMPTY(&ch->queued_resets));
2546 
2547 	pthread_mutex_lock(&bdev->internal.mutex);
2548 	if (bdev->internal.reset_in_progress == NULL) {
2549 		bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2550 		/*
2551 		 * Take a channel reference for the target bdev for the life of this
2552 		 *  reset.  This guards against the channel getting destroyed while
2553 		 *  spdk_for_each_channel() calls related to this reset IO are in
2554 		 *  progress.  We will release the reference when this reset is
2555 		 *  completed.
2556 		 */
2557 		bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2558 		_spdk_bdev_start_reset(ch);
2559 	}
2560 	pthread_mutex_unlock(&bdev->internal.mutex);
2561 }
2562 
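/*
 * Queue a reset for the bdev.  Only one reset is in progress per bdev at a
 *  time; additional resets remain on the channel's queued_resets list and are
 *  started by _spdk_bdev_channel_start_reset() once the current one completes.
 */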
2563 int
2564 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2565 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2566 {
2567 	struct spdk_bdev *bdev = desc->bdev;
2568 	struct spdk_bdev_io *bdev_io;
2569 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2570 
2571 	bdev_io = spdk_bdev_get_io(channel);
2572 	if (!bdev_io) {
2573 		return -ENOMEM;
2574 	}
2575 
2576 	bdev_io->internal.ch = channel;
2577 	bdev_io->internal.desc = desc;
2578 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
2579 	bdev_io->u.reset.ch_ref = NULL;
2580 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2581 
2582 	pthread_mutex_lock(&bdev->internal.mutex);
2583 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link);
2584 	pthread_mutex_unlock(&bdev->internal.mutex);
2585 
2586 	_spdk_bdev_channel_start_reset(channel);
2587 
2588 	return 0;
2589 }
2590 
2591 void
2592 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2593 		      struct spdk_bdev_io_stat *stat)
2594 {
2595 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2596 
2597 	*stat = channel->stat;
2598 }
2599 
2600 static void
2601 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
2602 {
2603 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2604 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2605 
2606 	bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
2607 			    bdev_iostat_ctx->cb_arg, 0);
2608 	free(bdev_iostat_ctx);
2609 }
2610 
2611 static void
2612 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
2613 {
2614 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2615 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2616 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2617 
2618 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat);
2619 	spdk_for_each_channel_continue(i, 0);
2620 }
2621 
2622 void
2623 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
2624 			  spdk_bdev_get_device_stat_cb cb, void *cb_arg)
2625 {
2626 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
2627 
2628 	assert(bdev != NULL);
2629 	assert(stat != NULL);
2630 	assert(cb != NULL);
2631 
2632 	bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
2633 	if (bdev_iostat_ctx == NULL) {
2634 		SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
2635 		cb(bdev, stat, cb_arg, -ENOMEM);
2636 		return;
2637 	}
2638 
2639 	bdev_iostat_ctx->stat = stat;
2640 	bdev_iostat_ctx->cb = cb;
2641 	bdev_iostat_ctx->cb_arg = cb_arg;
2642 
2643 	/* Start with the statistics from previously deleted channels. */
2644 	pthread_mutex_lock(&bdev->internal.mutex);
2645 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat);
2646 	pthread_mutex_unlock(&bdev->internal.mutex);
2647 
2648 	/* Then iterate and add the statistics from each existing channel. */
2649 	spdk_for_each_channel(__bdev_to_io_dev(bdev),
2650 			      _spdk_bdev_get_each_channel_stat,
2651 			      bdev_iostat_ctx,
2652 			      _spdk_bdev_get_device_stat_done);
2653 }
2654 
2655 int
2656 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2657 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2658 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2659 {
2660 	struct spdk_bdev *bdev = desc->bdev;
2661 	struct spdk_bdev_io *bdev_io;
2662 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2663 
2664 	if (!desc->write) {
2665 		return -EBADF;
2666 	}
2667 
2668 	bdev_io = spdk_bdev_get_io(channel);
2669 	if (!bdev_io) {
2670 		return -ENOMEM;
2671 	}
2672 
2673 	bdev_io->internal.ch = channel;
2674 	bdev_io->internal.desc = desc;
2675 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2676 	bdev_io->u.nvme_passthru.cmd = *cmd;
2677 	bdev_io->u.nvme_passthru.buf = buf;
2678 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2679 	bdev_io->u.nvme_passthru.md_buf = NULL;
2680 	bdev_io->u.nvme_passthru.md_len = 0;
2681 
2682 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2683 
2684 	spdk_bdev_io_submit(bdev_io);
2685 	return 0;
2686 }
2687 
2688 int
2689 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2690 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2691 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2692 {
2693 	struct spdk_bdev *bdev = desc->bdev;
2694 	struct spdk_bdev_io *bdev_io;
2695 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2696 
2697 	if (!desc->write) {
2698 		/*
2699 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2700 		 *  to easily determine if the command is a read or write, but for now just
2701 		 *  do not allow io_passthru with a read-only descriptor.
2702 		 */
2703 		return -EBADF;
2704 	}
2705 
2706 	bdev_io = spdk_bdev_get_io(channel);
2707 	if (!bdev_io) {
2708 		return -ENOMEM;
2709 	}
2710 
2711 	bdev_io->internal.ch = channel;
2712 	bdev_io->internal.desc = desc;
2713 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2714 	bdev_io->u.nvme_passthru.cmd = *cmd;
2715 	bdev_io->u.nvme_passthru.buf = buf;
2716 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2717 	bdev_io->u.nvme_passthru.md_buf = NULL;
2718 	bdev_io->u.nvme_passthru.md_len = 0;
2719 
2720 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2721 
2722 	spdk_bdev_io_submit(bdev_io);
2723 	return 0;
2724 }
2725 
2726 int
2727 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2728 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2729 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2730 {
2731 	struct spdk_bdev *bdev = desc->bdev;
2732 	struct spdk_bdev_io *bdev_io;
2733 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2734 
2735 	if (!desc->write) {
2736 		/*
2737 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2738 		 *  to easily determine if the command is a read or write, but for now just
2739 		 *  do not allow io_passthru with a read-only descriptor.
2740 		 */
2741 		return -EBADF;
2742 	}
2743 
2744 	bdev_io = spdk_bdev_get_io(channel);
2745 	if (!bdev_io) {
2746 		return -ENOMEM;
2747 	}
2748 
2749 	bdev_io->internal.ch = channel;
2750 	bdev_io->internal.desc = desc;
2751 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2752 	bdev_io->u.nvme_passthru.cmd = *cmd;
2753 	bdev_io->u.nvme_passthru.buf = buf;
2754 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2755 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2756 	bdev_io->u.nvme_passthru.md_len = md_len;
2757 
2758 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2759 
2760 	spdk_bdev_io_submit(bdev_io);
2761 	return 0;
2762 }
2763 
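/*
 * Usage sketch (illustrative): when a submit call returns -ENOMEM, park a wait
 *  entry on the same channel and resubmit from its callback once an
 *  spdk_bdev_io becomes available.  The cb_fn/cb_arg members below follow the
 *  spdk_bdev_io_wait_entry definition in the public header; 'resubmit_cb' is a
 *  hypothetical callback supplied by the caller.
 *
 *	entry.bdev = bdev;
 *	entry.cb_fn = resubmit_cb;
 *	entry.cb_arg = io_ctx;
 *	rc = spdk_bdev_queue_io_wait(bdev, io_ch, &entry);
 */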
2764 int
2765 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2766 			struct spdk_bdev_io_wait_entry *entry)
2767 {
2768 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2769 	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
2770 
2771 	if (bdev != entry->bdev) {
2772 		SPDK_ERRLOG("bdevs do not match\n");
2773 		return -EINVAL;
2774 	}
2775 
2776 	if (mgmt_ch->per_thread_cache_count > 0) {
2777 		SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n");
2778 		return -EINVAL;
2779 	}
2780 
2781 	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
2782 	return 0;
2783 }
2784 
2785 static void
2786 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2787 {
2788 	struct spdk_bdev *bdev = bdev_ch->bdev;
2789 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2790 	struct spdk_bdev_io *bdev_io;
2791 
2792 	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
2793 		/*
2794 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2795 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2796 		 *  the context of a completion, because the resources for the I/O are
2797 		 *  not released until control returns to the bdev poller.  Also, we
2798 		 *  may require several small I/O to complete before a larger I/O
2799 		 *  (that requires splitting) can be submitted.
2800 		 */
2801 		return;
2802 	}
2803 
2804 	while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
2805 		bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
2806 		TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link);
2807 		bdev_io->internal.ch->io_outstanding++;
2808 		shared_resource->io_outstanding++;
2809 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
2810 		bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
2811 		if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
2812 			break;
2813 		}
2814 	}
2815 }
2816 
2817 static inline void
2818 _spdk_bdev_io_complete(void *ctx)
2819 {
2820 	struct spdk_bdev_io *bdev_io = ctx;
2821 	uint64_t tsc;
2822 
2823 	if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
2824 		/*
2825 		 * Send the completion to the thread that originally submitted the I/O,
2826 		 * which may not be the current thread in the case of QoS.
2827 		 */
2828 		if (bdev_io->internal.io_submit_ch) {
2829 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
2830 			bdev_io->internal.io_submit_ch = NULL;
2831 		}
2832 
2833 		/*
2834 		 * Defer completion to avoid potential infinite recursion if the
2835 		 * user's completion callback issues a new I/O.
2836 		 */
2837 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
2838 				     _spdk_bdev_io_complete, bdev_io);
2839 		return;
2840 	}
2841 
2842 	tsc = spdk_get_ticks();
2843 	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0);
2844 
2845 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2846 		switch (bdev_io->type) {
2847 		case SPDK_BDEV_IO_TYPE_READ:
2848 			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2849 			bdev_io->internal.ch->stat.num_read_ops++;
2850 			bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
2851 			break;
2852 		case SPDK_BDEV_IO_TYPE_WRITE:
2853 			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2854 			bdev_io->internal.ch->stat.num_write_ops++;
2855 			bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
2856 			break;
2857 		default:
2858 			break;
2859 		}
2860 	}
2861 
2862 #ifdef SPDK_CONFIG_VTUNE
2863 	uint64_t now_tsc = spdk_get_ticks();
2864 	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
2865 		uint64_t data[5];
2866 
2867 		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
2868 		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
2869 		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
2870 		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
2871 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2872 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
2873 
2874 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
2875 				   __itt_metadata_u64, 5, data);
2876 
2877 		bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
2878 		bdev_io->internal.ch->start_tsc = now_tsc;
2879 	}
2880 #endif
2881 
2882 	assert(bdev_io->internal.cb != NULL);
2883 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
2884 
2885 	bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
2886 			     bdev_io->internal.caller_ctx);
2887 }
2888 
2889 static void
2890 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2891 {
2892 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2893 
2894 	if (bdev_io->u.reset.ch_ref != NULL) {
2895 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2896 		bdev_io->u.reset.ch_ref = NULL;
2897 	}
2898 
2899 	_spdk_bdev_io_complete(bdev_io);
2900 }
2901 
2902 static void
2903 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2904 {
2905 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2906 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2907 
2908 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2909 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2910 		_spdk_bdev_channel_start_reset(ch);
2911 	}
2912 
2913 	spdk_for_each_channel_continue(i, 0);
2914 }
2915 
2916 void
2917 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2918 {
2919 	struct spdk_bdev *bdev = bdev_io->bdev;
2920 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
2921 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2922 
2923 	bdev_io->internal.status = status;
2924 
2925 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2926 		bool unlock_channels = false;
2927 
2928 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2929 			SPDK_ERRLOG("NOMEM returned for reset\n");
2930 		}
2931 		pthread_mutex_lock(&bdev->internal.mutex);
2932 		if (bdev_io == bdev->internal.reset_in_progress) {
2933 			bdev->internal.reset_in_progress = NULL;
2934 			unlock_channels = true;
2935 		}
2936 		pthread_mutex_unlock(&bdev->internal.mutex);
2937 
2938 		if (unlock_channels) {
2939 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2940 					      bdev_io, _spdk_bdev_reset_complete);
2941 			return;
2942 		}
2943 	} else {
2944 		assert(bdev_ch->io_outstanding > 0);
2945 		assert(shared_resource->io_outstanding > 0);
2946 		bdev_ch->io_outstanding--;
2947 		shared_resource->io_outstanding--;
2948 
2949 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2950 			TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link);
2951 			/*
2952 			 * Wait for some of the outstanding I/O to complete before we
2953 			 *  retry any of the nomem_io.  Normally we will wait for
2954 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2955 			 *  depth channels we will instead wait for half to complete.
2956 			 */
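			/*
			 * Worked example with NOMEM_THRESHOLD_COUNT == 8: at 64 outstanding
			 *  I/O the threshold becomes spdk_max(32, 56) == 56, while at 10
			 *  outstanding it becomes spdk_max(5, 2) == 5, i.e. roughly half.
			 */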
2957 			shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
2958 							   (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
2959 			return;
2960 		}
2961 
2962 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
2963 			_spdk_bdev_ch_retry_io(bdev_ch);
2964 		}
2965 	}
2966 
2967 	_spdk_bdev_io_complete(bdev_io);
2968 }
2969 
2970 void
2971 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2972 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2973 {
2974 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2975 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2976 	} else {
2977 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2978 		bdev_io->internal.error.scsi.sc = sc;
2979 		bdev_io->internal.error.scsi.sk = sk;
2980 		bdev_io->internal.error.scsi.asc = asc;
2981 		bdev_io->internal.error.scsi.ascq = ascq;
2982 	}
2983 
2984 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2985 }
2986 
2987 void
2988 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2989 			     int *sc, int *sk, int *asc, int *ascq)
2990 {
2991 	assert(sc != NULL);
2992 	assert(sk != NULL);
2993 	assert(asc != NULL);
2994 	assert(ascq != NULL);
2995 
2996 	switch (bdev_io->internal.status) {
2997 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2998 		*sc = SPDK_SCSI_STATUS_GOOD;
2999 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
3000 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
3001 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
3002 		break;
3003 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
3004 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
3005 		break;
3006 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
3007 		*sc = bdev_io->internal.error.scsi.sc;
3008 		*sk = bdev_io->internal.error.scsi.sk;
3009 		*asc = bdev_io->internal.error.scsi.asc;
3010 		*ascq = bdev_io->internal.error.scsi.ascq;
3011 		break;
3012 	default:
3013 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
3014 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
3015 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
3016 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
3017 		break;
3018 	}
3019 }
3020 
3021 void
3022 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
3023 {
3024 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
3025 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3026 	} else {
3027 		bdev_io->internal.error.nvme.sct = sct;
3028 		bdev_io->internal.error.nvme.sc = sc;
3029 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
3030 	}
3031 
3032 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
3033 }
3034 
3035 void
3036 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
3037 {
3038 	assert(sct != NULL);
3039 	assert(sc != NULL);
3040 
3041 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
3042 		*sct = bdev_io->internal.error.nvme.sct;
3043 		*sc = bdev_io->internal.error.nvme.sc;
3044 	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
3045 		*sct = SPDK_NVME_SCT_GENERIC;
3046 		*sc = SPDK_NVME_SC_SUCCESS;
3047 	} else {
3048 		*sct = SPDK_NVME_SCT_GENERIC;
3049 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
3050 	}
3051 }
3052 
3053 struct spdk_thread *
3054 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
3055 {
3056 	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
3057 }
3058 
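/*
 * Validate and apply rate limits parsed from the configuration file.  Each
 *  defined limit must be a non-zero multiple of the per-type minimum
 *  (SPDK_BDEV_QOS_MIN_IOS_PER_SEC or SPDK_BDEV_QOS_MIN_BYTES_PER_SEC);
 *  otherwise QoS is not enabled for the bdev.  bdev->internal.qos is
 *  allocated on first use.
 */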
3059 static void
3060 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits)
3061 {
3062 	uint64_t	min_qos_set;
3063 	int		i;
3064 
3065 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3066 		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3067 			break;
3068 		}
3069 	}
3070 
3071 	if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3072 		SPDK_ERRLOG("Invalid rate limits set.\n");
3073 		return;
3074 	}
3075 
3076 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3077 		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3078 			continue;
3079 		}
3080 
3081 		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
3082 			min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
3083 		} else {
3084 			min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
3085 		}
3086 
3087 		if (limits[i] == 0 || limits[i] % min_qos_set) {
3088 			SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n",
3089 				    limits[i], bdev->name, min_qos_set);
3090 			SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name);
3091 			return;
3092 		}
3093 	}
3094 
3095 	if (!bdev->internal.qos) {
3096 		bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3097 		if (!bdev->internal.qos) {
3098 			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3099 			return;
3100 		}
3101 	}
3102 
3103 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3104 		bdev->internal.qos->rate_limits[i].limit = limits[i];
3105 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
3106 			      bdev->name, i, limits[i]);
3107 	}
3108 
3109 	return;
3110 }
3111 
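/*
 * Legacy configuration file sketch for the [QoS] section parsed below (the
 *  bdev names are illustrative).  Limit_BPS values are given in megabytes per
 *  second and converted to bytes per second:
 *
 *	[QoS]
 *	  Limit_IOPS  Malloc0  20000
 *	  Limit_BPS   Malloc1  100
 */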
3112 static void
3113 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
3114 {
3115 	struct spdk_conf_section	*sp = NULL;
3116 	const char			*val = NULL;
3117 	int				i = 0, j = 0;
3118 	uint64_t			limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
3119 	bool				config_qos = false;
3120 
3121 	sp = spdk_conf_find_section(NULL, "QoS");
3122 	if (!sp) {
3123 		return;
3124 	}
3125 
3126 	while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3127 		limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
3128 
3129 		i = 0;
3130 		while (true) {
3131 			val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0);
3132 			if (!val) {
3133 				break;
3134 			}
3135 
3136 			if (strcmp(bdev->name, val) != 0) {
3137 				i++;
3138 				continue;
3139 			}
3140 
3141 			val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1);
3142 			if (val) {
3143 				if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) {
3144 					limits[j] = strtoull(val, NULL, 10);
3145 				} else {
3146 					limits[j] = strtoull(val, NULL, 10) * 1024 * 1024;
3147 				}
3148 				config_qos = true;
3149 			}
3150 
3151 			break;
3152 		}
3153 
3154 		j++;
3155 	}
3156 
3157 	if (config_qos == true) {
3158 		_spdk_bdev_qos_config_limit(bdev, limits);
3159 	}
3160 
3161 	return;
3162 }
3163 
3164 static int
3165 spdk_bdev_init(struct spdk_bdev *bdev)
3166 {
3167 	char *bdev_name;
3168 
3169 	assert(bdev->module != NULL);
3170 
3171 	if (!bdev->name) {
3172 		SPDK_ERRLOG("Bdev name is NULL\n");
3173 		return -EINVAL;
3174 	}
3175 
3176 	if (spdk_bdev_get_by_name(bdev->name)) {
3177 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
3178 		return -EEXIST;
3179 	}
3180 
3181 	/* Users often register their own I/O devices using the bdev name. In
3182 	 * order to avoid conflicts, prepend bdev_. */
3183 	bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name);
3184 	if (!bdev_name) {
3185 		SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n");
3186 		return -ENOMEM;
3187 	}
3188 
3189 	bdev->internal.status = SPDK_BDEV_STATUS_READY;
3190 	bdev->internal.measured_queue_depth = UINT64_MAX;
3191 
3192 	TAILQ_INIT(&bdev->internal.open_descs);
3193 
3194 	TAILQ_INIT(&bdev->aliases);
3195 
3196 	bdev->internal.reset_in_progress = NULL;
3197 
3198 	_spdk_bdev_qos_config(bdev);
3199 
3200 	spdk_io_device_register(__bdev_to_io_dev(bdev),
3201 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
3202 				sizeof(struct spdk_bdev_channel),
3203 				bdev_name);
3204 
3205 	free(bdev_name);
3206 
3207 	pthread_mutex_init(&bdev->internal.mutex, NULL);
3208 	return 0;
3209 }
3210 
3211 static void
3212 spdk_bdev_destroy_cb(void *io_device)
3213 {
3214 	int			rc;
3215 	struct spdk_bdev	*bdev;
3216 	spdk_bdev_unregister_cb	cb_fn;
3217 	void			*cb_arg;
3218 
3219 	bdev = __bdev_from_io_dev(io_device);
3220 	cb_fn = bdev->internal.unregister_cb;
3221 	cb_arg = bdev->internal.unregister_ctx;
3222 
3223 	rc = bdev->fn_table->destruct(bdev->ctxt);
3224 	if (rc < 0) {
3225 		SPDK_ERRLOG("destruct failed\n");
3226 	}
3227 	if (rc <= 0 && cb_fn != NULL) {
3228 		cb_fn(cb_arg, rc);
3229 	}
3230 }
3231 
3232 
3233 static void
3234 spdk_bdev_fini(struct spdk_bdev *bdev)
3235 {
3236 	pthread_mutex_destroy(&bdev->internal.mutex);
3237 
3238 	free(bdev->internal.qos);
3239 
3240 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
3241 }
3242 
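/*
 * Insert the bdev into the global list and let every registered module examine
 *  it: examine_config() runs for all modules, while examine_disk() is skipped
 *  entirely if a module has already claimed the bdev.
 */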
3243 static void
3244 spdk_bdev_start(struct spdk_bdev *bdev)
3245 {
3246 	struct spdk_bdev_module *module;
3247 	uint32_t action;
3248 
3249 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
3250 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);
3251 
3252 	/* Examine configuration before initializing I/O */
3253 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3254 		if (module->examine_config) {
3255 			action = module->internal.action_in_progress;
3256 			module->internal.action_in_progress++;
3257 			module->examine_config(bdev);
3258 			if (action != module->internal.action_in_progress) {
3259 				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
3260 					    module->name);
3261 			}
3262 		}
3263 	}
3264 
3265 	if (bdev->internal.claim_module) {
3266 		return;
3267 	}
3268 
3269 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3270 		if (module->examine_disk) {
3271 			module->internal.action_in_progress++;
3272 			module->examine_disk(bdev);
3273 		}
3274 	}
3275 }
3276 
3277 int
3278 spdk_bdev_register(struct spdk_bdev *bdev)
3279 {
3280 	int rc = spdk_bdev_init(bdev);
3281 
3282 	if (rc == 0) {
3283 		spdk_bdev_start(bdev);
3284 	}
3285 
3286 	return rc;
3287 }
3288 
3289 int
3290 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
3291 {
3292 	int rc;
3293 
3294 	rc = spdk_bdev_init(vbdev);
3295 	if (rc) {
3296 		return rc;
3297 	}
3298 
3299 	spdk_bdev_start(vbdev);
3300 	return 0;
3301 }
3302 
3303 void
3304 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
3305 {
3306 	if (bdev->internal.unregister_cb != NULL) {
3307 		bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
3308 	}
3309 }
3310 
3311 static void
3312 _remove_notify(void *arg)
3313 {
3314 	struct spdk_bdev_desc *desc = arg;
3315 
3316 	desc->remove_scheduled = false;
3317 
3318 	if (desc->closed) {
3319 		free(desc);
3320 	} else {
3321 		desc->remove_cb(desc->remove_ctx);
3322 	}
3323 }
3324 
3325 void
3326 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
3327 {
3328 	struct spdk_bdev_desc	*desc, *tmp;
3329 	bool			do_destruct = true;
3330 	struct spdk_thread	*thread;
3331 
3332 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
3333 
3334 	thread = spdk_get_thread();
3335 	if (!thread) {
3336 		/* The user called this from a non-SPDK thread. */
3337 		if (cb_fn != NULL) {
3338 			cb_fn(cb_arg, -ENOTSUP);
3339 		}
3340 		return;
3341 	}
3342 
3343 	pthread_mutex_lock(&bdev->internal.mutex);
3344 
3345 	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
3346 	bdev->internal.unregister_cb = cb_fn;
3347 	bdev->internal.unregister_ctx = cb_arg;
3348 
3349 	TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
3350 		if (desc->remove_cb) {
3351 			do_destruct = false;
3352 			/*
3353 			 * Defer invocation of the remove_cb to a separate message that will
3354 			 *  run later on its thread.  This ensures this context unwinds and
3355 			 *  we don't recursively unregister this bdev again if the remove_cb
3356 			 *  immediately closes its descriptor.
3357 			 */
3358 			if (!desc->remove_scheduled) {
3359 				/* Avoid scheduling removal of the same descriptor multiple times. */
3360 				desc->remove_scheduled = true;
3361 				spdk_thread_send_msg(desc->thread, _remove_notify, desc);
3362 			}
3363 		}
3364 	}
3365 
3366 	if (!do_destruct) {
3367 		pthread_mutex_unlock(&bdev->internal.mutex);
3368 		return;
3369 	}
3370 
3371 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
3372 	pthread_mutex_unlock(&bdev->internal.mutex);
3373 
3374 	spdk_bdev_fini(bdev);
3375 }
3376 
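/*
 * Usage sketch (illustrative): open a descriptor on an SPDK thread, obtain a
 *  per-thread I/O channel, and release both when finished.  'hot_remove_cb' is
 *  a hypothetical callback invoked with remove_ctx when the bdev is being
 *  unregistered.
 *
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *io_ch;
 *
 *	if (spdk_bdev_open(bdev, true, hot_remove_cb, NULL, &desc) == 0) {
 *		io_ch = spdk_bdev_get_io_channel(desc);
 *		... submit I/O ...
 *		spdk_put_io_channel(io_ch);
 *		spdk_bdev_close(desc);
 *	}
 */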
3377 int
3378 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
3379 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
3380 {
3381 	struct spdk_bdev_desc *desc;
3382 	struct spdk_thread *thread;
3383 
3384 	thread = spdk_get_thread();
3385 	if (!thread) {
3386 		SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n");
3387 		return -ENOTSUP;
3388 	}
3389 
3390 	desc = calloc(1, sizeof(*desc));
3391 	if (desc == NULL) {
3392 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
3393 		return -ENOMEM;
3394 	}
3395 
3396 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3397 		      spdk_get_thread());
3398 
3399 	pthread_mutex_lock(&bdev->internal.mutex);
3400 
3401 	if (write && bdev->internal.claim_module) {
3402 		SPDK_ERRLOG("Could not open %s - %s module already claimed it\n",
3403 			    bdev->name, bdev->internal.claim_module->name);
3404 		free(desc);
3405 		pthread_mutex_unlock(&bdev->internal.mutex);
3406 		return -EPERM;
3407 	}
3408 
3409 	TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
3410 
3411 	desc->bdev = bdev;
3412 	desc->thread = thread;
3413 	desc->remove_cb = remove_cb;
3414 	desc->remove_ctx = remove_ctx;
3415 	desc->write = write;
3416 	*_desc = desc;
3417 
3418 	pthread_mutex_unlock(&bdev->internal.mutex);
3419 
3420 	return 0;
3421 }
3422 
3423 void
3424 spdk_bdev_close(struct spdk_bdev_desc *desc)
3425 {
3426 	struct spdk_bdev *bdev = desc->bdev;
3427 	bool do_unregister = false;
3428 
3429 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3430 		      spdk_get_thread());
3431 
3432 	assert(desc->thread == spdk_get_thread());
3433 
3434 	pthread_mutex_lock(&bdev->internal.mutex);
3435 
3436 	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);
3437 
3438 	desc->closed = true;
3439 
3440 	if (!desc->remove_scheduled) {
3441 		free(desc);
3442 	}
3443 
3444 	/* If no more descriptors, kill QoS channel */
3445 	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3446 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
3447 			      bdev->name, spdk_get_thread());
3448 
3449 		if (spdk_bdev_qos_destroy(bdev)) {
3450 			/* There isn't anything we can do to recover here. Just let the
3451 			 * old QoS poller keep running. The QoS handling won't change
3452 			 * cores when the user allocates a new channel, but it won't break. */
3453 			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3454 		}
3455 	}
3456 
3457 	spdk_bdev_set_qd_sampling_period(bdev, 0);
3458 
3459 	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3460 		do_unregister = true;
3461 	}
3462 	pthread_mutex_unlock(&bdev->internal.mutex);
3463 
3464 	if (do_unregister == true) {
3465 		spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
3466 	}
3467 }
3468 
3469 int
3470 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3471 			    struct spdk_bdev_module *module)
3472 {
3473 	if (bdev->internal.claim_module != NULL) {
3474 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3475 			    bdev->internal.claim_module->name);
3476 		return -EPERM;
3477 	}
3478 
3479 	if (desc && !desc->write) {
3480 		desc->write = true;
3481 	}
3482 
3483 	bdev->internal.claim_module = module;
3484 	return 0;
3485 }
3486 
3487 void
3488 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3489 {
3490 	assert(bdev->internal.claim_module != NULL);
3491 	bdev->internal.claim_module = NULL;
3492 }
3493 
3494 struct spdk_bdev *
3495 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3496 {
3497 	return desc->bdev;
3498 }
3499 
3500 void
3501 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3502 {
3503 	struct iovec *iovs;
3504 	int iovcnt;
3505 
3506 	if (bdev_io == NULL) {
3507 		return;
3508 	}
3509 
3510 	switch (bdev_io->type) {
3511 	case SPDK_BDEV_IO_TYPE_READ:
3512 		iovs = bdev_io->u.bdev.iovs;
3513 		iovcnt = bdev_io->u.bdev.iovcnt;
3514 		break;
3515 	case SPDK_BDEV_IO_TYPE_WRITE:
3516 		iovs = bdev_io->u.bdev.iovs;
3517 		iovcnt = bdev_io->u.bdev.iovcnt;
3518 		break;
3519 	default:
3520 		iovs = NULL;
3521 		iovcnt = 0;
3522 		break;
3523 	}
3524 
3525 	if (iovp) {
3526 		*iovp = iovs;
3527 	}
3528 	if (iovcntp) {
3529 		*iovcntp = iovcnt;
3530 	}
3531 }
3532 
3533 void
3534 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
3535 {
3536 
3537 	if (spdk_bdev_module_list_find(bdev_module->name)) {
3538 		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
3539 		assert(false);
3540 	}
3541 
3542 	if (bdev_module->async_init) {
3543 		bdev_module->internal.action_in_progress = 1;
3544 	}
3545 
3546 	/*
3547 	 * Modules with examine callbacks must be initialized first, so they are
3548 	 *  ready to handle examine callbacks from later modules that will
3549 	 *  register physical bdevs.
3550 	 */
3551 	if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
3552 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3553 	} else {
3554 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3555 	}
3556 }
3557 
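/* Look up a registered bdev module by name; returns NULL if none matches. */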
3558 struct spdk_bdev_module *
3559 spdk_bdev_module_list_find(const char *name)
3560 {
3561 	struct spdk_bdev_module *bdev_module;
3562 
3563 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3564 		if (strcmp(name, bdev_module->name) == 0) {
3565 			break;
3566 		}
3567 	}
3568 
3569 	return bdev_module;
3570 }
3571 
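/*
 * Emulated write_zeroes path: write the shared zero buffer in chunks of at
 * most ZERO_BUFFER_SIZE bytes until split_remaining_num_blocks reaches zero.
 * A chunk that fails with -ENOMEM is retried later through the bdev I/O wait
 * mechanism; any other error fails the parent I/O immediately.
 */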
3572 static void
3573 _spdk_bdev_write_zero_buffer_next(void *_bdev_io)
3574 {
3575 	struct spdk_bdev_io *bdev_io = _bdev_io;
3576 	uint64_t num_bytes, num_blocks;
3577 	int rc;
3578 
3579 	num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) *
3580 			     bdev_io->u.bdev.split_remaining_num_blocks,
3581 			     ZERO_BUFFER_SIZE);
3582 	num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev);
3583 
3584 	rc = spdk_bdev_write_blocks(bdev_io->internal.desc,
3585 				    spdk_io_channel_from_ctx(bdev_io->internal.ch),
3586 				    g_bdev_mgr.zero_buffer,
3587 				    bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
3588 				    _spdk_bdev_write_zero_buffer_done, bdev_io);
3589 	if (rc == 0) {
3590 		bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
3591 		bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
3592 	} else if (rc == -ENOMEM) {
3593 		_spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next);
3594 	} else {
3595 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3596 		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
3597 	}
3598 }
3599 
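/* Completion for one zero-buffer chunk: fail the parent, finish it, or issue the next chunk. */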
3600 static void
3601 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
3602 {
3603 	struct spdk_bdev_io *parent_io = cb_arg;
3604 
3605 	spdk_bdev_free_io(bdev_io);
3606 
3607 	if (!success) {
3608 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3609 		parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
3610 		return;
3611 	}
3612 
3613 	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
3614 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3615 		parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
3616 		return;
3617 	}
3618 
3619 	_spdk_bdev_write_zero_buffer_next(parent_io);
3620 }
3621 
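/* Context carried through the asynchronous steps of a QoS rate limit change. */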
3622 struct set_qos_limit_ctx {
3623 	void (*cb_fn)(void *cb_arg, int status);
3624 	void *cb_arg;
3625 	struct spdk_bdev *bdev;
3626 };
3627 
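/* Final step of any QoS limit change: clear the in-progress flag and invoke the caller's callback. */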
3628 static void
3629 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
3630 {
3631 	pthread_mutex_lock(&ctx->bdev->internal.mutex);
3632 	ctx->bdev->internal.qos_mod_in_progress = false;
3633 	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
3634 
3635 	ctx->cb_fn(ctx->cb_arg, status);
3636 	free(ctx);
3637 }
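/*
 * Runs on the QoS thread after every channel has stopped routing I/O through
 * QoS. Detaches the QoS object from the bdev, sends any still-queued I/O back
 * to its submitting thread, and releases the QoS channel and poller before
 * completing the caller's request.
 */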
3638 
3639 static void
3640 _spdk_bdev_disable_qos_done(void *cb_arg)
3641 {
3642 	struct set_qos_limit_ctx *ctx = cb_arg;
3643 	struct spdk_bdev *bdev = ctx->bdev;
3644 	struct spdk_bdev_io *bdev_io;
3645 	struct spdk_bdev_qos *qos;
3646 
3647 	pthread_mutex_lock(&bdev->internal.mutex);
3648 	qos = bdev->internal.qos;
3649 	bdev->internal.qos = NULL;
3650 	pthread_mutex_unlock(&bdev->internal.mutex);
3651 
3652 	while (!TAILQ_EMPTY(&qos->queued)) {
3653 		/* Send each queued I/O back to its original submitting thread for resubmission. */
3654 		bdev_io = TAILQ_FIRST(&qos->queued);
3655 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
3656 
3657 		if (bdev_io->internal.io_submit_ch) {
3658 			/*
3659 			 * Channel was changed when sending it to the QoS thread - change it back
3660 			 *  before sending it back to the original thread.
3661 			 */
3662 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
3663 			bdev_io->internal.io_submit_ch = NULL;
3664 		}
3665 
3666 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
3667 				     _spdk_bdev_io_submit, bdev_io);
3668 	}
3669 
3670 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
3671 	spdk_poller_unregister(&qos->poller);
3672 
3673 	free(qos);
3674 
3675 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3676 }
3677 
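/* All channels have cleared BDEV_CH_QOS_ENABLED; finish the teardown on the QoS thread. */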
3678 static void
3679 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
3680 {
3681 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3682 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3683 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3684 	struct spdk_thread *thread;
3685 
3686 	pthread_mutex_lock(&bdev->internal.mutex);
3687 	thread = bdev->internal.qos->thread;
3688 	pthread_mutex_unlock(&bdev->internal.mutex);
3689 
3690 	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
3691 }
3692 
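/* Per-channel step of disabling QoS: stop routing this channel's I/O through QoS. */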
3693 static void
3694 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
3695 {
3696 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3697 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3698 
3699 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
3700 
3701 	spdk_for_each_channel_continue(i, 0);
3702 }
3703 
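/* Runs on the QoS thread: recompute the per-timeslice quotas from the updated limits. */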
3704 static void
3705 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
3706 {
3707 	struct set_qos_limit_ctx *ctx = cb_arg;
3708 	struct spdk_bdev *bdev = ctx->bdev;
3709 
3710 	pthread_mutex_lock(&bdev->internal.mutex);
3711 	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
3712 	pthread_mutex_unlock(&bdev->internal.mutex);
3713 
3714 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3715 }
3716 
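/* Per-channel step of enabling QoS: route this channel's I/O through the bdev's QoS object. */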
3717 static void
3718 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
3719 {
3720 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3721 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3722 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3723 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3724 
3725 	pthread_mutex_lock(&bdev->internal.mutex);
3726 	_spdk_bdev_enable_qos(bdev, bdev_ch);
3727 	pthread_mutex_unlock(&bdev->internal.mutex);
3728 	spdk_for_each_channel_continue(i, 0);
3729 }
3730 
3731 static void
3732 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
3733 {
3734 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3735 
3736 	_spdk_bdev_set_qos_limit_done(ctx, status);
3737 }
3738 
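/*
 * Copy the caller-supplied limits into the QoS object. A value of 0 disables
 * that limit (stored as SPDK_BDEV_QOS_LIMIT_NOT_DEFINED); entries equal to
 * SPDK_BDEV_QOS_LIMIT_NOT_DEFINED leave the existing limit untouched.
 */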
3739 static void
3740 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
3741 {
3742 	int i;
3743 
3744 	assert(bdev->internal.qos != NULL);
3745 
3746 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3747 		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3748 			bdev->internal.qos->rate_limits[i].limit = limits[i];
3749 
3750 			if (limits[i] == 0) {
3751 				bdev->internal.qos->rate_limits[i].limit =
3752 					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
3753 			}
3754 		}
3755 	}
3756 }
3757 
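/*
 * Set, update, or clear the QoS rate limits of a bdev. The limits array must
 * hold SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES entries: SPDK_BDEV_QOS_LIMIT_NOT_DEFINED
 * leaves a limit unchanged, 0 disables it, and any other value enables it.
 * IOPS limits are given per second; bandwidth limits are given in MB/s and
 * converted to bytes/s internally. Values are rounded up to a multiple of the
 * per-type minimum, with a log message. If no limit remains enabled, QoS is
 * torn down entirely. cb_fn is invoked once the change has been applied, with
 * 0 on success or a negative errno on failure.
 *
 * Illustrative usage sketch (hypothetical callback and values, not taken from
 * this file; index 0 is assumed to be the IOPS limit, matching qos_conf_type[]):
 *
 *	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
 *	int i;
 *
 *	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
 *		limits[i] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
 *	}
 *	limits[0] = 20000;	// hypothetical: cap the bdev at 20k IOPS
 *	spdk_bdev_set_qos_rate_limits(bdev, limits, my_qos_cb, NULL);
 */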
3758 void
3759 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
3760 			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
3761 {
3762 	struct set_qos_limit_ctx	*ctx;
3763 	uint64_t			limit_set_complement;
3764 	uint64_t			min_limit_per_sec;
3765 	int				i;
3766 	bool				disable_rate_limit = true;
3767 
3768 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3769 		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3770 			continue;
3771 		}
3772 
3773 		if (limits[i] > 0) {
3774 			disable_rate_limit = false;
3775 		}
3776 
3777 		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
3778 			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
3779 		} else {
3780 			/* Convert the limit from MB/s to bytes/s */
3781 			limits[i] = limits[i] * 1024 * 1024;
3782 			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
3783 		}
3784 
3785 		limit_set_complement = limits[i] % min_limit_per_sec;
3786 		if (limit_set_complement) {
3787 			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
3788 				    limits[i], min_limit_per_sec);
3789 			limits[i] += min_limit_per_sec - limit_set_complement;
3790 			SPDK_ERRLOG("Rounding up the rate limit to %" PRIu64 "\n", limits[i]);
3791 		}
3792 	}
3793 
3794 	ctx = calloc(1, sizeof(*ctx));
3795 	if (ctx == NULL) {
3796 		cb_fn(cb_arg, -ENOMEM);
3797 		return;
3798 	}
3799 
3800 	ctx->cb_fn = cb_fn;
3801 	ctx->cb_arg = cb_arg;
3802 	ctx->bdev = bdev;
3803 
3804 	pthread_mutex_lock(&bdev->internal.mutex);
3805 	if (bdev->internal.qos_mod_in_progress) {
3806 		pthread_mutex_unlock(&bdev->internal.mutex);
3807 		free(ctx);
3808 		cb_fn(cb_arg, -EAGAIN);
3809 		return;
3810 	}
3811 	bdev->internal.qos_mod_in_progress = true;
3812 
3813 	if (disable_rate_limit == true && bdev->internal.qos) {
3814 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3815 			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
3816 			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
3817 			     bdev->internal.qos->rate_limits[i].limit !=
3818 			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
3819 				disable_rate_limit = false;
3820 				break;
3821 			}
3822 		}
3823 	}
3824 
3825 	if (disable_rate_limit == false) {
3826 		if (bdev->internal.qos == NULL) {
3827 			/* Enabling */
3828 			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3829 			if (!bdev->internal.qos) {
3830 				pthread_mutex_unlock(&bdev->internal.mutex);
3831 				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3832 				free(ctx);
3833 				cb_fn(cb_arg, -ENOMEM);
3834 				return;
3835 			}
3836 
3837 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
3838 
3839 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3840 					      _spdk_bdev_enable_qos_msg, ctx,
3841 					      _spdk_bdev_enable_qos_done);
3842 		} else {
3843 			/* Updating */
3844 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
3845 
3846 			spdk_thread_send_msg(bdev->internal.qos->thread,
3847 					     _spdk_bdev_update_qos_rate_limit_msg, ctx);
3848 		}
3849 	} else {
3850 		if (bdev->internal.qos != NULL) {
3851 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
3852 
3853 			/* Disabling */
3854 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3855 					      _spdk_bdev_disable_qos_msg, ctx,
3856 					      _spdk_bdev_disable_qos_msg_done);
3857 		} else {
3858 			pthread_mutex_unlock(&bdev->internal.mutex);
3859 			_spdk_bdev_set_qos_limit_done(ctx, 0);
3860 			return;
3861 		}
3862 	}
3863 
3864 	pthread_mutex_unlock(&bdev->internal.mutex);
3865 }
3866 
3867 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3868 
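/* Register the bdev trace owner/object IDs and the BDEV_IO_START/BDEV_IO_DONE tracepoint descriptions. */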
3869 SPDK_TRACE_REGISTER_FN(bdev_trace)
3870 {
3871 	spdk_trace_register_owner(OWNER_BDEV, 'b');
3872 	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
3873 	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
3874 					OBJECT_BDEV_IO, 1, 0, "type:   ");
3875 	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
3876 					OBJECT_BDEV_IO, 0, 0, "");
3877 }
3878