xref: /spdk/lib/bdev/bdev.c (revision 7d030ef7fc9ec09d7e66a966eac0fcec5c1bd8bb)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/bdev.h"
37 #include "spdk/conf.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/thread.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 #include "spdk/trace.h"
48 
49 #include "spdk/bdev_module.h"
50 #include "spdk_internal/log.h"
51 #include "spdk/string.h"
52 
53 #ifdef SPDK_CONFIG_VTUNE
54 #include "ittnotify.h"
55 #include "ittnotify_types.h"
56 int __itt_init_ittlib(const char *, __itt_group_id);
57 #endif
58 
59 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
60 #define SPDK_BDEV_IO_CACHE_SIZE			256
61 #define BUF_SMALL_POOL_SIZE			8192
62 #define BUF_LARGE_POOL_SIZE			1024
63 #define NOMEM_THRESHOLD_COUNT			8
64 #define ZERO_BUFFER_SIZE			0x100000
65 
66 #define OWNER_BDEV		0x2
67 
68 #define OBJECT_BDEV_IO		0x2
69 
70 #define TRACE_GROUP_BDEV	0x3
71 #define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
72 #define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)
73 
74 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
75 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
76 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
77 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
78 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(10 * 1024 * 1024)
79 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX
80 
81 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"};
82 static const char *qos_rpc_type[] = {"qos_ios_per_sec"};
83 
84 TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
85 
86 struct spdk_bdev_mgr {
87 	struct spdk_mempool *bdev_io_pool;
88 
89 	struct spdk_mempool *buf_small_pool;
90 	struct spdk_mempool *buf_large_pool;
91 
92 	void *zero_buffer;
93 
94 	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;
95 
96 	struct spdk_bdev_list bdevs;
97 
98 	bool init_complete;
99 	bool module_init_complete;
100 
101 #ifdef SPDK_CONFIG_VTUNE
102 	__itt_domain	*domain;
103 #endif
104 };
105 
106 static struct spdk_bdev_mgr g_bdev_mgr = {
107 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
108 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
109 	.init_complete = false,
110 	.module_init_complete = false,
111 };
112 
113 static struct spdk_bdev_opts	g_bdev_opts = {
114 	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
115 	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
116 };
117 
118 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
119 static void			*g_init_cb_arg = NULL;
120 
121 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
122 static void			*g_fini_cb_arg = NULL;
123 static struct spdk_thread	*g_fini_thread = NULL;
124 
125 struct spdk_bdev_qos_limit {
126 	/** IOs or bytes allowed per second (i.e., 1s). */
127 	uint64_t limit;
128 
129 	/** Remaining IOs or bytes allowed in the current timeslice (e.g., 1ms).
130 	 *  For bytes, this is allowed to go negative if an I/O is submitted while
131 	 *  some bytes remain but the I/O is larger than that amount.  The
132 	 *  excess will be deducted from the next timeslice.
133 	 */
134 	int64_t remaining_this_timeslice;
135 
136 	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
137 	uint32_t min_per_timeslice;
138 
139 	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
140 	uint32_t max_per_timeslice;
141 };
142 
143 struct spdk_bdev_qos {
144 	/** Rate limits, one entry per QoS rate limit type. */
145 	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
146 
147 	/** The channel that all I/O are funneled through. */
148 	struct spdk_bdev_channel *ch;
149 
150 	/** The thread on which the poller is running. */
151 	struct spdk_thread *thread;
152 
153 	/** Queue of I/O waiting to be issued. */
154 	bdev_io_tailq_t queued;
155 
156 	/** Size of a timeslice in tsc ticks. */
157 	uint64_t timeslice_size;
158 
159 	/** Timestamp of start of last timeslice. */
160 	uint64_t last_timeslice;
161 
162 	/** Poller that processes queued I/O commands each time slice. */
163 	struct spdk_poller *poller;
164 };
165 
166 struct spdk_bdev_mgmt_channel {
167 	bdev_io_stailq_t need_buf_small;
168 	bdev_io_stailq_t need_buf_large;
169 
170 	/*
171 	 * Each thread keeps a cache of bdev_io - this allows
172 	 *  bdev threads which are *not* DPDK threads to still
173 	 *  benefit from a per-thread bdev_io cache.  Without
174 	 *  this, non-DPDK threads fetching from the mempool
175 	 *  would incur a cmpxchg on every get and put.
176 	 */
177 	bdev_io_stailq_t per_thread_cache;
178 	uint32_t	per_thread_cache_count;
179 	uint32_t	bdev_io_cache_size;
180 
181 	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
182 	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
183 };
184 
185 /*
186  * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
187  * queue their I/O awaiting retry here, which makes it possible to retry sending
188  * I/O to one bdev after I/O from another bdev completes.
189  */
190 struct spdk_bdev_shared_resource {
191 	/* The bdev management channel */
192 	struct spdk_bdev_mgmt_channel *mgmt_ch;
193 
194 	/*
195 	 * Count of I/O submitted to bdev module and waiting for completion.
196 	 * Incremented before submit_request() is called on an spdk_bdev_io.
197 	 */
198 	uint64_t		io_outstanding;
199 
200 	/*
201 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
202 	 *  on this channel.
203 	 */
204 	bdev_io_tailq_t		nomem_io;
205 
206 	/*
207 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
208 	 */
209 	uint64_t		nomem_threshold;
210 
211 	/* I/O channel allocated by a bdev module */
212 	struct spdk_io_channel	*shared_ch;
213 
214 	/* Refcount of bdev channels using this resource */
215 	uint32_t		ref;
216 
217 	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
218 };
219 
220 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
221 #define BDEV_CH_QOS_ENABLED		(1 << 1)
222 
223 struct spdk_bdev_channel {
224 	struct spdk_bdev	*bdev;
225 
226 	/* The channel for the underlying device */
227 	struct spdk_io_channel	*channel;
228 
229 	/* Per io_device per thread data */
230 	struct spdk_bdev_shared_resource *shared_resource;
231 
232 	struct spdk_bdev_io_stat stat;
233 
234 	/*
235 	 * Count of I/O submitted through this channel and waiting for completion.
236 	 * Incremented before submit_request() is called on an spdk_bdev_io.
237 	 */
238 	uint64_t		io_outstanding;
239 
240 	bdev_io_tailq_t		queued_resets;
241 
242 	uint32_t		flags;
243 
244 #ifdef SPDK_CONFIG_VTUNE
245 	uint64_t		start_tsc;
246 	uint64_t		interval_tsc;
247 	__itt_string_handle	*handle;
248 	struct spdk_bdev_io_stat prev_stat;
249 #endif
250 
251 };
252 
253 struct spdk_bdev_desc {
254 	struct spdk_bdev		*bdev;
255 	struct spdk_thread		*thread;
256 	spdk_bdev_remove_cb_t		remove_cb;
257 	void				*remove_ctx;
258 	bool				remove_scheduled;
259 	bool				closed;
260 	bool				write;
261 	TAILQ_ENTRY(spdk_bdev_desc)	link;
262 };
263 
264 struct spdk_bdev_iostat_ctx {
265 	struct spdk_bdev_io_stat *stat;
266 	spdk_bdev_get_device_stat_cb cb;
267 	void *cb_arg;
268 };
269 
270 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
271 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
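
/*
 * These macros offset the bdev pointer by one byte so that the io_device key
 * registered for a bdev is never equal to the bdev pointer itself (presumably
 * to avoid colliding with an io_device a module might register using that same
 * pointer).  Minimal round-trip sketch (illustrative only):
 *
 *     void *io_dev = __bdev_to_io_dev(bdev);        // (char *)bdev + 1
 *     assert(__bdev_from_io_dev(io_dev) == bdev);   // (char *)io_dev - 1
 *
 * Any spdk_get_io_channel()/spdk_io_device_register() call keyed on a bdev must
 * use __bdev_to_io_dev(bdev), never the raw bdev pointer.
 */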
272 
273 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
274 		void *cb_arg);
275 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);
276 
277 void
278 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
279 {
280 	*opts = g_bdev_opts;
281 }
282 
283 int
284 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
285 {
286 	uint32_t min_pool_size;
287 
288 	/*
289 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
290 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
291 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
292 	 */
293 	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
294 	if (opts->bdev_io_pool_size < min_pool_size) {
295 		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
296 			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
297 			    spdk_thread_get_count());
298 		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
299 		return -1;
300 	}
301 
302 	g_bdev_opts = *opts;
303 	return 0;
304 }
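
/*
 * Illustrative sketch (assumed caller code, not part of this file): an
 * application that wants a larger bdev_io pool would typically do the
 * following before spdk_bdev_initialize(), since the pools are sized from
 * g_bdev_opts at that point.  Values are examples, not recommendations.
 *
 *     struct spdk_bdev_opts opts;
 *
 *     spdk_bdev_get_opts(&opts);
 *     opts.bdev_io_pool_size = 128 * 1024;
 *     opts.bdev_io_cache_size = 512;
 *     if (spdk_bdev_set_opts(&opts) != 0) {
 *         // rejected: pool smaller than cache_size * (thread count + 1)
 *     }
 */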
305 
306 struct spdk_bdev *
307 spdk_bdev_first(void)
308 {
309 	struct spdk_bdev *bdev;
310 
311 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
312 	if (bdev) {
313 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
314 	}
315 
316 	return bdev;
317 }
318 
319 struct spdk_bdev *
320 spdk_bdev_next(struct spdk_bdev *prev)
321 {
322 	struct spdk_bdev *bdev;
323 
324 	bdev = TAILQ_NEXT(prev, internal.link);
325 	if (bdev) {
326 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
327 	}
328 
329 	return bdev;
330 }
331 
332 static struct spdk_bdev *
333 _bdev_next_leaf(struct spdk_bdev *bdev)
334 {
335 	while (bdev != NULL) {
336 		if (bdev->internal.claim_module == NULL) {
337 			return bdev;
338 		} else {
339 			bdev = TAILQ_NEXT(bdev, internal.link);
340 		}
341 	}
342 
343 	return bdev;
344 }
345 
346 struct spdk_bdev *
347 spdk_bdev_first_leaf(void)
348 {
349 	struct spdk_bdev *bdev;
350 
351 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
352 
353 	if (bdev) {
354 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
355 	}
356 
357 	return bdev;
358 }
359 
360 struct spdk_bdev *
361 spdk_bdev_next_leaf(struct spdk_bdev *prev)
362 {
363 	struct spdk_bdev *bdev;
364 
365 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));
366 
367 	if (bdev) {
368 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
369 	}
370 
371 	return bdev;
372 }
373 
374 struct spdk_bdev *
375 spdk_bdev_get_by_name(const char *bdev_name)
376 {
377 	struct spdk_bdev_alias *tmp;
378 	struct spdk_bdev *bdev = spdk_bdev_first();
379 
380 	while (bdev != NULL) {
381 		if (strcmp(bdev_name, bdev->name) == 0) {
382 			return bdev;
383 		}
384 
385 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
386 			if (strcmp(bdev_name, tmp->alias) == 0) {
387 				return bdev;
388 			}
389 		}
390 
391 		bdev = spdk_bdev_next(bdev);
392 	}
393 
394 	return NULL;
395 }
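
/*
 * Illustrative lookup/iteration sketch (assumed caller code; "Nvme0n1" is just
 * a placeholder name): spdk_bdev_get_by_name() matches either the primary name
 * or any registered alias, and the *_leaf iterators skip bdevs that have been
 * claimed by another module.
 *
 *     struct spdk_bdev *bdev;
 *
 *     bdev = spdk_bdev_get_by_name("Nvme0n1");      // name or alias
 *
 *     for (bdev = spdk_bdev_first_leaf(); bdev != NULL;
 *          bdev = spdk_bdev_next_leaf(bdev)) {
 *         // only unclaimed (top-most) bdevs are visited here
 *     }
 */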
396 
397 void
398 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
399 {
400 	struct iovec *iovs;
401 
402 	iovs = bdev_io->u.bdev.iovs;
403 
404 	assert(iovs != NULL);
405 	assert(bdev_io->u.bdev.iovcnt >= 1);
406 
407 	iovs[0].iov_base = buf;
408 	iovs[0].iov_len = len;
409 }
410 
411 static void
412 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
413 {
414 	struct spdk_mempool *pool;
415 	struct spdk_bdev_io *tmp;
416 	void *buf, *aligned_buf;
417 	bdev_io_stailq_t *stailq;
418 	struct spdk_bdev_mgmt_channel *ch;
419 
420 	assert(bdev_io->u.bdev.iovcnt == 1);
421 
422 	buf = bdev_io->internal.buf;
423 	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
424 
425 	bdev_io->internal.buf = NULL;
426 
427 	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
428 		pool = g_bdev_mgr.buf_small_pool;
429 		stailq = &ch->need_buf_small;
430 	} else {
431 		pool = g_bdev_mgr.buf_large_pool;
432 		stailq = &ch->need_buf_large;
433 	}
434 
435 	if (STAILQ_EMPTY(stailq)) {
436 		spdk_mempool_put(pool, buf);
437 	} else {
438 		tmp = STAILQ_FIRST(stailq);
439 
440 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
441 		spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
442 
443 		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
444 		tmp->internal.buf = buf;
445 		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
446 	}
447 }
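
/*
 * The 512-byte alignment above is safe because the buffer pools allocate each
 * element 512 bytes larger than the maximum payload (see the pool creation in
 * spdk_bdev_initialize()), so rounding up can never run past the element.
 * Worked example of the arithmetic:
 *
 *     buf = 0x7f0000001234
 *     (0x7f0000001234 + 511) & ~511UL = 0x7f0000001400
 */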
448 
449 void
450 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
451 {
452 	struct spdk_mempool *pool;
453 	bdev_io_stailq_t *stailq;
454 	void *buf, *aligned_buf;
455 	struct spdk_bdev_mgmt_channel *mgmt_ch;
456 
457 	assert(cb != NULL);
458 	assert(bdev_io->u.bdev.iovs != NULL);
459 
460 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
461 		/* Buffer already present */
462 		cb(bdev_io->internal.ch->channel, bdev_io);
463 		return;
464 	}
465 
466 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
467 	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
468 
469 	bdev_io->internal.buf_len = len;
470 	bdev_io->internal.get_buf_cb = cb;
471 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
472 		pool = g_bdev_mgr.buf_small_pool;
473 		stailq = &mgmt_ch->need_buf_small;
474 	} else {
475 		pool = g_bdev_mgr.buf_large_pool;
476 		stailq = &mgmt_ch->need_buf_large;
477 	}
478 
479 	buf = spdk_mempool_get(pool);
480 
481 	if (!buf) {
482 		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
483 	} else {
484 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
485 		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
486 
487 		bdev_io->internal.buf = buf;
488 		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
489 	}
490 }
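
/*
 * Minimal sketch of how a bdev module's read path might use
 * spdk_bdev_io_get_buf() for I/O that arrived without a data buffer
 * (illustrative; every name other than the SPDK symbols is made up):
 *
 *     static void
 *     example_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *     {
 *         // iovs[0] now points at a 512-byte aligned buffer of at least
 *         // the requested length
 *         example_do_read(ch, bdev_io);
 *     }
 *
 *     case SPDK_BDEV_IO_TYPE_READ:
 *         spdk_bdev_io_get_buf(bdev_io, example_get_buf_cb,
 *                              bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *         break;
 *
 * If no buffer is free, the bdev_io is parked on need_buf_small/need_buf_large
 * and the callback fires later from spdk_bdev_io_put_buf().
 */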
491 
492 static int
493 spdk_bdev_module_get_max_ctx_size(void)
494 {
495 	struct spdk_bdev_module *bdev_module;
496 	int max_bdev_module_size = 0;
497 
498 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
499 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
500 			max_bdev_module_size = bdev_module->get_ctx_size();
501 		}
502 	}
503 
504 	return max_bdev_module_size;
505 }
506 
507 void
508 spdk_bdev_config_text(FILE *fp)
509 {
510 	struct spdk_bdev_module *bdev_module;
511 
512 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
513 		if (bdev_module->config_text) {
514 			bdev_module->config_text(fp);
515 		}
516 	}
517 }
518 
519 void
520 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
521 {
522 	struct spdk_bdev_module *bdev_module;
523 	struct spdk_bdev *bdev;
524 
525 	assert(w != NULL);
526 
527 	spdk_json_write_array_begin(w);
528 
529 	spdk_json_write_object_begin(w);
530 	spdk_json_write_named_string(w, "method", "set_bdev_options");
531 	spdk_json_write_name(w, "params");
532 	spdk_json_write_object_begin(w);
533 	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
534 	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
535 	spdk_json_write_object_end(w);
536 	spdk_json_write_object_end(w);
537 
538 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
539 		if (bdev_module->config_json) {
540 			bdev_module->config_json(w);
541 		}
542 	}
543 
544 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
545 		if (bdev->fn_table->write_config_json) {
546 			bdev->fn_table->write_config_json(bdev, w);
547 		}
548 	}
549 
550 	spdk_json_write_array_end(w);
551 }
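
/*
 * The options object emitted above is expected to look roughly like the
 * following (sketch of the JSON shape only; values shown are the defaults):
 *
 *     { "method": "set_bdev_options",
 *       "params": { "bdev_io_pool_size": 65536, "bdev_io_cache_size": 256 } }
 *
 * followed by whatever each module's config_json() and each bdev's
 * write_config_json() append to the array.
 */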
552 
553 static int
554 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
555 {
556 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
557 	struct spdk_bdev_io *bdev_io;
558 	uint32_t i;
559 
560 	STAILQ_INIT(&ch->need_buf_small);
561 	STAILQ_INIT(&ch->need_buf_large);
562 
563 	STAILQ_INIT(&ch->per_thread_cache);
564 	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
565 
566 	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
567 	ch->per_thread_cache_count = 0;
568 	for (i = 0; i < ch->bdev_io_cache_size; i++) {
569 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
570 		assert(bdev_io != NULL);
571 		ch->per_thread_cache_count++;
572 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
573 	}
574 
575 	TAILQ_INIT(&ch->shared_resources);
576 	TAILQ_INIT(&ch->io_wait_queue);
577 
578 	return 0;
579 }
580 
581 static void
582 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
583 {
584 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
585 	struct spdk_bdev_io *bdev_io;
586 
587 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
588 		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
589 	}
590 
591 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
592 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
593 	}
594 
595 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
596 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
597 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
598 		ch->per_thread_cache_count--;
599 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
600 	}
601 
602 	assert(ch->per_thread_cache_count == 0);
603 }
604 
605 static void
606 spdk_bdev_init_complete(int rc)
607 {
608 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
609 	void *cb_arg = g_init_cb_arg;
610 	struct spdk_bdev_module *m;
611 
612 	g_bdev_mgr.init_complete = true;
613 	g_init_cb_fn = NULL;
614 	g_init_cb_arg = NULL;
615 
616 	/*
617 	 * For modules that need to know when subsystem init is complete,
618 	 * inform them now.
619 	 */
620 	if (rc == 0) {
621 		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
622 			if (m->init_complete) {
623 				m->init_complete();
624 			}
625 		}
626 	}
627 
628 	cb_fn(cb_arg, rc);
629 }
630 
631 static void
632 spdk_bdev_module_action_complete(void)
633 {
634 	struct spdk_bdev_module *m;
635 
636 	/*
637 	 * Don't finish bdev subsystem initialization if
638 	 * module pre-initialization is still in progress, or
639 	 * the subsystem has already been initialized.
640 	 */
641 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
642 		return;
643 	}
644 
645 	/*
646 	 * Check all bdev modules for inits/examinations in progress. If any
647 	 * exist, return immediately since we cannot finish bdev subsystem
648 	 * initialization until all are completed.
649 	 */
650 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
651 		if (m->internal.action_in_progress > 0) {
652 			return;
653 		}
654 	}
655 
656 	/*
657 	 * Modules already finished initialization - now that all
658 	 * the bdev modules have finished their asynchronous init and
659 	 * examine actions, the entire bdev layer can be marked as complete.
660 	 */
661 	spdk_bdev_init_complete(0);
662 }
663 
664 static void
665 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
666 {
667 	assert(module->internal.action_in_progress > 0);
668 	module->internal.action_in_progress--;
669 	spdk_bdev_module_action_complete();
670 }
671 
672 void
673 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
674 {
675 	spdk_bdev_module_action_done(module);
676 }
677 
678 void
679 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
680 {
681 	spdk_bdev_module_action_done(module);
682 }
683 
684 /** The last initialized bdev module */
685 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
686 
687 static int
688 spdk_bdev_modules_init(void)
689 {
690 	struct spdk_bdev_module *module;
691 	int rc = 0;
692 
693 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
694 		g_resume_bdev_module = module;
695 		rc = module->module_init();
696 		if (rc != 0) {
697 			return rc;
698 		}
699 	}
700 
701 	g_resume_bdev_module = NULL;
702 	return 0;
703 }
704 
705 
706 static void
707 spdk_bdev_init_failed_complete(void *cb_arg)
708 {
709 	spdk_bdev_init_complete(-1);
710 }
711 
712 static void
713 spdk_bdev_init_failed(void *cb_arg)
714 {
715 	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
716 }
717 
718 void
719 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
720 {
721 	struct spdk_conf_section *sp;
722 	struct spdk_bdev_opts bdev_opts;
723 	int32_t bdev_io_pool_size, bdev_io_cache_size;
724 	int cache_size;
725 	int rc = 0;
726 	char mempool_name[32];
727 
728 	assert(cb_fn != NULL);
729 
730 	sp = spdk_conf_find_section(NULL, "Bdev");
731 	if (sp != NULL) {
732 		spdk_bdev_get_opts(&bdev_opts);
733 
734 		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
735 		if (bdev_io_pool_size >= 0) {
736 			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
737 		}
738 
739 		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
740 		if (bdev_io_cache_size >= 0) {
741 			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
742 		}
743 
744 		if (spdk_bdev_set_opts(&bdev_opts)) {
745 			spdk_bdev_init_complete(-1);
746 			return;
747 		}
748 
749 		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
750 	}
751 
752 	g_init_cb_fn = cb_fn;
753 	g_init_cb_arg = cb_arg;
754 
755 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
756 
757 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
758 				  g_bdev_opts.bdev_io_pool_size,
759 				  sizeof(struct spdk_bdev_io) +
760 				  spdk_bdev_module_get_max_ctx_size(),
761 				  0,
762 				  SPDK_ENV_SOCKET_ID_ANY);
763 
764 	if (g_bdev_mgr.bdev_io_pool == NULL) {
765 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
766 		spdk_bdev_init_complete(-1);
767 		return;
768 	}
769 
770 	/**
771 	 * Ensure no more than half of the total buffers end up in local caches, by
772 	 *   using spdk_thread_get_count() to determine how many local caches we need
773 	 *   to account for.
774 	 */
775 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
776 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
777 
778 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
779 				    BUF_SMALL_POOL_SIZE,
780 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
781 				    cache_size,
782 				    SPDK_ENV_SOCKET_ID_ANY);
783 	if (!g_bdev_mgr.buf_small_pool) {
784 		SPDK_ERRLOG("create rbuf small pool failed\n");
785 		spdk_bdev_init_complete(-1);
786 		return;
787 	}
788 
789 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
790 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
791 
792 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
793 				    BUF_LARGE_POOL_SIZE,
794 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
795 				    cache_size,
796 				    SPDK_ENV_SOCKET_ID_ANY);
797 	if (!g_bdev_mgr.buf_large_pool) {
798 		SPDK_ERRLOG("create rbuf large pool failed\n");
799 		spdk_bdev_init_complete(-1);
800 		return;
801 	}
802 
803 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
804 				 NULL);
805 	if (!g_bdev_mgr.zero_buffer) {
806 		SPDK_ERRLOG("create bdev zero buffer failed\n");
807 		spdk_bdev_init_complete(-1);
808 		return;
809 	}
810 
811 #ifdef SPDK_CONFIG_VTUNE
812 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
813 #endif
814 
815 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
816 				spdk_bdev_mgmt_channel_destroy,
817 				sizeof(struct spdk_bdev_mgmt_channel),
818 				"bdev_mgr");
819 
820 	rc = spdk_bdev_modules_init();
821 	g_bdev_mgr.module_init_complete = true;
822 	if (rc != 0) {
823 		SPDK_ERRLOG("bdev modules init failed\n");
824 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
825 		return;
826 	}
827 
828 	spdk_bdev_module_action_complete();
829 }
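
/*
 * Sketch of the legacy INI-style configuration consumed above (section and key
 * names come from the spdk_conf lookups in this function; the values are
 * examples only):
 *
 *     [Bdev]
 *       BdevIoPoolSize  65536
 *       BdevIoCacheSize 256
 *
 * Either key may be omitted; the lookup then returns a negative value, the
 * ">= 0" checks above skip it, and the compiled-in default from g_bdev_opts
 * is kept.
 */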
830 
831 static void
832 spdk_bdev_mgr_unregister_cb(void *io_device)
833 {
834 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
835 
836 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
837 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
838 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
839 			    g_bdev_opts.bdev_io_pool_size);
840 	}
841 
842 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
843 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
844 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
845 			    BUF_SMALL_POOL_SIZE);
846 		assert(false);
847 	}
848 
849 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
850 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
851 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
852 			    BUF_LARGE_POOL_SIZE);
853 		assert(false);
854 	}
855 
856 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
857 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
858 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
859 	spdk_dma_free(g_bdev_mgr.zero_buffer);
860 
861 	cb_fn(g_fini_cb_arg);
862 	g_fini_cb_fn = NULL;
863 	g_fini_cb_arg = NULL;
864 }
865 
866 static void
867 spdk_bdev_module_finish_iter(void *arg)
868 {
869 	struct spdk_bdev_module *bdev_module;
870 
871 	/* Start iterating from the last touched module */
872 	if (!g_resume_bdev_module) {
873 		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
874 	} else {
875 		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
876 					 internal.tailq);
877 	}
878 
879 	while (bdev_module) {
880 		if (bdev_module->async_fini) {
881 			/* Save our place so we can resume later. We must
882 			 * save the variable here, before calling module_fini()
883 			 * below, because in some cases the module may immediately
884 			 * call spdk_bdev_module_finish_done() and re-enter
885 			 * this function to continue iterating. */
886 			g_resume_bdev_module = bdev_module;
887 		}
888 
889 		if (bdev_module->module_fini) {
890 			bdev_module->module_fini();
891 		}
892 
893 		if (bdev_module->async_fini) {
894 			return;
895 		}
896 
897 		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
898 					 internal.tailq);
899 	}
900 
901 	g_resume_bdev_module = NULL;
902 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
903 }
904 
905 void
906 spdk_bdev_module_finish_done(void)
907 {
908 	if (spdk_get_thread() != g_fini_thread) {
909 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
910 	} else {
911 		spdk_bdev_module_finish_iter(NULL);
912 	}
913 }
914 
915 static void
916 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
917 {
918 	struct spdk_bdev *bdev = cb_arg;
919 
920 	if (bdeverrno && bdev) {
921 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
922 			     bdev->name);
923 
924 		/*
925 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
926 		 *  bdev; try to continue by manually removing this bdev from the list and moving
927 		 *  on to the next bdev in the list.
928 		 */
929 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
930 	}
931 
932 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
933 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
934 		/*
935 	 * Bdev module finish needs to be deferred since we might be in the middle of some context
936 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
937 		 * after returning.
938 		 */
939 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
940 		return;
941 	}
942 
943 	/*
944 	 * Unregister the last bdev in the list.  The last bdev in the list should be a bdev
945 	 * that has no bdevs that depend on it.
946 	 */
947 	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
948 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
949 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
950 }
951 
952 void
953 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
954 {
955 	struct spdk_bdev_module *m;
956 
957 	assert(cb_fn != NULL);
958 
959 	g_fini_thread = spdk_get_thread();
960 
961 	g_fini_cb_fn = cb_fn;
962 	g_fini_cb_arg = cb_arg;
963 
964 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
965 		if (m->fini_start) {
966 			m->fini_start();
967 		}
968 	}
969 
970 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
971 }
972 
973 static struct spdk_bdev_io *
974 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
975 {
976 	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
977 	struct spdk_bdev_io *bdev_io;
978 
979 	if (ch->per_thread_cache_count > 0) {
980 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
981 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
982 		ch->per_thread_cache_count--;
983 	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
984 		/*
985 		 * Don't try to look for bdev_ios in the global pool if there are
986 		 * waiters on bdev_ios - we don't want this caller to jump the line.
987 		 */
988 		bdev_io = NULL;
989 	} else {
990 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
991 	}
992 
993 	return bdev_io;
994 }
995 
996 void
997 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
998 {
999 	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
1000 
1001 	assert(bdev_io != NULL);
1002 	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
1003 
1004 	if (bdev_io->internal.buf != NULL) {
1005 		spdk_bdev_io_put_buf(bdev_io);
1006 	}
1007 
1008 	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
1009 		ch->per_thread_cache_count++;
1010 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
1011 		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
1012 			struct spdk_bdev_io_wait_entry *entry;
1013 
1014 			entry = TAILQ_FIRST(&ch->io_wait_queue);
1015 			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
1016 			entry->cb_fn(entry->cb_arg);
1017 		}
1018 	} else {
1019 		/* We should never have a full cache with entries on the io wait queue. */
1020 		assert(TAILQ_EMPTY(&ch->io_wait_queue));
1021 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
1022 	}
1023 }
1024 
1025 static bool
1026 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
1027 {
1028 	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
1029 
1030 	switch (limit) {
1031 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
1032 		return true;
1033 	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
1034 		return false;
1035 	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
1036 	default:
1037 		return false;
1038 	}
1039 }
1040 
1041 static bool
1042 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
1043 {
1044 	switch (bdev_io->type) {
1045 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1046 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1047 	case SPDK_BDEV_IO_TYPE_READ:
1048 	case SPDK_BDEV_IO_TYPE_WRITE:
1049 	case SPDK_BDEV_IO_TYPE_UNMAP:
1050 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1051 		return true;
1052 	default:
1053 		return false;
1054 	}
1055 }
1056 
1057 static uint64_t
1058 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
1059 {
1060 	struct spdk_bdev	*bdev = bdev_io->bdev;
1061 
1062 	switch (bdev_io->type) {
1063 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1064 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1065 		return bdev_io->u.nvme_passthru.nbytes;
1066 	case SPDK_BDEV_IO_TYPE_READ:
1067 	case SPDK_BDEV_IO_TYPE_WRITE:
1068 	case SPDK_BDEV_IO_TYPE_UNMAP:
1069 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1070 		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
1071 	default:
1072 		return 0;
1073 	}
1074 }
1075 
1076 static void
1077 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte)
1078 {
1079 	int i;
1080 
1081 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1082 		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1083 			continue;
1084 		}
1085 
1086 		switch (i) {
1087 		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
1088 			qos->rate_limits[i].remaining_this_timeslice--;
1089 			break;
1090 		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
1091 			qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte;
1092 			break;
1093 		case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
1094 		default:
1095 			break;
1096 		}
1097 	}
1098 }
1099 
1100 static void
1101 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
1102 {
1103 	struct spdk_bdev_io		*bdev_io = NULL;
1104 	struct spdk_bdev		*bdev = ch->bdev;
1105 	struct spdk_bdev_qos		*qos = bdev->internal.qos;
1106 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1107 	int				i;
1108 	bool				to_limit_io;
1109 	uint64_t			io_size_in_byte;
1110 
1111 	while (!TAILQ_EMPTY(&qos->queued)) {
1112 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1113 			if (qos->rate_limits[i].max_per_timeslice > 0 &&
1114 			    (qos->rate_limits[i].remaining_this_timeslice <= 0)) {
1115 				return;
1116 			}
1117 		}
1118 
1119 		bdev_io = TAILQ_FIRST(&qos->queued);
1120 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
1121 		ch->io_outstanding++;
1122 		shared_resource->io_outstanding++;
1123 		to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io);
1124 		if (to_limit_io == true) {
1125 			io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io);
1126 			_spdk_bdev_qos_update_per_io(qos, io_size_in_byte);
1127 		}
1128 		bdev->fn_table->submit_request(ch->channel, bdev_io);
1129 	}
1130 }
1131 
1132 static void
1133 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
1134 {
1135 	int rc;
1136 
1137 	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
1138 	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
1139 	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
1140 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
1141 				     &bdev_io->internal.waitq_entry);
1142 	if (rc != 0) {
1143 		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
1144 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1145 		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1146 	}
1147 }
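
/*
 * The same wait-queue mechanism is available to bdev users: when a submission
 * call returns -ENOMEM (bdev_io pool exhausted on this thread), the caller can
 * register an spdk_bdev_io_wait_entry and retry once a bdev_io is freed.
 * Illustrative sketch (the example_* names and struct are made up):
 *
 *     static void
 *     example_retry(void *arg)
 *     {
 *         struct example_ctx *ctx = arg;
 *
 *         example_submit_read(ctx);    // calls spdk_bdev_read_blocks() again
 *     }
 *
 *     rc = spdk_bdev_read_blocks(desc, io_ch, buf, offset, num_blocks, cb, ctx);
 *     if (rc == -ENOMEM) {
 *         ctx->bdev_io_wait.bdev = bdev;
 *         ctx->bdev_io_wait.cb_fn = example_retry;
 *         ctx->bdev_io_wait.cb_arg = ctx;
 *         spdk_bdev_queue_io_wait(bdev, io_ch, &ctx->bdev_io_wait);
 *     }
 */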
1148 
1149 static bool
1150 _spdk_bdev_io_type_can_split(uint8_t type)
1151 {
1152 	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
1153 	assert(type < SPDK_BDEV_NUM_IO_TYPES);
1154 
1155 	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
1156 	 * UNMAP could be split, but these types of I/O are typically much larger
1157 	 * in size (sometimes the size of the entire block device), and the bdev
1158 	 * module can more efficiently split these types of I/O.  Plus those types
1159 	 * of I/O do not have a payload, which makes the splitting process simpler.
1160 	 */
1161 	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
1162 		return true;
1163 	} else {
1164 		return false;
1165 	}
1166 }
1167 
1168 static bool
1169 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
1170 {
1171 	uint64_t start_stripe, end_stripe;
1172 	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;
1173 
1174 	if (io_boundary == 0) {
1175 		return false;
1176 	}
1177 
1178 	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
1179 		return false;
1180 	}
1181 
1182 	start_stripe = bdev_io->u.bdev.offset_blocks;
1183 	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
1184 	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
1185 	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
1186 		start_stripe >>= spdk_u32log2(io_boundary);
1187 		end_stripe >>= spdk_u32log2(io_boundary);
1188 	} else {
1189 		start_stripe /= io_boundary;
1190 		end_stripe /= io_boundary;
1191 	}
1192 	return (start_stripe != end_stripe);
1193 }
1194 
1195 static uint32_t
1196 _to_next_boundary(uint64_t offset, uint32_t boundary)
1197 {
1198 	return (boundary - (offset % boundary));
1199 }
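
/*
 * Worked example of the split decision and boundary math, assuming
 * optimal_io_boundary = 8 blocks:
 *
 *   - A read of 4 blocks at offset 6 spans blocks 6..9, i.e. stripe 0 (6 >> 3)
 *     through stripe 1 (9 >> 3), so _spdk_bdev_io_should_split() returns true.
 *   - The first child I/O covers _to_next_boundary(6, 8) = 2 blocks (6..7);
 *     the second covers the remaining 2 blocks (8..9), which no longer cross
 *     a boundary.
 */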
1200 
1201 static void
1202 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
1203 
1204 static void
1205 _spdk_bdev_io_split_with_payload(void *_bdev_io)
1206 {
1207 	struct spdk_bdev_io *bdev_io = _bdev_io;
1208 	uint64_t current_offset, remaining, bytes_handled;
1209 	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
1210 	struct iovec *parent_iov;
1211 	uint64_t parent_iov_offset, child_iov_len;
1212 	uint32_t child_iovcnt;
1213 	int rc;
1214 
1215 	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
1216 	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
1217 	blocklen = bdev_io->bdev->blocklen;
1218 	bytes_handled = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
1219 	parent_iov = &bdev_io->u.bdev.iovs[0];
1220 	parent_iov_offset = 0;
1221 
1222 	while (bytes_handled > 0) {
1223 		if (bytes_handled >= parent_iov->iov_len) {
1224 			bytes_handled -= parent_iov->iov_len;
1225 			parent_iov++;
1226 			continue;
1227 		}
1228 		parent_iov_offset += bytes_handled;
1229 		break;
1230 	}
1231 
1232 	to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
1233 	to_next_boundary = spdk_min(remaining, to_next_boundary);
1234 	to_next_boundary_bytes = to_next_boundary * blocklen;
1235 	child_iovcnt = 0;
1236 	while (to_next_boundary_bytes > 0 && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
1237 		child_iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
1238 		to_next_boundary_bytes -= child_iov_len;
1239 
1240 		bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
1241 		bdev_io->child_iov[child_iovcnt].iov_len = child_iov_len;
1242 
1243 		parent_iov++;
1244 		parent_iov_offset = 0;
1245 		child_iovcnt++;
1246 	}
1247 
1248 	if (to_next_boundary_bytes > 0) {
1249 		/* We had to stop this child I/O early because we ran out of
1250 		 *  child_iov space.  Make sure the iovs collected are valid and
1251 		 *  then adjust to_next_boundary before starting the child I/O.
1252 		 */
1253 		if ((to_next_boundary_bytes % blocklen) != 0) {
1254 			SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n",
1255 				    to_next_boundary_bytes, blocklen);
1256 			bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1257 			bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1258 			return;
1259 		}
1260 		to_next_boundary -= to_next_boundary_bytes / blocklen;
1261 	}
1262 
1263 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1264 		rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
1265 					    spdk_io_channel_from_ctx(bdev_io->internal.ch),
1266 					    bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
1267 					    _spdk_bdev_io_split_done, bdev_io);
1268 	} else {
1269 		rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
1270 					     spdk_io_channel_from_ctx(bdev_io->internal.ch),
1271 					     bdev_io->child_iov, child_iovcnt, current_offset, to_next_boundary,
1272 					     _spdk_bdev_io_split_done, bdev_io);
1273 	}
1274 
1275 	if (rc == 0) {
1276 		bdev_io->u.bdev.split_current_offset_blocks += to_next_boundary;
1277 		bdev_io->u.bdev.split_remaining_num_blocks -= to_next_boundary;
1278 	} else if (rc == -ENOMEM) {
1279 		_spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_io_split_with_payload);
1280 	} else {
1281 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1282 		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1283 	}
1284 }
1285 
1286 static void
1287 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1288 {
1289 	struct spdk_bdev_io *parent_io = cb_arg;
1290 
1291 	spdk_bdev_free_io(bdev_io);
1292 
1293 	if (!success) {
1294 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1295 		parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
1296 		return;
1297 	}
1298 
1299 	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
1300 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
1301 		parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
1302 		return;
1303 	}
1304 
1305 	/*
1306 	 * Continue with the splitting process.  This function will complete the parent I/O if the
1307 	 * splitting is done.
1308 	 */
1309 	_spdk_bdev_io_split_with_payload(parent_io);
1310 }
1311 
1312 static void
1313 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
1314 {
1315 	assert(_spdk_bdev_io_type_can_split(bdev_io->type));
1316 
1317 	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
1318 	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
1319 
1320 	_spdk_bdev_io_split_with_payload(bdev_io);
1321 }
1322 
1323 static void
1324 _spdk_bdev_io_submit(void *ctx)
1325 {
1326 	struct spdk_bdev_io *bdev_io = ctx;
1327 	struct spdk_bdev *bdev = bdev_io->bdev;
1328 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1329 	struct spdk_io_channel *ch = bdev_ch->channel;
1330 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1331 	uint64_t tsc;
1332 
1333 	tsc = spdk_get_ticks();
1334 	bdev_io->internal.submit_tsc = tsc;
1335 	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);
1336 	bdev_ch->io_outstanding++;
1337 	shared_resource->io_outstanding++;
1338 	bdev_io->internal.in_submit_request = true;
1339 	if (spdk_likely(bdev_ch->flags == 0)) {
1340 		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1341 			bdev->fn_table->submit_request(ch, bdev_io);
1342 		} else {
1343 			bdev_ch->io_outstanding--;
1344 			shared_resource->io_outstanding--;
1345 			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
1346 		}
1347 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1348 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1349 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1350 		bdev_ch->io_outstanding--;
1351 		shared_resource->io_outstanding--;
1352 		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
1353 		_spdk_bdev_qos_io_submit(bdev_ch);
1354 	} else {
1355 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1356 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1357 	}
1358 	bdev_io->internal.in_submit_request = false;
1359 }
1360 
1361 static void
1362 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1363 {
1364 	struct spdk_bdev *bdev = bdev_io->bdev;
1365 	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1366 
1367 	assert(thread != NULL);
1368 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1369 
1370 	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
1371 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1372 			spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split,
1373 					     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
1374 		} else {
1375 			_spdk_bdev_io_split(NULL, bdev_io);
1376 		}
1377 		return;
1378 	}
1379 
1380 	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1381 		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
1382 			_spdk_bdev_io_submit(bdev_io);
1383 		} else {
1384 			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1385 			bdev_io->internal.ch = bdev->internal.qos->ch;
1386 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
1387 		}
1388 	} else {
1389 		_spdk_bdev_io_submit(bdev_io);
1390 	}
1391 }
1392 
1393 static void
1394 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1395 {
1396 	struct spdk_bdev *bdev = bdev_io->bdev;
1397 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1398 	struct spdk_io_channel *ch = bdev_ch->channel;
1399 
1400 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1401 
1402 	bdev_io->internal.in_submit_request = true;
1403 	bdev->fn_table->submit_request(ch, bdev_io);
1404 	bdev_io->internal.in_submit_request = false;
1405 }
1406 
1407 static void
1408 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1409 		  struct spdk_bdev *bdev, void *cb_arg,
1410 		  spdk_bdev_io_completion_cb cb)
1411 {
1412 	bdev_io->bdev = bdev;
1413 	bdev_io->internal.caller_ctx = cb_arg;
1414 	bdev_io->internal.cb = cb;
1415 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1416 	bdev_io->internal.in_submit_request = false;
1417 	bdev_io->internal.buf = NULL;
1418 	bdev_io->internal.io_submit_ch = NULL;
1419 }
1420 
1421 static bool
1422 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1423 {
1424 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1425 }
1426 
1427 bool
1428 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1429 {
1430 	bool supported;
1431 
1432 	supported = _spdk_bdev_io_type_supported(bdev, io_type);
1433 
1434 	if (!supported) {
1435 		switch (io_type) {
1436 		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1437 			/* The bdev layer will emulate write zeroes as long as write is supported. */
1438 			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
1439 			break;
1440 		default:
1441 			break;
1442 		}
1443 	}
1444 
1445 	return supported;
1446 }
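
/*
 * Illustrative check (assumed caller code): users of the bdev layer typically
 * gate optional operations on the reported capability, e.g.
 *
 *     if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
 *         // safe to issue unmaps to this bdev
 *     }
 *
 * WRITE_ZEROES is special-cased above: it is reported as supported whenever
 * plain writes are, since the generic bdev layer can emulate it (see the zero
 * buffer helpers declared near the top of this file).
 */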
1447 
1448 int
1449 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1450 {
1451 	if (bdev->fn_table->dump_info_json) {
1452 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1453 	}
1454 
1455 	return 0;
1456 }
1457 
1458 static void
1459 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1460 {
1461 	uint32_t max_per_timeslice = 0;
1462 	int i;
1463 
1464 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1465 		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1466 			qos->rate_limits[i].max_per_timeslice = 0;
1467 			continue;
1468 		}
1469 
1470 		max_per_timeslice = qos->rate_limits[i].limit *
1471 				    SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC;
1472 
1473 		qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice,
1474 							qos->rate_limits[i].min_per_timeslice);
1475 
1476 		qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice;
1477 	}
1478 }
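
/*
 * Worked example of the quota arithmetic above, using
 * SPDK_BDEV_QOS_TIMESLICE_IN_USEC = 1000 (1 ms timeslices) and
 * SPDK_SEC_TO_USEC = 1000000:
 *
 *   - IOPS limit 10000:  10000 * 1000 / 1000000 = 10 I/O per timeslice
 *     (already >= SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE, so no clamping).
 *   - Bandwidth limit 10 MB/s (10485760 bytes/s):
 *     10485760 * 1000 / 1000000 = 10485 bytes per timeslice, which would be
 *     clamped up to SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE (512) only if smaller.
 *
 * remaining_this_timeslice is then reset to max_per_timeslice and drawn down
 * per I/O in _spdk_bdev_qos_update_per_io().
 */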
1479 
1480 static int
1481 spdk_bdev_channel_poll_qos(void *arg)
1482 {
1483 	struct spdk_bdev_qos *qos = arg;
1484 	uint64_t now = spdk_get_ticks();
1485 	int i;
1486 
1487 	if (now < (qos->last_timeslice + qos->timeslice_size)) {
1488 		/* We received our callback earlier than expected - return
1489 		 *  immediately and wait to do accounting until at least one
1490 		 *  timeslice has actually expired.  This should never happen
1491 		 *  with a well-behaved timer implementation.
1492 		 */
1493 		return 0;
1494 	}
1495 
1496 	/* Reset for next round of rate limiting */
1497 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1498 		/* We may have allowed the IOs or bytes to slightly overrun in the last
1499 		 * timeslice. remaining_this_timeslice is signed, so if it's negative
1500 		 * here, we'll account for the overrun so that the next timeslice will
1501 		 * be appropriately reduced.
1502 		 */
1503 		if (qos->rate_limits[i].remaining_this_timeslice > 0) {
1504 			qos->rate_limits[i].remaining_this_timeslice = 0;
1505 		}
1506 	}
1507 
1508 	while (now >= (qos->last_timeslice + qos->timeslice_size)) {
1509 		qos->last_timeslice += qos->timeslice_size;
1510 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1511 			qos->rate_limits[i].remaining_this_timeslice +=
1512 				qos->rate_limits[i].max_per_timeslice;
1513 		}
1514 	}
1515 
1516 	_spdk_bdev_qos_io_submit(qos->ch);
1517 
1518 	return -1;
1519 }
1520 
1521 static void
1522 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1523 {
1524 	struct spdk_bdev_shared_resource *shared_resource;
1525 
1526 	if (!ch) {
1527 		return;
1528 	}
1529 
1530 	if (ch->channel) {
1531 		spdk_put_io_channel(ch->channel);
1532 	}
1533 
1534 	assert(ch->io_outstanding == 0);
1535 
1536 	shared_resource = ch->shared_resource;
1537 	if (shared_resource) {
1538 		assert(ch->io_outstanding == 0);
1539 		assert(shared_resource->ref > 0);
1540 		shared_resource->ref--;
1541 		if (shared_resource->ref == 0) {
1542 			assert(shared_resource->io_outstanding == 0);
1543 			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1544 			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1545 			free(shared_resource);
1546 		}
1547 	}
1548 }
1549 
1550 /* Caller must hold bdev->internal.mutex. */
1551 static void
1552 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1553 {
1554 	struct spdk_bdev_qos	*qos = bdev->internal.qos;
1555 	int			i;
1556 
1557 	/* Rate limiting is enabled on this bdev */
1558 	if (qos) {
1559 		if (qos->ch == NULL) {
1560 			struct spdk_io_channel *io_ch;
1561 
1562 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1563 				      bdev->name, spdk_get_thread());
1564 
1565 			/* No qos channel has been selected, so set one up */
1566 
1567 			/* Take another reference to ch */
1568 			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1569 			qos->ch = ch;
1570 
1571 			qos->thread = spdk_io_channel_get_thread(io_ch);
1572 
1573 			TAILQ_INIT(&qos->queued);
1574 
1575 			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1576 				if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
1577 					qos->rate_limits[i].min_per_timeslice =
1578 						SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE;
1579 				} else {
1580 					qos->rate_limits[i].min_per_timeslice =
1581 						SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE;
1582 				}
1583 
1584 				if (qos->rate_limits[i].limit == 0) {
1585 					qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
1586 				}
1587 			}
1588 			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1589 			qos->timeslice_size =
1590 				SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
1591 			qos->last_timeslice = spdk_get_ticks();
1592 			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1593 							   qos,
1594 							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1595 		}
1596 
1597 		ch->flags |= BDEV_CH_QOS_ENABLED;
1598 	}
1599 }
1600 
1601 static int
1602 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1603 {
1604 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1605 	struct spdk_bdev_channel	*ch = ctx_buf;
1606 	struct spdk_io_channel		*mgmt_io_ch;
1607 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1608 	struct spdk_bdev_shared_resource *shared_resource;
1609 
1610 	ch->bdev = bdev;
1611 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1612 	if (!ch->channel) {
1613 		return -1;
1614 	}
1615 
1616 	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1617 	if (!mgmt_io_ch) {
1618 		return -1;
1619 	}
1620 
1621 	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1622 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1623 		if (shared_resource->shared_ch == ch->channel) {
1624 			spdk_put_io_channel(mgmt_io_ch);
1625 			shared_resource->ref++;
1626 			break;
1627 		}
1628 	}
1629 
1630 	if (shared_resource == NULL) {
1631 		shared_resource = calloc(1, sizeof(*shared_resource));
1632 		if (shared_resource == NULL) {
1633 			spdk_put_io_channel(mgmt_io_ch);
1634 			return -1;
1635 		}
1636 
1637 		shared_resource->mgmt_ch = mgmt_ch;
1638 		shared_resource->io_outstanding = 0;
1639 		TAILQ_INIT(&shared_resource->nomem_io);
1640 		shared_resource->nomem_threshold = 0;
1641 		shared_resource->shared_ch = ch->channel;
1642 		shared_resource->ref = 1;
1643 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1644 	}
1645 
1646 	memset(&ch->stat, 0, sizeof(ch->stat));
1647 	ch->stat.ticks_rate = spdk_get_ticks_hz();
1648 	ch->io_outstanding = 0;
1649 	TAILQ_INIT(&ch->queued_resets);
1650 	ch->flags = 0;
1651 	ch->shared_resource = shared_resource;
1652 
1653 #ifdef SPDK_CONFIG_VTUNE
1654 	{
1655 		char *name;
1656 		__itt_init_ittlib(NULL, 0);
1657 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1658 		if (!name) {
1659 			_spdk_bdev_channel_destroy_resource(ch);
1660 			return -1;
1661 		}
1662 		ch->handle = __itt_string_handle_create(name);
1663 		free(name);
1664 		ch->start_tsc = spdk_get_ticks();
1665 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1666 		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
1667 	}
1668 #endif
1669 
1670 	pthread_mutex_lock(&bdev->internal.mutex);
1671 	_spdk_bdev_enable_qos(bdev, ch);
1672 	pthread_mutex_unlock(&bdev->internal.mutex);
1673 
1674 	return 0;
1675 }
1676 
1677 /*
1678  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1679  *  linked using the spdk_bdev_io internal.buf_link STAILQ_ENTRY.
1680  */
1681 static void
1682 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1683 {
1684 	bdev_io_stailq_t tmp;
1685 	struct spdk_bdev_io *bdev_io;
1686 
1687 	STAILQ_INIT(&tmp);
1688 
1689 	while (!STAILQ_EMPTY(queue)) {
1690 		bdev_io = STAILQ_FIRST(queue);
1691 		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
1692 		if (bdev_io->internal.ch == ch) {
1693 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1694 		} else {
1695 			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
1696 		}
1697 	}
1698 
1699 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1700 }
1701 
1702 /*
1703  * Abort I/O that are queued waiting for submission.  These types of I/O are
1704  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1705  */
1706 static void
1707 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1708 {
1709 	struct spdk_bdev_io *bdev_io, *tmp;
1710 
1711 	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
1712 		if (bdev_io->internal.ch == ch) {
1713 			TAILQ_REMOVE(queue, bdev_io, internal.link);
1714 			/*
1715 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1716 			 *  been submitted to the bdev module.  Since in this case it
1717 			 *  hadn't, bump io_outstanding to account for the decrement
1718 			 *  that spdk_bdev_io_complete() will do.
1719 			 */
1720 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1721 				ch->io_outstanding++;
1722 				ch->shared_resource->io_outstanding++;
1723 			}
1724 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1725 		}
1726 	}
1727 }
1728 
1729 static void
1730 spdk_bdev_qos_channel_destroy(void *cb_arg)
1731 {
1732 	struct spdk_bdev_qos *qos = cb_arg;
1733 
1734 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
1735 	spdk_poller_unregister(&qos->poller);
1736 
1737 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
1738 
1739 	free(qos);
1740 }
1741 
1742 static int
1743 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
1744 {
1745 	int i;
1746 
1747 	/*
1748 	 * Cleanly shutting down the QoS poller is tricky, because
1749 	 * during the asynchronous operation the user could open
1750 	 * a new descriptor and create a new channel, spawning
1751 	 * a new QoS poller.
1752 	 *
1753 	 * The strategy is to create a new QoS structure here and swap it
1754 	 * in. The shutdown path then continues to refer to the old one
1755 	 * until it completes and then releases it.
1756 	 */
1757 	struct spdk_bdev_qos *new_qos, *old_qos;
1758 
1759 	old_qos = bdev->internal.qos;
1760 
1761 	new_qos = calloc(1, sizeof(*new_qos));
1762 	if (!new_qos) {
1763 		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
1764 		return -ENOMEM;
1765 	}
1766 
1767 	/* Copy the old QoS data into the newly allocated structure */
1768 	memcpy(new_qos, old_qos, sizeof(*new_qos));
1769 
1770 	/* Zero out the key parts of the QoS structure */
1771 	new_qos->ch = NULL;
1772 	new_qos->thread = NULL;
1773 	new_qos->poller = NULL;
1774 	TAILQ_INIT(&new_qos->queued);
1775 	/*
1776 	 * The limit member of spdk_bdev_qos_limit structure is not zeroed.
1777 	 * It will be used later for the new QoS structure.
1778 	 */
1779 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1780 		new_qos->rate_limits[i].remaining_this_timeslice = 0;
1781 		new_qos->rate_limits[i].min_per_timeslice = 0;
1782 		new_qos->rate_limits[i].max_per_timeslice = 0;
1783 	}
1784 
1785 	bdev->internal.qos = new_qos;
1786 
1787 	if (old_qos->thread == NULL) {
1788 		free(old_qos);
1789 	} else {
1790 		spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
1791 				     old_qos);
1792 	}
1793 
1794 	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
1795 	 * been destroyed yet. The destruction path will end up waiting for the final
1796 	 * channel to be put before it releases resources. */
1797 
1798 	return 0;
1799 }
1800 
1801 static void
1802 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
1803 {
1804 	total->bytes_read += add->bytes_read;
1805 	total->num_read_ops += add->num_read_ops;
1806 	total->bytes_written += add->bytes_written;
1807 	total->num_write_ops += add->num_write_ops;
1808 	total->read_latency_ticks += add->read_latency_ticks;
1809 	total->write_latency_ticks += add->write_latency_ticks;
1810 }
1811 
1812 static void
1813 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1814 {
1815 	struct spdk_bdev_channel	*ch = ctx_buf;
1816 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1817 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1818 
1819 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
1820 		      spdk_get_thread());
1821 
1822 	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
1823 	pthread_mutex_lock(&ch->bdev->internal.mutex);
1824 	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
1825 	pthread_mutex_unlock(&ch->bdev->internal.mutex);
1826 
1827 	mgmt_ch = shared_resource->mgmt_ch;
1828 
1829 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1830 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1831 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1832 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1833 
1834 	_spdk_bdev_channel_destroy_resource(ch);
1835 }
1836 
1837 int
1838 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1839 {
1840 	struct spdk_bdev_alias *tmp;
1841 
1842 	if (alias == NULL) {
1843 		SPDK_ERRLOG("Empty alias passed\n");
1844 		return -EINVAL;
1845 	}
1846 
1847 	if (spdk_bdev_get_by_name(alias)) {
1848 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1849 		return -EEXIST;
1850 	}
1851 
1852 	tmp = calloc(1, sizeof(*tmp));
1853 	if (tmp == NULL) {
1854 		SPDK_ERRLOG("Unable to allocate alias\n");
1855 		return -ENOMEM;
1856 	}
1857 
1858 	tmp->alias = strdup(alias);
1859 	if (tmp->alias == NULL) {
1860 		free(tmp);
1861 		SPDK_ERRLOG("Unable to allocate alias\n");
1862 		return -ENOMEM;
1863 	}
1864 
1865 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1866 
1867 	return 0;
1868 }
1869 
1870 int
1871 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1872 {
1873 	struct spdk_bdev_alias *tmp;
1874 
1875 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1876 		if (strcmp(alias, tmp->alias) == 0) {
1877 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1878 			free(tmp->alias);
1879 			free(tmp);
1880 			return 0;
1881 		}
1882 	}
1883 
1884 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1885 
1886 	return -ENOENT;
1887 }
1888 
1889 void
1890 spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
1891 {
1892 	struct spdk_bdev_alias *p, *tmp;
1893 
1894 	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
1895 		TAILQ_REMOVE(&bdev->aliases, p, tailq);
1896 		free(p->alias);
1897 		free(p);
1898 	}
1899 }
1900 
1901 struct spdk_io_channel *
1902 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1903 {
1904 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1905 }
1906 
1907 const char *
1908 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1909 {
1910 	return bdev->name;
1911 }
1912 
1913 const char *
1914 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1915 {
1916 	return bdev->product_name;
1917 }
1918 
1919 const struct spdk_bdev_aliases_list *
1920 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1921 {
1922 	return &bdev->aliases;
1923 }
1924 
1925 uint32_t
1926 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1927 {
1928 	return bdev->blocklen;
1929 }
1930 
1931 uint64_t
1932 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1933 {
1934 	return bdev->blockcnt;
1935 }
1936 
1937 const char *
1938 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type)
1939 {
1940 	return qos_rpc_type[type];
1941 }
1942 
1943 void
1944 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
1945 {
1946 	int i;
1947 
1948 	memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
1949 
1950 	pthread_mutex_lock(&bdev->internal.mutex);
1951 	if (bdev->internal.qos) {
1952 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1953 			if (bdev->internal.qos->rate_limits[i].limit !=
1954 			    SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1955 				limits[i] = bdev->internal.qos->rate_limits[i].limit;
1956 			}
1957 		}
1958 	}
1959 	pthread_mutex_unlock(&bdev->internal.mutex);
1960 }
1961 
1962 size_t
1963 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1964 {
1965 	/* TODO: push this logic down to the bdev modules */
1966 	if (bdev->need_aligned_buffer) {
1967 		return bdev->blocklen;
1968 	}
1969 
1970 	return 1;
1971 }
1972 
1973 uint32_t
1974 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1975 {
1976 	return bdev->optimal_io_boundary;
1977 }
1978 
1979 bool
1980 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1981 {
1982 	return bdev->write_cache;
1983 }
1984 
1985 const struct spdk_uuid *
1986 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1987 {
1988 	return &bdev->uuid;
1989 }
1990 
1991 uint64_t
1992 spdk_bdev_get_qd(const struct spdk_bdev *bdev)
1993 {
1994 	return bdev->internal.measured_queue_depth;
1995 }
1996 
1997 uint64_t
1998 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev)
1999 {
2000 	return bdev->internal.period;
2001 }
2002 
2003 uint64_t
2004 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev)
2005 {
2006 	return bdev->internal.weighted_io_time;
2007 }
2008 
2009 uint64_t
2010 spdk_bdev_get_io_time(const struct spdk_bdev *bdev)
2011 {
2012 	return bdev->internal.io_time;
2013 }
2014 
2015 static void
2016 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status)
2017 {
2018 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
2019 
2020 	bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth;
2021 
2022 	if (bdev->internal.measured_queue_depth) {
2023 		bdev->internal.io_time += bdev->internal.period;
2024 		bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth;
2025 	}
2026 }
2027 
2028 static void
2029 _calculate_measured_qd(struct spdk_io_channel_iter *i)
2030 {
2031 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
2032 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
2033 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch);
2034 
2035 	bdev->internal.temporary_queue_depth += ch->io_outstanding;
2036 	spdk_for_each_channel_continue(i, 0);
2037 }
2038 
2039 static int
2040 spdk_bdev_calculate_measured_queue_depth(void *ctx)
2041 {
2042 	struct spdk_bdev *bdev = ctx;
2043 	bdev->internal.temporary_queue_depth = 0;
2044 	spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev,
2045 			      _calculate_measured_qd_cpl);
2046 	return 0;
2047 }
2048 
2049 void
2050 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period)
2051 {
2052 	bdev->internal.period = period;
2053 
2054 	if (bdev->internal.qd_poller != NULL) {
2055 		spdk_poller_unregister(&bdev->internal.qd_poller);
2056 		bdev->internal.measured_queue_depth = UINT64_MAX;
2057 	}
2058 
2059 	if (period != 0) {
2060 		bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev,
2061 					   period);
2062 	}
2063 }
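/*
 * Usage sketch (not part of this file): enable queue depth sampling on a bdev
 * and read the measured value later.  The period is handed straight to
 * spdk_poller_register(), so it is expressed in microseconds; "Nvme0n1" is a
 * hypothetical bdev name.  Passing a period of 0 stops the sampling poller.
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Nvme0n1");
 *
 *	spdk_bdev_set_qd_sampling_period(bdev, 1000);
 *	...
 *	printf("queue depth: %" PRIu64 "\n", spdk_bdev_get_qd(bdev));
 *	spdk_bdev_set_qd_sampling_period(bdev, 0);
 */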
2064 
2065 int
2066 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
2067 {
2068 	int ret;
2069 
2070 	pthread_mutex_lock(&bdev->internal.mutex);
2071 
2072 	/* bdev has open descriptors */
2073 	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
2074 	    bdev->blockcnt > size) {
2075 		ret = -EBUSY;
2076 	} else {
2077 		bdev->blockcnt = size;
2078 		ret = 0;
2079 	}
2080 
2081 	pthread_mutex_unlock(&bdev->internal.mutex);
2082 
2083 	return ret;
2084 }
2085 
2086 /*
2087  * Convert I/O offset and length from bytes to blocks.
2088  *
2089  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
2090  */
2091 static uint64_t
2092 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
2093 			  uint64_t num_bytes, uint64_t *num_blocks)
2094 {
2095 	uint32_t block_size = bdev->blocklen;
2096 
2097 	*offset_blocks = offset_bytes / block_size;
2098 	*num_blocks = num_bytes / block_size;
2099 
2100 	return (offset_bytes % block_size) | (num_bytes % block_size);
2101 }
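/*
 * Worked example (illustrative only): with a 512-byte block size, offset_bytes
 * = 4096 and num_bytes = 8192 give offset_blocks = 8, num_blocks = 16 and a
 * return value of 0.  With num_bytes = 8200 the second remainder is 8, so the
 * OR of the two remainders is non-zero and the byte-based wrappers below
 * return -EINVAL.
 */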
2102 
2103 static bool
2104 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
2105 {
2106 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
2107 	 * that the sum has overflowed and the offset has wrapped around. */
2108 	if (offset_blocks + num_blocks < offset_blocks) {
2109 		return false;
2110 	}
2111 
2112 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
2113 	if (offset_blocks + num_blocks > bdev->blockcnt) {
2114 		return false;
2115 	}
2116 
2117 	return true;
2118 }
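/*
 * Illustrative only: with blockcnt = 1000, offset_blocks = 990 and num_blocks
 * = 20 fail the range check, while offset_blocks = UINT64_MAX and num_blocks
 * = 2 wrap around and are rejected by the overflow check above.
 */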
2119 
2120 int
2121 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2122 	       void *buf, uint64_t offset, uint64_t nbytes,
2123 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
2124 {
2125 	uint64_t offset_blocks, num_blocks;
2126 
2127 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2128 		return -EINVAL;
2129 	}
2130 
2131 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2132 }
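/*
 * Usage sketch, assuming the caller already holds an open descriptor and an
 * I/O channel (see spdk_bdev_get_io_channel() above and spdk_bdev_open()
 * below).  read_done(), buf and the hard-coded sizes are hypothetical; offset
 * and length must be multiples of the block size.
 *
 *	static void
 *	read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		if (!success) {
 *			SPDK_ERRLOG("read failed\n");
 *		}
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	rc = spdk_bdev_read(desc, ch, buf, 0, 4096, read_done, NULL);
 *	if (rc == -ENOMEM) {
 *		... retry later, e.g. via spdk_bdev_queue_io_wait() below ...
 *	}
 */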
2133 
2134 int
2135 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2136 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2137 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
2138 {
2139 	struct spdk_bdev *bdev = desc->bdev;
2140 	struct spdk_bdev_io *bdev_io;
2141 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2142 
2143 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2144 		return -EINVAL;
2145 	}
2146 
2147 	bdev_io = spdk_bdev_get_io(channel);
2148 	if (!bdev_io) {
2149 		return -ENOMEM;
2150 	}
2151 
2152 	bdev_io->internal.ch = channel;
2153 	bdev_io->internal.desc = desc;
2154 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2155 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2156 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2157 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2158 	bdev_io->u.bdev.iovcnt = 1;
2159 	bdev_io->u.bdev.num_blocks = num_blocks;
2160 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2161 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2162 
2163 	spdk_bdev_io_submit(bdev_io);
2164 	return 0;
2165 }
2166 
2167 int
2168 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2169 		struct iovec *iov, int iovcnt,
2170 		uint64_t offset, uint64_t nbytes,
2171 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2172 {
2173 	uint64_t offset_blocks, num_blocks;
2174 
2175 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2176 		return -EINVAL;
2177 	}
2178 
2179 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2180 }
2181 
2182 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2183 			   struct iovec *iov, int iovcnt,
2184 			   uint64_t offset_blocks, uint64_t num_blocks,
2185 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2186 {
2187 	struct spdk_bdev *bdev = desc->bdev;
2188 	struct spdk_bdev_io *bdev_io;
2189 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2190 
2191 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2192 		return -EINVAL;
2193 	}
2194 
2195 	bdev_io = spdk_bdev_get_io(channel);
2196 	if (!bdev_io) {
2197 		return -ENOMEM;
2198 	}
2199 
2200 	bdev_io->internal.ch = channel;
2201 	bdev_io->internal.desc = desc;
2202 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2203 	bdev_io->u.bdev.iovs = iov;
2204 	bdev_io->u.bdev.iovcnt = iovcnt;
2205 	bdev_io->u.bdev.num_blocks = num_blocks;
2206 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2207 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2208 
2209 	spdk_bdev_io_submit(bdev_io);
2210 	return 0;
2211 }
2212 
2213 int
2214 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2215 		void *buf, uint64_t offset, uint64_t nbytes,
2216 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2217 {
2218 	uint64_t offset_blocks, num_blocks;
2219 
2220 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2221 		return -EINVAL;
2222 	}
2223 
2224 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2225 }
2226 
2227 int
2228 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2229 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2230 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2231 {
2232 	struct spdk_bdev *bdev = desc->bdev;
2233 	struct spdk_bdev_io *bdev_io;
2234 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2235 
2236 	if (!desc->write) {
2237 		return -EBADF;
2238 	}
2239 
2240 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2241 		return -EINVAL;
2242 	}
2243 
2244 	bdev_io = spdk_bdev_get_io(channel);
2245 	if (!bdev_io) {
2246 		return -ENOMEM;
2247 	}
2248 
2249 	bdev_io->internal.ch = channel;
2250 	bdev_io->internal.desc = desc;
2251 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2252 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2253 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2254 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2255 	bdev_io->u.bdev.iovcnt = 1;
2256 	bdev_io->u.bdev.num_blocks = num_blocks;
2257 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2258 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2259 
2260 	spdk_bdev_io_submit(bdev_io);
2261 	return 0;
2262 }
2263 
2264 int
2265 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2266 		 struct iovec *iov, int iovcnt,
2267 		 uint64_t offset, uint64_t len,
2268 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
2269 {
2270 	uint64_t offset_blocks, num_blocks;
2271 
2272 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2273 		return -EINVAL;
2274 	}
2275 
2276 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2277 }
2278 
2279 int
2280 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2281 			struct iovec *iov, int iovcnt,
2282 			uint64_t offset_blocks, uint64_t num_blocks,
2283 			spdk_bdev_io_completion_cb cb, void *cb_arg)
2284 {
2285 	struct spdk_bdev *bdev = desc->bdev;
2286 	struct spdk_bdev_io *bdev_io;
2287 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2288 
2289 	if (!desc->write) {
2290 		return -EBADF;
2291 	}
2292 
2293 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2294 		return -EINVAL;
2295 	}
2296 
2297 	bdev_io = spdk_bdev_get_io(channel);
2298 	if (!bdev_io) {
2299 		return -ENOMEM;
2300 	}
2301 
2302 	bdev_io->internal.ch = channel;
2303 	bdev_io->internal.desc = desc;
2304 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2305 	bdev_io->u.bdev.iovs = iov;
2306 	bdev_io->u.bdev.iovcnt = iovcnt;
2307 	bdev_io->u.bdev.num_blocks = num_blocks;
2308 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2309 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2310 
2311 	spdk_bdev_io_submit(bdev_io);
2312 	return 0;
2313 }
2314 
2315 int
2316 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2317 		       uint64_t offset, uint64_t len,
2318 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2319 {
2320 	uint64_t offset_blocks, num_blocks;
2321 
2322 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2323 		return -EINVAL;
2324 	}
2325 
2326 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2327 }
2328 
2329 int
2330 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2331 			      uint64_t offset_blocks, uint64_t num_blocks,
2332 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2333 {
2334 	struct spdk_bdev *bdev = desc->bdev;
2335 	struct spdk_bdev_io *bdev_io;
2336 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2337 
2338 	if (!desc->write) {
2339 		return -EBADF;
2340 	}
2341 
2342 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2343 		return -EINVAL;
2344 	}
2345 
2346 	bdev_io = spdk_bdev_get_io(channel);
2347 
2348 	if (!bdev_io) {
2349 		return -ENOMEM;
2350 	}
2351 
2352 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
2353 	bdev_io->internal.ch = channel;
2354 	bdev_io->internal.desc = desc;
2355 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2356 	bdev_io->u.bdev.num_blocks = num_blocks;
2357 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2358 
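	/* Prefer a native WRITE_ZEROES command when the module supports it;
	 * otherwise emulate it below by repeatedly writing the shared zero
	 * buffer, at most ZERO_BUFFER_SIZE bytes per write, via
	 * _spdk_bdev_write_zero_buffer_next(). */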
2359 	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
2360 		spdk_bdev_io_submit(bdev_io);
2361 		return 0;
2362 	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
2363 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
2364 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks;
2365 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks;
2366 		_spdk_bdev_write_zero_buffer_next(bdev_io);
2367 		return 0;
2368 	} else {
2369 		spdk_bdev_free_io(bdev_io);
2370 		return -ENOTSUP;
2371 	}
2372 }
2373 
2374 int
2375 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2376 		uint64_t offset, uint64_t nbytes,
2377 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2378 {
2379 	uint64_t offset_blocks, num_blocks;
2380 
2381 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2382 		return -EINVAL;
2383 	}
2384 
2385 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2386 }
2387 
2388 int
2389 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2390 		       uint64_t offset_blocks, uint64_t num_blocks,
2391 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2392 {
2393 	struct spdk_bdev *bdev = desc->bdev;
2394 	struct spdk_bdev_io *bdev_io;
2395 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2396 
2397 	if (!desc->write) {
2398 		return -EBADF;
2399 	}
2400 
2401 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2402 		return -EINVAL;
2403 	}
2404 
2405 	if (num_blocks == 0) {
2406 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
2407 		return -EINVAL;
2408 	}
2409 
2410 	bdev_io = spdk_bdev_get_io(channel);
2411 	if (!bdev_io) {
2412 		return -ENOMEM;
2413 	}
2414 
2415 	bdev_io->internal.ch = channel;
2416 	bdev_io->internal.desc = desc;
2417 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
2418 
2419 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2420 	bdev_io->u.bdev.iovs[0].iov_base = NULL;
2421 	bdev_io->u.bdev.iovs[0].iov_len = 0;
2422 	bdev_io->u.bdev.iovcnt = 1;
2423 
2424 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2425 	bdev_io->u.bdev.num_blocks = num_blocks;
2426 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2427 
2428 	spdk_bdev_io_submit(bdev_io);
2429 	return 0;
2430 }
2431 
2432 int
2433 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2434 		uint64_t offset, uint64_t length,
2435 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2436 {
2437 	uint64_t offset_blocks, num_blocks;
2438 
2439 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
2440 		return -EINVAL;
2441 	}
2442 
2443 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2444 }
2445 
2446 int
2447 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2448 		       uint64_t offset_blocks, uint64_t num_blocks,
2449 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2450 {
2451 	struct spdk_bdev *bdev = desc->bdev;
2452 	struct spdk_bdev_io *bdev_io;
2453 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2454 
2455 	if (!desc->write) {
2456 		return -EBADF;
2457 	}
2458 
2459 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2460 		return -EINVAL;
2461 	}
2462 
2463 	bdev_io = spdk_bdev_get_io(channel);
2464 	if (!bdev_io) {
2465 		return -ENOMEM;
2466 	}
2467 
2468 	bdev_io->internal.ch = channel;
2469 	bdev_io->internal.desc = desc;
2470 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
2471 	bdev_io->u.bdev.iovs = NULL;
2472 	bdev_io->u.bdev.iovcnt = 0;
2473 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2474 	bdev_io->u.bdev.num_blocks = num_blocks;
2475 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2476 
2477 	spdk_bdev_io_submit(bdev_io);
2478 	return 0;
2479 }
2480 
2481 static void
2482 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2483 {
2484 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2485 	struct spdk_bdev_io *bdev_io;
2486 
2487 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
2488 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link);
2489 	spdk_bdev_io_submit_reset(bdev_io);
2490 }
2491 
2492 static void
2493 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2494 {
2495 	struct spdk_io_channel		*ch;
2496 	struct spdk_bdev_channel	*channel;
2497 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
2498 	struct spdk_bdev_shared_resource *shared_resource;
2499 	bdev_io_tailq_t			tmp_queued;
2500 
2501 	TAILQ_INIT(&tmp_queued);
2502 
2503 	ch = spdk_io_channel_iter_get_channel(i);
2504 	channel = spdk_io_channel_get_ctx(ch);
2505 	shared_resource = channel->shared_resource;
2506 	mgmt_channel = shared_resource->mgmt_ch;
2507 
2508 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2509 
2510 	if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2511 		/* The QoS object is always valid and readable while
2512 		 * the channel flag is set, so the lock here should not
2513 		 * be necessary. We're not in the fast path though, so
2514 		 * just take it anyway. */
2515 		pthread_mutex_lock(&channel->bdev->internal.mutex);
2516 		if (channel->bdev->internal.qos->ch == channel) {
2517 			TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link);
2518 		}
2519 		pthread_mutex_unlock(&channel->bdev->internal.mutex);
2520 	}
2521 
2522 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2523 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2524 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2525 	_spdk_bdev_abort_queued_io(&tmp_queued, channel);
2526 
2527 	spdk_for_each_channel_continue(i, 0);
2528 }
2529 
2530 static void
2531 _spdk_bdev_start_reset(void *ctx)
2532 {
2533 	struct spdk_bdev_channel *ch = ctx;
2534 
2535 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2536 			      ch, _spdk_bdev_reset_dev);
2537 }
2538 
2539 static void
2540 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2541 {
2542 	struct spdk_bdev *bdev = ch->bdev;
2543 
2544 	assert(!TAILQ_EMPTY(&ch->queued_resets));
2545 
2546 	pthread_mutex_lock(&bdev->internal.mutex);
2547 	if (bdev->internal.reset_in_progress == NULL) {
2548 		bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2549 		/*
2550 		 * Take a channel reference for the target bdev for the life of this
2551 		 *  reset.  This guards against the channel getting destroyed while
2552 		 *  spdk_for_each_channel() calls related to this reset IO are in
2553 		 *  progress.  We will release the reference when this reset is
2554 		 *  completed.
2555 		 */
2556 		bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2557 		_spdk_bdev_start_reset(ch);
2558 	}
2559 	pthread_mutex_unlock(&bdev->internal.mutex);
2560 }
2561 
2562 int
2563 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2564 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2565 {
2566 	struct spdk_bdev *bdev = desc->bdev;
2567 	struct spdk_bdev_io *bdev_io;
2568 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2569 
2570 	bdev_io = spdk_bdev_get_io(channel);
2571 	if (!bdev_io) {
2572 		return -ENOMEM;
2573 	}
2574 
2575 	bdev_io->internal.ch = channel;
2576 	bdev_io->internal.desc = desc;
2577 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
2578 	bdev_io->u.reset.ch_ref = NULL;
2579 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2580 
2581 	pthread_mutex_lock(&bdev->internal.mutex);
2582 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link);
2583 	pthread_mutex_unlock(&bdev->internal.mutex);
2584 
2585 	_spdk_bdev_channel_start_reset(channel);
2586 
2587 	return 0;
2588 }
2589 
2590 void
2591 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2592 		      struct spdk_bdev_io_stat *stat)
2593 {
2594 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2595 
2596 	*stat = channel->stat;
2597 }
2598 
2599 static void
2600 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
2601 {
2602 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2603 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2604 
2605 	bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
2606 			    bdev_iostat_ctx->cb_arg, 0);
2607 	free(bdev_iostat_ctx);
2608 }
2609 
2610 static void
2611 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
2612 {
2613 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2614 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2615 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2616 
2617 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat);
2618 	spdk_for_each_channel_continue(i, 0);
2619 }
2620 
2621 void
2622 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
2623 			  spdk_bdev_get_device_stat_cb cb, void *cb_arg)
2624 {
2625 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
2626 
2627 	assert(bdev != NULL);
2628 	assert(stat != NULL);
2629 	assert(cb != NULL);
2630 
2631 	bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
2632 	if (bdev_iostat_ctx == NULL) {
2633 		SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
2634 		cb(bdev, stat, cb_arg, -ENOMEM);
2635 		return;
2636 	}
2637 
2638 	bdev_iostat_ctx->stat = stat;
2639 	bdev_iostat_ctx->cb = cb;
2640 	bdev_iostat_ctx->cb_arg = cb_arg;
2641 
2642 	/* Start with the statistics from previously deleted channels. */
2643 	pthread_mutex_lock(&bdev->internal.mutex);
2644 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat);
2645 	pthread_mutex_unlock(&bdev->internal.mutex);
2646 
2647 	/* Then iterate and add the statistics from each existing channel. */
2648 	spdk_for_each_channel(__bdev_to_io_dev(bdev),
2649 			      _spdk_bdev_get_each_channel_stat,
2650 			      bdev_iostat_ctx,
2651 			      _spdk_bdev_get_device_stat_done);
2652 }
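/*
 * Usage sketch: the caller owns the spdk_bdev_io_stat buffer until the
 * callback fires.  get_stat_done() and its allocation strategy are
 * hypothetical.
 *
 *	static void
 *	get_stat_done(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
 *		      void *cb_arg, int rc)
 *	{
 *		if (rc == 0) {
 *			printf("%s: %" PRIu64 " read ops\n",
 *			       spdk_bdev_get_name(bdev), stat->num_read_ops);
 *		}
 *		free(stat);
 *	}
 *
 *	stat = calloc(1, sizeof(*stat));
 *	spdk_bdev_get_device_stat(bdev, stat, get_stat_done, NULL);
 */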
2653 
2654 int
2655 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2656 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2657 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2658 {
2659 	struct spdk_bdev *bdev = desc->bdev;
2660 	struct spdk_bdev_io *bdev_io;
2661 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2662 
2663 	if (!desc->write) {
2664 		return -EBADF;
2665 	}
2666 
2667 	bdev_io = spdk_bdev_get_io(channel);
2668 	if (!bdev_io) {
2669 		return -ENOMEM;
2670 	}
2671 
2672 	bdev_io->internal.ch = channel;
2673 	bdev_io->internal.desc = desc;
2674 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2675 	bdev_io->u.nvme_passthru.cmd = *cmd;
2676 	bdev_io->u.nvme_passthru.buf = buf;
2677 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2678 	bdev_io->u.nvme_passthru.md_buf = NULL;
2679 	bdev_io->u.nvme_passthru.md_len = 0;
2680 
2681 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2682 
2683 	spdk_bdev_io_submit(bdev_io);
2684 	return 0;
2685 }
2686 
2687 int
2688 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2689 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2690 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2691 {
2692 	struct spdk_bdev *bdev = desc->bdev;
2693 	struct spdk_bdev_io *bdev_io;
2694 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2695 
2696 	if (!desc->write) {
2697 		/*
2698 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2699 		 *  to easily determine if the command is a read or write, but for now just
2700 		 *  do not allow io_passthru with a read-only descriptor.
2701 		 */
2702 		return -EBADF;
2703 	}
2704 
2705 	bdev_io = spdk_bdev_get_io(channel);
2706 	if (!bdev_io) {
2707 		return -ENOMEM;
2708 	}
2709 
2710 	bdev_io->internal.ch = channel;
2711 	bdev_io->internal.desc = desc;
2712 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2713 	bdev_io->u.nvme_passthru.cmd = *cmd;
2714 	bdev_io->u.nvme_passthru.buf = buf;
2715 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2716 	bdev_io->u.nvme_passthru.md_buf = NULL;
2717 	bdev_io->u.nvme_passthru.md_len = 0;
2718 
2719 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2720 
2721 	spdk_bdev_io_submit(bdev_io);
2722 	return 0;
2723 }
2724 
2725 int
2726 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2727 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2728 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2729 {
2730 	struct spdk_bdev *bdev = desc->bdev;
2731 	struct spdk_bdev_io *bdev_io;
2732 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2733 
2734 	if (!desc->write) {
2735 		/*
2736 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2737 		 *  to easily determine if the command is a read or write, but for now just
2738 		 *  do not allow io_passthru with a read-only descriptor.
2739 		 */
2740 		return -EBADF;
2741 	}
2742 
2743 	bdev_io = spdk_bdev_get_io(channel);
2744 	if (!bdev_io) {
2745 		return -ENOMEM;
2746 	}
2747 
2748 	bdev_io->internal.ch = channel;
2749 	bdev_io->internal.desc = desc;
2750 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2751 	bdev_io->u.nvme_passthru.cmd = *cmd;
2752 	bdev_io->u.nvme_passthru.buf = buf;
2753 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2754 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2755 	bdev_io->u.nvme_passthru.md_len = md_len;
2756 
2757 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2758 
2759 	spdk_bdev_io_submit(bdev_io);
2760 	return 0;
2761 }
2762 
2763 int
2764 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2765 			struct spdk_bdev_io_wait_entry *entry)
2766 {
2767 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2768 	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
2769 
2770 	if (bdev != entry->bdev) {
2771 		SPDK_ERRLOG("bdevs do not match\n");
2772 		return -EINVAL;
2773 	}
2774 
2775 	if (mgmt_ch->per_thread_cache_count > 0) {
2776 		SPDK_ERRLOG("Cannot queue io_wait if a spdk_bdev_io is available in the per-thread cache\n");
2777 		return -EINVAL;
2778 	}
2779 
2780 	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
2781 	return 0;
2782 }
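/*
 * Usage sketch of the -ENOMEM retry pattern this function supports.  The
 * caller embeds a struct spdk_bdev_io_wait_entry in its own request context
 * (req, req_done and resubmit_request are hypothetical names) and resubmits
 * from the wait callback once a spdk_bdev_io is freed on this channel:
 *
 *	rc = spdk_bdev_read_blocks(req->desc, req->ch, req->buf,
 *				   req->offset_blocks, req->num_blocks,
 *				   req_done, req);
 *	if (rc == -ENOMEM) {
 *		req->bdev_io_wait.bdev = spdk_bdev_desc_get_bdev(req->desc);
 *		req->bdev_io_wait.cb_fn = resubmit_request;
 *		req->bdev_io_wait.cb_arg = req;
 *		spdk_bdev_queue_io_wait(req->bdev_io_wait.bdev, req->ch,
 *					&req->bdev_io_wait);
 *	}
 */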
2783 
2784 static void
2785 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2786 {
2787 	struct spdk_bdev *bdev = bdev_ch->bdev;
2788 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2789 	struct spdk_bdev_io *bdev_io;
2790 
2791 	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
2792 		/*
2793 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2794 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2795 		 *  the context of a completion, because the resources for the I/O are
2796 		 *  not released until control returns to the bdev poller.  Also, we
2797 		 *  may require several small I/O to complete before a larger I/O
2798 		 *  (that requires splitting) can be submitted.
2799 		 */
2800 		return;
2801 	}
2802 
2803 	while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
2804 		bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
2805 		TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link);
2806 		bdev_io->internal.ch->io_outstanding++;
2807 		shared_resource->io_outstanding++;
2808 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
2809 		bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
2810 		if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
2811 			break;
2812 		}
2813 	}
2814 }
2815 
2816 static inline void
2817 _spdk_bdev_io_complete(void *ctx)
2818 {
2819 	struct spdk_bdev_io *bdev_io = ctx;
2820 	uint64_t tsc;
2821 
2822 	if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
2823 		/*
2824 		 * Send the completion to the thread that originally submitted the I/O,
2825 		 * which may not be the current thread in the case of QoS.
2826 		 */
2827 		if (bdev_io->internal.io_submit_ch) {
2828 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
2829 			bdev_io->internal.io_submit_ch = NULL;
2830 		}
2831 
2832 		/*
2833 		 * Defer completion to avoid potential infinite recursion if the
2834 		 * user's completion callback issues a new I/O.
2835 		 */
2836 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
2837 				     _spdk_bdev_io_complete, bdev_io);
2838 		return;
2839 	}
2840 
2841 	tsc = spdk_get_ticks();
2842 	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0);
2843 
2844 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2845 		switch (bdev_io->type) {
2846 		case SPDK_BDEV_IO_TYPE_READ:
2847 			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2848 			bdev_io->internal.ch->stat.num_read_ops++;
2849 			bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
2850 			break;
2851 		case SPDK_BDEV_IO_TYPE_WRITE:
2852 			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2853 			bdev_io->internal.ch->stat.num_write_ops++;
2854 			bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
2855 			break;
2856 		default:
2857 			break;
2858 		}
2859 	}
2860 
2861 #ifdef SPDK_CONFIG_VTUNE
2862 	uint64_t now_tsc = spdk_get_ticks();
2863 	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
2864 		uint64_t data[5];
2865 
2866 		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
2867 		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
2868 		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
2869 		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
2870 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2871 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
2872 
2873 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
2874 				   __itt_metadata_u64, 5, data);
2875 
2876 		bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
2877 		bdev_io->internal.ch->start_tsc = now_tsc;
2878 	}
2879 #endif
2880 
2881 	assert(bdev_io->internal.cb != NULL);
2882 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
2883 
2884 	bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
2885 			     bdev_io->internal.caller_ctx);
2886 }
2887 
2888 static void
2889 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2890 {
2891 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2892 
2893 	if (bdev_io->u.reset.ch_ref != NULL) {
2894 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2895 		bdev_io->u.reset.ch_ref = NULL;
2896 	}
2897 
2898 	_spdk_bdev_io_complete(bdev_io);
2899 }
2900 
2901 static void
2902 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2903 {
2904 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2905 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2906 
2907 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2908 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2909 		_spdk_bdev_channel_start_reset(ch);
2910 	}
2911 
2912 	spdk_for_each_channel_continue(i, 0);
2913 }
2914 
2915 void
2916 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2917 {
2918 	struct spdk_bdev *bdev = bdev_io->bdev;
2919 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
2920 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2921 
2922 	bdev_io->internal.status = status;
2923 
2924 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2925 		bool unlock_channels = false;
2926 
2927 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2928 			SPDK_ERRLOG("NOMEM returned for reset\n");
2929 		}
2930 		pthread_mutex_lock(&bdev->internal.mutex);
2931 		if (bdev_io == bdev->internal.reset_in_progress) {
2932 			bdev->internal.reset_in_progress = NULL;
2933 			unlock_channels = true;
2934 		}
2935 		pthread_mutex_unlock(&bdev->internal.mutex);
2936 
2937 		if (unlock_channels) {
2938 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2939 					      bdev_io, _spdk_bdev_reset_complete);
2940 			return;
2941 		}
2942 	} else {
2943 		assert(bdev_ch->io_outstanding > 0);
2944 		assert(shared_resource->io_outstanding > 0);
2945 		bdev_ch->io_outstanding--;
2946 		shared_resource->io_outstanding--;
2947 
2948 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2949 			TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link);
2950 			/*
2951 			 * Wait for some of the outstanding I/O to complete before we
2952 			 *  retry any of the nomem_io.  Normally we will wait for
2953 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2954 			 *  depth channels we will instead wait for half to complete.
2955 			 */
2956 			shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
2957 							   (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
2958 			return;
2959 		}
2960 
2961 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
2962 			_spdk_bdev_ch_retry_io(bdev_ch);
2963 		}
2964 	}
2965 
2966 	_spdk_bdev_io_complete(bdev_io);
2967 }
2968 
2969 void
2970 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2971 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2972 {
2973 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2974 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2975 	} else {
2976 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2977 		bdev_io->internal.error.scsi.sc = sc;
2978 		bdev_io->internal.error.scsi.sk = sk;
2979 		bdev_io->internal.error.scsi.asc = asc;
2980 		bdev_io->internal.error.scsi.ascq = ascq;
2981 	}
2982 
2983 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2984 }
2985 
2986 void
2987 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2988 			     int *sc, int *sk, int *asc, int *ascq)
2989 {
2990 	assert(sc != NULL);
2991 	assert(sk != NULL);
2992 	assert(asc != NULL);
2993 	assert(ascq != NULL);
2994 
2995 	switch (bdev_io->internal.status) {
2996 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2997 		*sc = SPDK_SCSI_STATUS_GOOD;
2998 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2999 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
3000 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
3001 		break;
3002 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
3003 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
3004 		break;
3005 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
3006 		*sc = bdev_io->internal.error.scsi.sc;
3007 		*sk = bdev_io->internal.error.scsi.sk;
3008 		*asc = bdev_io->internal.error.scsi.asc;
3009 		*ascq = bdev_io->internal.error.scsi.ascq;
3010 		break;
3011 	default:
3012 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
3013 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
3014 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
3015 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
3016 		break;
3017 	}
3018 }
3019 
3020 void
3021 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
3022 {
3023 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
3024 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3025 	} else {
3026 		bdev_io->internal.error.nvme.sct = sct;
3027 		bdev_io->internal.error.nvme.sc = sc;
3028 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
3029 	}
3030 
3031 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
3032 }
3033 
3034 void
3035 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
3036 {
3037 	assert(sct != NULL);
3038 	assert(sc != NULL);
3039 
3040 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
3041 		*sct = bdev_io->internal.error.nvme.sct;
3042 		*sc = bdev_io->internal.error.nvme.sc;
3043 	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
3044 		*sct = SPDK_NVME_SCT_GENERIC;
3045 		*sc = SPDK_NVME_SC_SUCCESS;
3046 	} else {
3047 		*sct = SPDK_NVME_SCT_GENERIC;
3048 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
3049 	}
3050 }
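/*
 * Usage sketch: a completion callback (hypothetical name nvme_done) that turns
 * the generic bdev status back into an NVMe status code pair before freeing
 * the I/O:
 *
 *	static void
 *	nvme_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		int sct, sc;
 *
 *		spdk_bdev_io_get_nvme_status(bdev_io, &sct, &sc);
 *		spdk_bdev_free_io(bdev_io);
 *		... report sct/sc to the initiator ...
 *	}
 */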
3051 
3052 struct spdk_thread *
3053 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
3054 {
3055 	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
3056 }
3057 
3058 static void
3059 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits)
3060 {
3061 	uint64_t	min_qos_set;
3062 	int		i;
3063 
3064 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3065 		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3066 			break;
3067 		}
3068 	}
3069 
3070 	if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3071 		SPDK_ERRLOG("No rate limits specified.\n");
3072 		return;
3073 	}
3074 
3075 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3076 		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3077 			continue;
3078 		}
3079 
3080 		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
3081 			min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
3082 		} else {
3083 			min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
3084 		}
3085 
3086 		if (limits[i] == 0 || limits[i] % min_qos_set) {
3087 			SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n",
3088 				    limits[i], bdev->name, min_qos_set);
3089 			SPDK_ERRLOG("Failed to enable QoS on bdev %s\n", bdev->name);
3090 			return;
3091 		}
3092 	}
3093 
3094 	if (!bdev->internal.qos) {
3095 		bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3096 		if (!bdev->internal.qos) {
3097 			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3098 			return;
3099 		}
3100 	}
3101 
3102 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3103 		bdev->internal.qos->rate_limits[i].limit = limits[i];
3104 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
3105 			      bdev->name, i, limits[i]);
3106 	}
3107 
3108 	return;
3109 }
3110 
3111 static void
3112 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
3113 {
3114 	struct spdk_conf_section	*sp = NULL;
3115 	const char			*val = NULL;
3116 	int				i = 0, j = 0;
3117 	uint64_t			limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
3118 	bool				config_qos = false;
3119 
3120 	sp = spdk_conf_find_section(NULL, "QoS");
3121 	if (!sp) {
3122 		return;
3123 	}
3124 
3125 	while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3126 		limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
3127 
3128 		i = 0;
3129 		while (true) {
3130 			val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0);
3131 			if (!val) {
3132 				break;
3133 			}
3134 
3135 			if (strcmp(bdev->name, val) != 0) {
3136 				i++;
3137 				continue;
3138 			}
3139 
3140 			val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1);
3141 			if (val) {
3142 				if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) {
3143 					limits[j] = strtoull(val, NULL, 10);
3144 				} else {
3145 					limits[j] = strtoull(val, NULL, 10) * 1024 * 1024;
3146 				}
3147 				config_qos = true;
3148 			}
3149 
3150 			break;
3151 		}
3152 
3153 		j++;
3154 	}
3155 
3156 	if (config_qos == true) {
3157 		_spdk_bdev_qos_config_limit(bdev, limits);
3158 	}
3159 
3160 	return;
3161 }
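/*
 * Illustrative configuration shape parsed above ("Malloc0" is a hypothetical
 * bdev name; the key names come from the qos_conf_type table defined near the
 * top of this file).  The first value on each line is the bdev name and the
 * second is the limit: IOPS limits are taken as-is, while bandwidth limits are
 * given in MB/s and converted to bytes per second (value * 1024 * 1024).
 *
 *	[QoS]
 *	  <IOPS limit key>       Malloc0 20000
 *	  <bandwidth limit key>  Malloc0 100
 */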
3162 
3163 static int
3164 spdk_bdev_init(struct spdk_bdev *bdev)
3165 {
3166 	char *bdev_name;
3167 
3168 	assert(bdev->module != NULL);
3169 
3170 	if (!bdev->name) {
3171 		SPDK_ERRLOG("Bdev name is NULL\n");
3172 		return -EINVAL;
3173 	}
3174 
3175 	if (spdk_bdev_get_by_name(bdev->name)) {
3176 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
3177 		return -EEXIST;
3178 	}
3179 
3180 	/* Users often register their own I/O devices using the bdev name. In
3181 	 * order to avoid conflicts, prepend bdev_. */
3182 	bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name);
3183 	if (!bdev_name) {
3184 		SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n");
3185 		return -ENOMEM;
3186 	}
3187 
3188 	bdev->internal.status = SPDK_BDEV_STATUS_READY;
3189 	bdev->internal.measured_queue_depth = UINT64_MAX;
3190 
3191 	TAILQ_INIT(&bdev->internal.open_descs);
3192 
3193 	TAILQ_INIT(&bdev->aliases);
3194 
3195 	bdev->internal.reset_in_progress = NULL;
3196 
3197 	_spdk_bdev_qos_config(bdev);
3198 
3199 	spdk_io_device_register(__bdev_to_io_dev(bdev),
3200 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
3201 				sizeof(struct spdk_bdev_channel),
3202 				bdev_name);
3203 
3204 	free(bdev_name);
3205 
3206 	pthread_mutex_init(&bdev->internal.mutex, NULL);
3207 	return 0;
3208 }
3209 
3210 static void
3211 spdk_bdev_destroy_cb(void *io_device)
3212 {
3213 	int			rc;
3214 	struct spdk_bdev	*bdev;
3215 	spdk_bdev_unregister_cb	cb_fn;
3216 	void			*cb_arg;
3217 
3218 	bdev = __bdev_from_io_dev(io_device);
3219 	cb_fn = bdev->internal.unregister_cb;
3220 	cb_arg = bdev->internal.unregister_ctx;
3221 
3222 	rc = bdev->fn_table->destruct(bdev->ctxt);
3223 	if (rc < 0) {
3224 		SPDK_ERRLOG("destruct failed\n");
3225 	}
3226 	if (rc <= 0 && cb_fn != NULL) {
3227 		cb_fn(cb_arg, rc);
3228 	}
3229 }
3230 
3231 
3232 static void
3233 spdk_bdev_fini(struct spdk_bdev *bdev)
3234 {
3235 	pthread_mutex_destroy(&bdev->internal.mutex);
3236 
3237 	free(bdev->internal.qos);
3238 
3239 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
3240 }
3241 
3242 static void
3243 spdk_bdev_start(struct spdk_bdev *bdev)
3244 {
3245 	struct spdk_bdev_module *module;
3246 	uint32_t action;
3247 
3248 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
3249 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);
3250 
3251 	/* Examine configuration before initializing I/O */
3252 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3253 		if (module->examine_config) {
3254 			action = module->internal.action_in_progress;
3255 			module->internal.action_in_progress++;
3256 			module->examine_config(bdev);
3257 			if (action != module->internal.action_in_progress) {
3258 				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
3259 					    module->name);
3260 			}
3261 		}
3262 	}
3263 
3264 	if (bdev->internal.claim_module) {
3265 		return;
3266 	}
3267 
3268 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3269 		if (module->examine_disk) {
3270 			module->internal.action_in_progress++;
3271 			module->examine_disk(bdev);
3272 		}
3273 	}
3274 }
3275 
3276 int
3277 spdk_bdev_register(struct spdk_bdev *bdev)
3278 {
3279 	int rc = spdk_bdev_init(bdev);
3280 
3281 	if (rc == 0) {
3282 		spdk_bdev_start(bdev);
3283 	}
3284 
3285 	return rc;
3286 }
3287 
3288 int
3289 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
3290 {
3291 	int rc;
3292 
3293 	rc = spdk_bdev_init(vbdev);
3294 	if (rc) {
3295 		return rc;
3296 	}
3297 
3298 	spdk_bdev_start(vbdev);
3299 	return 0;
3300 }
3301 
3302 void
3303 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
3304 {
3305 	if (bdev->internal.unregister_cb != NULL) {
3306 		bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
3307 	}
3308 }
3309 
3310 static void
3311 _remove_notify(void *arg)
3312 {
3313 	struct spdk_bdev_desc *desc = arg;
3314 
3315 	desc->remove_scheduled = false;
3316 
3317 	if (desc->closed) {
3318 		free(desc);
3319 	} else {
3320 		desc->remove_cb(desc->remove_ctx);
3321 	}
3322 }
3323 
3324 void
3325 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
3326 {
3327 	struct spdk_bdev_desc	*desc, *tmp;
3328 	bool			do_destruct = true;
3329 	struct spdk_thread	*thread;
3330 
3331 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
3332 
3333 	thread = spdk_get_thread();
3334 	if (!thread) {
3335 		/* The user called this from a non-SPDK thread. */
3336 		if (cb_fn != NULL) {
3337 			cb_fn(cb_arg, -ENOTSUP);
3338 		}
3339 		return;
3340 	}
3341 
3342 	pthread_mutex_lock(&bdev->internal.mutex);
3343 
3344 	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
3345 	bdev->internal.unregister_cb = cb_fn;
3346 	bdev->internal.unregister_ctx = cb_arg;
3347 
3348 	TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
3349 		if (desc->remove_cb) {
3350 			do_destruct = false;
3351 			/*
3352 			 * Defer invocation of the remove_cb to a separate message that will
3353 			 *  run later on its thread.  This ensures this context unwinds and
3354 			 *  we don't recursively unregister this bdev again if the remove_cb
3355 			 *  immediately closes its descriptor.
3356 			 */
3357 			if (!desc->remove_scheduled) {
3358 				/* Avoid scheduling removal of the same descriptor multiple times. */
3359 				desc->remove_scheduled = true;
3360 				spdk_thread_send_msg(desc->thread, _remove_notify, desc);
3361 			}
3362 		}
3363 	}
3364 
3365 	if (!do_destruct) {
3366 		pthread_mutex_unlock(&bdev->internal.mutex);
3367 		return;
3368 	}
3369 
3370 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
3371 	pthread_mutex_unlock(&bdev->internal.mutex);
3372 
3373 	spdk_bdev_fini(bdev);
3374 }
3375 
3376 int
3377 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
3378 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
3379 {
3380 	struct spdk_bdev_desc *desc;
3381 	struct spdk_thread *thread;
3382 
3383 	thread = spdk_get_thread();
3384 	if (!thread) {
3385 		SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n");
3386 		return -ENOTSUP;
3387 	}
3388 
3389 	desc = calloc(1, sizeof(*desc));
3390 	if (desc == NULL) {
3391 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
3392 		return -ENOMEM;
3393 	}
3394 
3395 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3396 		      spdk_get_thread());
3397 
3398 	pthread_mutex_lock(&bdev->internal.mutex);
3399 
3400 	if (write && bdev->internal.claim_module) {
3401 		SPDK_ERRLOG("Could not open %s - %s module already claimed it\n",
3402 			    bdev->name, bdev->internal.claim_module->name);
3403 		free(desc);
3404 		pthread_mutex_unlock(&bdev->internal.mutex);
3405 		return -EPERM;
3406 	}
3407 
3408 	TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
3409 
3410 	desc->bdev = bdev;
3411 	desc->thread = thread;
3412 	desc->remove_cb = remove_cb;
3413 	desc->remove_ctx = remove_ctx;
3414 	desc->write = write;
3415 	*_desc = desc;
3416 
3417 	pthread_mutex_unlock(&bdev->internal.mutex);
3418 
3419 	return 0;
3420 }
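/*
 * Usage sketch of the descriptor lifecycle (hot_remove_cb, ctx and "Nvme0n1"
 * are hypothetical).  The remove callback is invoked on the opening thread and
 * should eventually close the descriptor:
 *
 *	rc = spdk_bdev_open(spdk_bdev_get_by_name("Nvme0n1"), true,
 *			    hot_remove_cb, ctx, &desc);
 *	if (rc == 0) {
 *		ch = spdk_bdev_get_io_channel(desc);
 *		... submit I/O ...
 *		spdk_put_io_channel(ch);
 *		spdk_bdev_close(desc);
 *	}
 */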
3421 
3422 void
3423 spdk_bdev_close(struct spdk_bdev_desc *desc)
3424 {
3425 	struct spdk_bdev *bdev = desc->bdev;
3426 	bool do_unregister = false;
3427 
3428 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3429 		      spdk_get_thread());
3430 
3431 	assert(desc->thread == spdk_get_thread());
3432 
3433 	pthread_mutex_lock(&bdev->internal.mutex);
3434 
3435 	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);
3436 
3437 	desc->closed = true;
3438 
3439 	if (!desc->remove_scheduled) {
3440 		free(desc);
3441 	}
3442 
3443 	/* If no more descriptors, kill QoS channel */
3444 	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3445 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
3446 			      bdev->name, spdk_get_thread());
3447 
3448 		if (spdk_bdev_qos_destroy(bdev)) {
3449 			/* There isn't anything we can do to recover here. Just let the
3450 			 * old QoS poller keep running. The QoS handling won't change
3451 			 * cores when the user allocates a new channel, but it won't break. */
3452 			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3453 		}
3454 	}
3455 
3456 	spdk_bdev_set_qd_sampling_period(bdev, 0);
3457 
3458 	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3459 		do_unregister = true;
3460 	}
3461 	pthread_mutex_unlock(&bdev->internal.mutex);
3462 
3463 	if (do_unregister == true) {
3464 		spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
3465 	}
3466 }
3467 
3468 int
3469 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3470 			    struct spdk_bdev_module *module)
3471 {
3472 	if (bdev->internal.claim_module != NULL) {
3473 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3474 			    bdev->internal.claim_module->name);
3475 		return -EPERM;
3476 	}
3477 
3478 	if (desc && !desc->write) {
3479 		desc->write = true;
3480 	}
3481 
3482 	bdev->internal.claim_module = module;
3483 	return 0;
3484 }
3485 
3486 void
3487 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3488 {
3489 	assert(bdev->internal.claim_module != NULL);
3490 	bdev->internal.claim_module = NULL;
3491 }
3492 
3493 struct spdk_bdev *
3494 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3495 {
3496 	return desc->bdev;
3497 }
3498 
3499 void
3500 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3501 {
3502 	struct iovec *iovs;
3503 	int iovcnt;
3504 
3505 	if (bdev_io == NULL) {
3506 		return;
3507 	}
3508 
3509 	switch (bdev_io->type) {
3510 	case SPDK_BDEV_IO_TYPE_READ:
3511 		iovs = bdev_io->u.bdev.iovs;
3512 		iovcnt = bdev_io->u.bdev.iovcnt;
3513 		break;
3514 	case SPDK_BDEV_IO_TYPE_WRITE:
3515 		iovs = bdev_io->u.bdev.iovs;
3516 		iovcnt = bdev_io->u.bdev.iovcnt;
3517 		break;
3518 	default:
3519 		iovs = NULL;
3520 		iovcnt = 0;
3521 		break;
3522 	}
3523 
3524 	if (iovp) {
3525 		*iovp = iovs;
3526 	}
3527 	if (iovcntp) {
3528 		*iovcntp = iovcnt;
3529 	}
3530 }
3531 
3532 void
3533 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
3534 {
3535 
3536 	if (spdk_bdev_module_list_find(bdev_module->name)) {
3537 		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
3538 		assert(false);
3539 	}
3540 
3541 	if (bdev_module->async_init) {
3542 		bdev_module->internal.action_in_progress = 1;
3543 	}
3544 
3545 	/*
3546 	 * Modules with examine callbacks must be initialized first, so they are
3547 	 *  ready to handle examine callbacks from later modules that will
3548 	 *  register physical bdevs.
3549 	 */
3550 	if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
3551 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3552 	} else {
3553 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3554 	}
3555 }
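
/*
 * Note: bdev modules normally do not call spdk_bdev_module_list_add()
 * directly; registration goes through the SPDK_BDEV_MODULE_REGISTER()
 * constructor macro in spdk/bdev_module.h (its exact argument form has
 * varied between SPDK releases).  A minimal module descriptor might look
 * like the sketch below; the function names are placeholders.
 *
 *	static struct spdk_bdev_module my_if = {
 *		.name = "my_bdev",
 *		.module_init = my_bdev_init,
 *		.module_fini = my_bdev_fini,
 *		.examine_disk = my_bdev_examine,
 *	};
 *
 * Because .examine_disk is set, the insertion above places such a module at
 * the head of the module list, so it is initialized before modules that
 * register physical bdevs.
 */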
3556 
3557 struct spdk_bdev_module *
3558 spdk_bdev_module_list_find(const char *name)
3559 {
3560 	struct spdk_bdev_module *bdev_module;
3561 
3562 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3563 		if (strcmp(name, bdev_module->name) == 0) {
3564 			break;
3565 		}
3566 	}
3567 
3568 	return bdev_module;
3569 }
3570 
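/*
 * Write-zeroes emulation: the request is carved into chunks of at most
 * ZERO_BUFFER_SIZE bytes, each written from the shared zero buffer.  For
 * example, with a 4096-byte block size and 1,000,000 blocks remaining, one
 * iteration writes ZERO_BUFFER_SIZE / 4096 = 256 blocks; the completion
 * callback below re-arms the next chunk until split_remaining_num_blocks
 * reaches zero.
 */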
3571 static void
3572 _spdk_bdev_write_zero_buffer_next(void *_bdev_io)
3573 {
3574 	struct spdk_bdev_io *bdev_io = _bdev_io;
3575 	uint64_t num_bytes, num_blocks;
3576 	int rc;
3577 
3578 	num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) *
3579 			     bdev_io->u.bdev.split_remaining_num_blocks,
3580 			     ZERO_BUFFER_SIZE);
3581 	num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev);
3582 
3583 	rc = spdk_bdev_write_blocks(bdev_io->internal.desc,
3584 				    spdk_io_channel_from_ctx(bdev_io->internal.ch),
3585 				    g_bdev_mgr.zero_buffer,
3586 				    bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
3587 				    _spdk_bdev_write_zero_buffer_done, bdev_io);
3588 	if (rc == 0) {
3589 		bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
3590 		bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
3591 	} else if (rc == -ENOMEM) {
3592 		_spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next);
3593 	} else {
3594 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3595 		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
3596 	}
3597 }
3598 
3599 static void
3600 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
3601 {
3602 	struct spdk_bdev_io *parent_io = cb_arg;
3603 
3604 	spdk_bdev_free_io(bdev_io);
3605 
3606 	if (!success) {
3607 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3608 		parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
3609 		return;
3610 	}
3611 
3612 	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
3613 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3614 		parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
3615 		return;
3616 	}
3617 
3618 	_spdk_bdev_write_zero_buffer_next(parent_io);
3619 }
3620 
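/*
 * Context for a QoS rate-limit change.  It is allocated in
 * spdk_bdev_set_qos_rate_limits() and freed in _spdk_bdev_set_qos_limit_done()
 * once the enable/update/disable sequence has finished and the user callback
 * has been invoked.
 */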
3621 struct set_qos_limit_ctx {
3622 	void (*cb_fn)(void *cb_arg, int status);
3623 	void *cb_arg;
3624 	struct spdk_bdev *bdev;
3625 };
3626 
3627 static void
3628 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
3629 {
3630 	pthread_mutex_lock(&ctx->bdev->internal.mutex);
3631 	ctx->bdev->internal.qos_mod_in_progress = false;
3632 	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
3633 
3634 	ctx->cb_fn(ctx->cb_arg, status);
3635 	free(ctx);
3636 }
3637 
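/*
 * QoS disable sequence: spdk_for_each_channel() first runs
 * _spdk_bdev_disable_qos_msg() on every channel to clear BDEV_CH_QOS_ENABLED,
 * then _spdk_bdev_disable_qos_msg_done() forwards the context to the QoS
 * thread, where _spdk_bdev_disable_qos_done() below resubmits any queued I/O
 * on its original thread and frees the qos object.
 */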
3638 static void
3639 _spdk_bdev_disable_qos_done(void *cb_arg)
3640 {
3641 	struct set_qos_limit_ctx *ctx = cb_arg;
3642 	struct spdk_bdev *bdev = ctx->bdev;
3643 	struct spdk_bdev_io *bdev_io;
3644 	struct spdk_bdev_qos *qos;
3645 
3646 	pthread_mutex_lock(&bdev->internal.mutex);
3647 	qos = bdev->internal.qos;
3648 	bdev->internal.qos = NULL;
3649 	pthread_mutex_unlock(&bdev->internal.mutex);
3650 
3651 	while (!TAILQ_EMPTY(&qos->queued)) {
3652 		/* Send queued I/O back to their original thread for resubmission. */
3653 		bdev_io = TAILQ_FIRST(&qos->queued);
3654 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
3655 
3656 		if (bdev_io->internal.io_submit_ch) {
3657 			/*
3658 			 * Channel was changed when sending it to the QoS thread - change it back
3659 			 *  before sending it back to the original thread.
3660 			 */
3661 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
3662 			bdev_io->internal.io_submit_ch = NULL;
3663 		}
3664 
3665 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
3666 				     _spdk_bdev_io_submit, bdev_io);
3667 	}
3668 
3669 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
3670 	spdk_poller_unregister(&qos->poller);
3671 
3672 	free(qos);
3673 
3674 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3675 }
3676 
3677 static void
3678 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
3679 {
3680 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3681 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3682 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3683 	struct spdk_thread *thread;
3684 
3685 	pthread_mutex_lock(&bdev->internal.mutex);
3686 	thread = bdev->internal.qos->thread;
3687 	pthread_mutex_unlock(&bdev->internal.mutex);
3688 
3689 	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
3690 }
3691 
3692 static void
3693 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
3694 {
3695 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3696 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3697 
3698 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
3699 
3700 	spdk_for_each_channel_continue(i, 0);
3701 }
3702 
3703 static void
3704 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
3705 {
3706 	struct set_qos_limit_ctx *ctx = cb_arg;
3707 	struct spdk_bdev *bdev = ctx->bdev;
3708 
3709 	pthread_mutex_lock(&bdev->internal.mutex);
3710 	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
3711 	pthread_mutex_unlock(&bdev->internal.mutex);
3712 
3713 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3714 }
3715 
3716 static void
3717 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
3718 {
3719 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3720 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3721 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3722 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3723 
3724 	pthread_mutex_lock(&bdev->internal.mutex);
3725 	_spdk_bdev_enable_qos(bdev, bdev_ch);
3726 	pthread_mutex_unlock(&bdev->internal.mutex);
3727 	spdk_for_each_channel_continue(i, 0);
3728 }
3729 
3730 static void
3731 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
3732 {
3733 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3734 
3735 	_spdk_bdev_set_qos_limit_done(ctx, status);
3736 }
3737 
3738 static void
3739 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
3740 {
3741 	int i;
3742 
3743 	assert(bdev->internal.qos != NULL);
3744 
3745 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3746 		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3747 			bdev->internal.qos->rate_limits[i].limit = limits[i];
3748 
3749 			if (limits[i] == 0) {
3750 				bdev->internal.qos->rate_limits[i].limit =
3751 					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
3752 			}
3753 		}
3754 	}
3755 }
3756 
3757 void
3758 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
3759 			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
3760 {
3761 	struct set_qos_limit_ctx	*ctx;
3762 	uint32_t			limit_set_complement;
3763 	uint64_t			min_limit_per_sec;
3764 	int				i;
3765 	bool				disable_rate_limit = true;
3766 
3767 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3768 		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3769 			continue;
3770 		}
3771 
3772 		if (limits[i] > 0) {
3773 			disable_rate_limit = false;
3774 		}
3775 
3776 		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
3777 			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
3778 		} else {
3779 			/* Change from megabyte to byte rate limit */
3780 			/* Convert the limit from megabytes per second to bytes per second */
3781 			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
3782 		}
3783 
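		/*
		 * Round the requested limit up to the minimum granularity.
		 * Example: a request of 15000 IOPS against a 10000 IOPS
		 * minimum leaves a remainder of 5000, so the limit becomes
		 * 20000 IOPS; byte limits are rounded against
		 * SPDK_BDEV_QOS_MIN_BYTES_PER_SEC in the same way.
		 */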
3784 		limit_set_complement = limits[i] % min_limit_per_sec;
3785 		if (limit_set_complement) {
3786 			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
3787 				    limits[i], min_limit_per_sec);
3788 			limits[i] += min_limit_per_sec - limit_set_complement;
3789 			SPDK_ERRLOG("Rounding up the rate limit to %" PRIu64 "\n", limits[i]);
3790 		}
3791 	}
3792 
3793 	ctx = calloc(1, sizeof(*ctx));
3794 	if (ctx == NULL) {
3795 		cb_fn(cb_arg, -ENOMEM);
3796 		return;
3797 	}
3798 
3799 	ctx->cb_fn = cb_fn;
3800 	ctx->cb_arg = cb_arg;
3801 	ctx->bdev = bdev;
3802 
3803 	pthread_mutex_lock(&bdev->internal.mutex);
3804 	if (bdev->internal.qos_mod_in_progress) {
3805 		pthread_mutex_unlock(&bdev->internal.mutex);
3806 		free(ctx);
3807 		cb_fn(cb_arg, -EAGAIN);
3808 		return;
3809 	}
3810 	bdev->internal.qos_mod_in_progress = true;
3811 
3812 	if (disable_rate_limit == true && bdev->internal.qos) {
3813 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3814 			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
3815 			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
3816 			     bdev->internal.qos->rate_limits[i].limit !=
3817 			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
3818 				disable_rate_limit = false;
3819 				break;
3820 			}
3821 		}
3822 	}
3823 
3824 	if (disable_rate_limit == false) {
3825 		if (bdev->internal.qos == NULL) {
3826 			/* Enabling */
3827 			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3828 			if (!bdev->internal.qos) {
3829 				pthread_mutex_unlock(&bdev->internal.mutex);
3830 				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3831 				free(ctx);
3832 				cb_fn(cb_arg, -ENOMEM);
3833 				return;
3834 			}
3835 
3836 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
3837 
3838 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3839 					      _spdk_bdev_enable_qos_msg, ctx,
3840 					      _spdk_bdev_enable_qos_done);
3841 		} else {
3842 			/* Updating */
3843 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
3844 
3845 			spdk_thread_send_msg(bdev->internal.qos->thread,
3846 					     _spdk_bdev_update_qos_rate_limit_msg, ctx);
3847 		}
3848 	} else {
3849 		if (bdev->internal.qos != NULL) {
3850 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
3851 
3852 			/* Disabling */
3853 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3854 					      _spdk_bdev_disable_qos_msg, ctx,
3855 					      _spdk_bdev_disable_qos_msg_done);
3856 		} else {
3857 			pthread_mutex_unlock(&bdev->internal.mutex);
3858 			_spdk_bdev_set_qos_limit_done(ctx, 0);
3859 			return;
3860 		}
3861 	}
3862 
3863 	pthread_mutex_unlock(&bdev->internal.mutex);
3864 }
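
/*
 * Usage sketch (illustrative only): a caller, typically an RPC handler,
 * enabling a QoS limit on a bdev.  Entries left as
 * SPDK_BDEV_QOS_LIMIT_NOT_DEFINED are untouched, and a value of 0 clears
 * that limit type.  The mapping of array indexes to limit types (IOPS vs.
 * bandwidth) is defined in spdk/bdev.h; "set_limits_done_cb" is a
 * placeholder callback matching the cb_fn signature above.
 *
 *	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
 *	int i;
 *
 *	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
 *		limits[i] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
 *	}
 *	limits[0] = 20000;	// 20000 IOPS, assuming index 0 is the IOPS limit
 *
 *	spdk_bdev_set_qos_rate_limits(bdev, limits, set_limits_done_cb, NULL);
 */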
3865 
3866 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3867 
3868 SPDK_TRACE_REGISTER_FN(bdev_trace)
3869 {
3870 	spdk_trace_register_owner(OWNER_BDEV, 'b');
3871 	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
3872 	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
3873 					OBJECT_BDEV_IO, 1, 0, "type:   ");
3874 	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
3875 					OBJECT_BDEV_IO, 0, 0, "");
3876 }
3877