xref: /spdk/lib/bdev/bdev.c (revision 1fc4165fe9bf8512483356ad8e6d27f793f2e3db)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/bdev.h"
37 #include "spdk/conf.h"
38 
39 #include "spdk/config.h"
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/thread.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 #include "spdk/trace.h"
49 
50 #include "spdk/bdev_module.h"
51 #include "spdk_internal/log.h"
52 #include "spdk/string.h"
53 
54 #ifdef SPDK_CONFIG_VTUNE
55 #include "ittnotify.h"
56 #include "ittnotify_types.h"
57 int __itt_init_ittlib(const char *, __itt_group_id);
58 #endif
59 
60 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
61 #define SPDK_BDEV_IO_CACHE_SIZE			256
62 #define BUF_SMALL_POOL_SIZE			8192
63 #define BUF_LARGE_POOL_SIZE			1024
64 #define NOMEM_THRESHOLD_COUNT			8
65 #define ZERO_BUFFER_SIZE			0x100000
66 
67 #define OWNER_BDEV		0x2
68 
69 #define OBJECT_BDEV_IO		0x2
70 
71 #define TRACE_GROUP_BDEV	0x3
72 #define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
73 #define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)
74 
75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(10 * 1024 * 1024)
80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX
81 
82 #define SPDK_BDEV_POOL_ALIGNMENT 512
83 
84 static const char *qos_conf_type[] = {"Limit_IOPS",
85 				      "Limit_BPS", "Limit_Read_BPS", "Limit_Write_BPS"
86 				     };
87 static const char *qos_rpc_type[] = {"rw_ios_per_sec",
88 				     "rw_mbytes_per_sec", "r_mbytes_per_sec", "w_mbytes_per_sec"
89 				    };
90 
91 TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
92 
93 struct spdk_bdev_mgr {
94 	struct spdk_mempool *bdev_io_pool;
95 
96 	struct spdk_mempool *buf_small_pool;
97 	struct spdk_mempool *buf_large_pool;
98 
99 	void *zero_buffer;
100 
101 	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;
102 
103 	struct spdk_bdev_list bdevs;
104 
105 	bool init_complete;
106 	bool module_init_complete;
107 
108 #ifdef SPDK_CONFIG_VTUNE
109 	__itt_domain	*domain;
110 #endif
111 };
112 
113 static struct spdk_bdev_mgr g_bdev_mgr = {
114 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
115 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
116 	.init_complete = false,
117 	.module_init_complete = false,
118 };
119 
120 static struct spdk_bdev_opts	g_bdev_opts = {
121 	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
122 	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
123 };
124 
125 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
126 static void			*g_init_cb_arg = NULL;
127 
128 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
129 static void			*g_fini_cb_arg = NULL;
130 static struct spdk_thread	*g_fini_thread = NULL;
131 
132 struct spdk_bdev_qos_limit {
133 	/** IOs or bytes allowed per second (i.e., 1s). */
134 	uint64_t limit;
135 
136 	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
137 	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
138 	 *  some bytes are remaining, but the I/O is bigger than that amount. The
139 	 *  excess will be deducted from the next timeslice.
140 	 */
141 	int64_t remaining_this_timeslice;
142 
143 	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
144 	uint32_t min_per_timeslice;
145 
146 	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
147 	uint32_t max_per_timeslice;
148 
149 	/** Function to check whether to queue the IO. */
150 	bool (*queue_io)(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
151 
152 	/** Function to update for the submitted IO. */
153 	void (*update_quota)(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io);
154 };
155 
156 struct spdk_bdev_qos {
157 	/** Types of structure of rate limits. */
158 	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
159 
160 	/** The channel that all I/O are funneled through. */
161 	struct spdk_bdev_channel *ch;
162 
163 	/** The thread on which the poller is running. */
164 	struct spdk_thread *thread;
165 
166 	/** Queue of I/O waiting to be issued. */
167 	bdev_io_tailq_t queued;
168 
169 	/** Size of a timeslice in tsc ticks. */
170 	uint64_t timeslice_size;
171 
172 	/** Timestamp of start of last timeslice. */
173 	uint64_t last_timeslice;
174 
175 	/** Poller that processes queued I/O commands each time slice. */
176 	struct spdk_poller *poller;
177 };
178 
179 struct spdk_bdev_mgmt_channel {
180 	bdev_io_stailq_t need_buf_small;
181 	bdev_io_stailq_t need_buf_large;
182 
183 	/*
184 	 * Each thread keeps a cache of bdev_io - this allows
185 	 *  bdev threads which are *not* DPDK threads to still
186 	 *  benefit from a per-thread bdev_io cache.  Without
187 	 *  this, non-DPDK threads fetching from the mempool
188 	 *  incur a cmpxchg on get and put.
189 	 */
190 	bdev_io_stailq_t per_thread_cache;
191 	uint32_t	per_thread_cache_count;
192 	uint32_t	bdev_io_cache_size;
193 
194 	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
195 	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
196 };
197 
198 /*
199  * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
200  * will queue here their IO that awaits retry. It makes it possible to retry sending
201  * IO to one bdev after IO from other bdev completes.
202  */
203 struct spdk_bdev_shared_resource {
204 	/* The bdev management channel */
205 	struct spdk_bdev_mgmt_channel *mgmt_ch;
206 
207 	/*
208 	 * Count of I/O submitted to bdev module and waiting for completion.
209 	 * Incremented before submit_request() is called on an spdk_bdev_io.
210 	 */
211 	uint64_t		io_outstanding;
212 
213 	/*
214 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
215 	 *  on this channel.
216 	 */
217 	bdev_io_tailq_t		nomem_io;
218 
219 	/*
220 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
221 	 */
222 	uint64_t		nomem_threshold;
223 
224 	/* I/O channel allocated by a bdev module */
225 	struct spdk_io_channel	*shared_ch;
226 
227 	/* Refcount of bdev channels using this resource */
228 	uint32_t		ref;
229 
230 	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
231 };
232 
233 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
234 #define BDEV_CH_QOS_ENABLED		(1 << 1)
235 
236 struct spdk_bdev_channel {
237 	struct spdk_bdev	*bdev;
238 
239 	/* The channel for the underlying device */
240 	struct spdk_io_channel	*channel;
241 
242 	/* Per io_device per thread data */
243 	struct spdk_bdev_shared_resource *shared_resource;
244 
245 	struct spdk_bdev_io_stat stat;
246 
247 	/*
248 	 * Count of I/O submitted through this channel and waiting for completion.
249 	 * Incremented before submit_request() is called on an spdk_bdev_io.
250 	 */
251 	uint64_t		io_outstanding;
252 
253 	bdev_io_tailq_t		queued_resets;
254 
255 	uint32_t		flags;
256 
257 	struct spdk_histogram_data *histogram;
258 
259 #ifdef SPDK_CONFIG_VTUNE
260 	uint64_t		start_tsc;
261 	uint64_t		interval_tsc;
262 	__itt_string_handle	*handle;
263 	struct spdk_bdev_io_stat prev_stat;
264 #endif
265 
266 };
267 
268 struct spdk_bdev_desc {
269 	struct spdk_bdev		*bdev;
270 	struct spdk_thread		*thread;
271 	spdk_bdev_remove_cb_t		remove_cb;
272 	void				*remove_ctx;
273 	bool				remove_scheduled;
274 	bool				closed;
275 	bool				write;
276 	TAILQ_ENTRY(spdk_bdev_desc)	link;
277 };
278 
279 struct spdk_bdev_iostat_ctx {
280 	struct spdk_bdev_io_stat *stat;
281 	spdk_bdev_get_device_stat_cb cb;
282 	void *cb_arg;
283 };
284 
285 struct set_qos_limit_ctx {
286 	void (*cb_fn)(void *cb_arg, int status);
287 	void *cb_arg;
288 	struct spdk_bdev *bdev;
289 };
290 
291 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
292 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
293 
294 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
295 		void *cb_arg);
296 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);
297 
298 static void _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i);
299 static void _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status);
300 
301 void
302 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
303 {
304 	*opts = g_bdev_opts;
305 }
306 
307 int
308 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
309 {
310 	uint32_t min_pool_size;
311 
312 	/*
313 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
314 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
315 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
316 	 */
317 	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
318 	if (opts->bdev_io_pool_size < min_pool_size) {
319 		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
320 			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
321 			    spdk_thread_get_count());
322 		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
323 		return -1;
324 	}
325 
326 	g_bdev_opts = *opts;
327 	return 0;
328 }
329 
330 struct spdk_bdev *
331 spdk_bdev_first(void)
332 {
333 	struct spdk_bdev *bdev;
334 
335 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
336 	if (bdev) {
337 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
338 	}
339 
340 	return bdev;
341 }
342 
343 struct spdk_bdev *
344 spdk_bdev_next(struct spdk_bdev *prev)
345 {
346 	struct spdk_bdev *bdev;
347 
348 	bdev = TAILQ_NEXT(prev, internal.link);
349 	if (bdev) {
350 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
351 	}
352 
353 	return bdev;
354 }
355 
356 static struct spdk_bdev *
357 _bdev_next_leaf(struct spdk_bdev *bdev)
358 {
359 	while (bdev != NULL) {
360 		if (bdev->internal.claim_module == NULL) {
361 			return bdev;
362 		} else {
363 			bdev = TAILQ_NEXT(bdev, internal.link);
364 		}
365 	}
366 
367 	return bdev;
368 }
369 
370 struct spdk_bdev *
371 spdk_bdev_first_leaf(void)
372 {
373 	struct spdk_bdev *bdev;
374 
375 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
376 
377 	if (bdev) {
378 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
379 	}
380 
381 	return bdev;
382 }
383 
384 struct spdk_bdev *
385 spdk_bdev_next_leaf(struct spdk_bdev *prev)
386 {
387 	struct spdk_bdev *bdev;
388 
389 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));
390 
391 	if (bdev) {
392 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
393 	}
394 
395 	return bdev;
396 }
397 
398 struct spdk_bdev *
399 spdk_bdev_get_by_name(const char *bdev_name)
400 {
401 	struct spdk_bdev_alias *tmp;
402 	struct spdk_bdev *bdev = spdk_bdev_first();
403 
404 	while (bdev != NULL) {
405 		if (strcmp(bdev_name, bdev->name) == 0) {
406 			return bdev;
407 		}
408 
409 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
410 			if (strcmp(bdev_name, tmp->alias) == 0) {
411 				return bdev;
412 			}
413 		}
414 
415 		bdev = spdk_bdev_next(bdev);
416 	}
417 
418 	return NULL;
419 }
420 
421 void
422 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
423 {
424 	struct iovec *iovs;
425 
426 	iovs = bdev_io->u.bdev.iovs;
427 
428 	assert(iovs != NULL);
429 	assert(bdev_io->u.bdev.iovcnt >= 1);
430 
431 	iovs[0].iov_base = buf;
432 	iovs[0].iov_len = len;
433 }
434 
435 static bool
436 _is_buf_allocated(struct iovec *iovs)
437 {
438 	return iovs[0].iov_base != NULL;
439 }
440 
441 static bool
442 _are_iovs_aligned(struct iovec *iovs, int iovcnt, uint32_t alignment)
443 {
444 	int i;
445 	uintptr_t iov_base;
446 
447 	if (spdk_likely(alignment == 1)) {
448 		return true;
449 	}
450 
451 	for (i = 0; i < iovcnt; i++) {
452 		iov_base = (uintptr_t)iovs[i].iov_base;
453 		if ((iov_base & (alignment - 1)) != 0) {
454 			return false;
455 		}
456 	}
457 
458 	return true;
459 }
460 
461 static void
462 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
463 {
464 	int i;
465 	size_t len;
466 
467 	for (i = 0; i < iovcnt; i++) {
468 		len = spdk_min(iovs[i].iov_len, buf_len);
469 		memcpy(buf, iovs[i].iov_base, len);
470 		buf += len;
471 		buf_len -= len;
472 	}
473 }
474 
475 static void
476 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
477 {
478 	int i;
479 	size_t len;
480 
481 	for (i = 0; i < iovcnt; i++) {
482 		len = spdk_min(iovs[i].iov_len, buf_len);
483 		memcpy(iovs[i].iov_base, buf, len);
484 		buf += len;
485 		buf_len -= len;
486 	}
487 }
488 
489 static void
490 _bdev_io_set_bounce_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
491 {
492 	/* save original iovec */
493 	bdev_io->internal.orig_iovs = bdev_io->u.bdev.iovs;
494 	bdev_io->internal.orig_iovcnt = bdev_io->u.bdev.iovcnt;
495 	/* set bounce iov */
496 	bdev_io->u.bdev.iovs = &bdev_io->internal.bounce_iov;
497 	bdev_io->u.bdev.iovcnt = 1;
498 	/* set bounce buffer for this operation */
499 	bdev_io->u.bdev.iovs[0].iov_base = buf;
500 	bdev_io->u.bdev.iovs[0].iov_len = len;
501 	/* if this is write path, copy data from original buffer to bounce buffer */
502 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
503 		_copy_iovs_to_buf(buf, len, bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt);
504 	}
505 }
506 
507 static void
508 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
509 {
510 	struct spdk_mempool *pool;
511 	struct spdk_bdev_io *tmp;
512 	void *buf, *aligned_buf;
513 	bdev_io_stailq_t *stailq;
514 	struct spdk_bdev_mgmt_channel *ch;
515 	uint64_t buf_len;
516 	uint64_t alignment;
517 	bool buf_allocated;
518 
519 	buf = bdev_io->internal.buf;
520 	buf_len = bdev_io->internal.buf_len;
521 	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
522 	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
523 
524 	bdev_io->internal.buf = NULL;
525 
526 	if (buf_len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) {
527 		pool = g_bdev_mgr.buf_small_pool;
528 		stailq = &ch->need_buf_small;
529 	} else {
530 		pool = g_bdev_mgr.buf_large_pool;
531 		stailq = &ch->need_buf_large;
532 	}
533 
534 	if (STAILQ_EMPTY(stailq)) {
535 		spdk_mempool_put(pool, buf);
536 	} else {
537 		tmp = STAILQ_FIRST(stailq);
538 
539 		alignment = spdk_bdev_get_buf_align(tmp->bdev);
540 		buf_allocated = _is_buf_allocated(tmp->u.bdev.iovs);
541 
542 		aligned_buf = (void *)(((uintptr_t)buf +
543 					(alignment - 1)) & ~(alignment - 1));
544 		if (buf_allocated) {
545 			_bdev_io_set_bounce_buf(tmp, aligned_buf, tmp->internal.buf_len);
546 		} else {
547 			spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
548 		}
549 
550 		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
551 		tmp->internal.buf = buf;
552 		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp, true);
553 	}
554 }
555 
556 static void
557 _bdev_io_unset_bounce_buf(struct spdk_bdev_io *bdev_io)
558 {
559 	/* if this is read path, copy data from bounce buffer to original buffer */
560 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
561 	    bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
562 		_copy_buf_to_iovs(bdev_io->internal.orig_iovs, bdev_io->internal.orig_iovcnt,
563 				  bdev_io->internal.bounce_iov.iov_base, bdev_io->internal.bounce_iov.iov_len);
564 	}
565 	/* set orignal buffer for this io */
566 	bdev_io->u.bdev.iovcnt = bdev_io->internal.orig_iovcnt;
567 	bdev_io->u.bdev.iovs = bdev_io->internal.orig_iovs;
568 	/* disable bouncing buffer for this io */
569 	bdev_io->internal.orig_iovcnt = 0;
570 	bdev_io->internal.orig_iovs = NULL;
571 	/* return bounce buffer to the pool */
572 	spdk_bdev_io_put_buf(bdev_io);
573 }
574 
575 void
576 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
577 {
578 	struct spdk_mempool *pool;
579 	bdev_io_stailq_t *stailq;
580 	void *buf, *aligned_buf;
581 	struct spdk_bdev_mgmt_channel *mgmt_ch;
582 	uint64_t alignment;
583 	bool buf_allocated;
584 
585 	assert(cb != NULL);
586 	assert(bdev_io->u.bdev.iovs != NULL);
587 
588 	alignment = spdk_bdev_get_buf_align(bdev_io->bdev);
589 	buf_allocated = _is_buf_allocated(bdev_io->u.bdev.iovs);
590 
591 	if (buf_allocated &&
592 	    _are_iovs_aligned(bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, alignment)) {
593 		/* Buffer already present and aligned */
594 		cb(bdev_io->internal.ch->channel, bdev_io, true);
595 		return;
596 	}
597 
598 	if (len + alignment > SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) {
599 		SPDK_ERRLOG("Length + alignment %" PRIu64 " is larger than allowed\n",
600 			    len + alignment);
601 		cb(bdev_io->internal.ch->channel, bdev_io, false);
602 		return;
603 	}
604 
605 	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
606 
607 	bdev_io->internal.buf_len = len;
608 	bdev_io->internal.get_buf_cb = cb;
609 
610 	if (len + alignment <= SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT) {
611 		pool = g_bdev_mgr.buf_small_pool;
612 		stailq = &mgmt_ch->need_buf_small;
613 	} else {
614 		pool = g_bdev_mgr.buf_large_pool;
615 		stailq = &mgmt_ch->need_buf_large;
616 	}
617 
618 	buf = spdk_mempool_get(pool);
619 
620 	if (!buf) {
621 		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
622 	} else {
623 		aligned_buf = (void *)(((uintptr_t)buf + (alignment - 1)) & ~(alignment - 1));
624 
625 		if (buf_allocated) {
626 			_bdev_io_set_bounce_buf(bdev_io, aligned_buf, len);
627 		} else {
628 			spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
629 		}
630 		bdev_io->internal.buf = buf;
631 		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io, true);
632 	}
633 }
634 
635 static int
636 spdk_bdev_module_get_max_ctx_size(void)
637 {
638 	struct spdk_bdev_module *bdev_module;
639 	int max_bdev_module_size = 0;
640 
641 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
642 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
643 			max_bdev_module_size = bdev_module->get_ctx_size();
644 		}
645 	}
646 
647 	return max_bdev_module_size;
648 }
649 
650 void
651 spdk_bdev_config_text(FILE *fp)
652 {
653 	struct spdk_bdev_module *bdev_module;
654 
655 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
656 		if (bdev_module->config_text) {
657 			bdev_module->config_text(fp);
658 		}
659 	}
660 }
661 
662 static void
663 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
664 {
665 	int i;
666 	struct spdk_bdev_qos *qos = bdev->internal.qos;
667 	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
668 
669 	if (!qos) {
670 		return;
671 	}
672 
673 	spdk_bdev_get_qos_rate_limits(bdev, limits);
674 
675 	spdk_json_write_object_begin(w);
676 	spdk_json_write_named_string(w, "method", "set_bdev_qos_limit");
677 
678 	spdk_json_write_named_object_begin(w, "params");
679 	spdk_json_write_named_string(w, "name", bdev->name);
680 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
681 		if (limits[i] > 0) {
682 			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
683 		}
684 	}
685 	spdk_json_write_object_end(w);
686 
687 	spdk_json_write_object_end(w);
688 }
689 
690 void
691 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
692 {
693 	struct spdk_bdev_module *bdev_module;
694 	struct spdk_bdev *bdev;
695 
696 	assert(w != NULL);
697 
698 	spdk_json_write_array_begin(w);
699 
700 	spdk_json_write_object_begin(w);
701 	spdk_json_write_named_string(w, "method", "set_bdev_options");
702 	spdk_json_write_named_object_begin(w, "params");
703 	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
704 	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
705 	spdk_json_write_object_end(w);
706 	spdk_json_write_object_end(w);
707 
708 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
709 		if (bdev_module->config_json) {
710 			bdev_module->config_json(w);
711 		}
712 	}
713 
714 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
715 		spdk_bdev_qos_config_json(bdev, w);
716 
717 		if (bdev->fn_table->write_config_json) {
718 			bdev->fn_table->write_config_json(bdev, w);
719 		}
720 	}
721 
722 	spdk_json_write_array_end(w);
723 }
724 
725 static int
726 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
727 {
728 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
729 	struct spdk_bdev_io *bdev_io;
730 	uint32_t i;
731 
732 	STAILQ_INIT(&ch->need_buf_small);
733 	STAILQ_INIT(&ch->need_buf_large);
734 
735 	STAILQ_INIT(&ch->per_thread_cache);
736 	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
737 
738 	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
739 	ch->per_thread_cache_count = 0;
740 	for (i = 0; i < ch->bdev_io_cache_size; i++) {
741 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
742 		assert(bdev_io != NULL);
743 		ch->per_thread_cache_count++;
744 		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
745 	}
746 
747 	TAILQ_INIT(&ch->shared_resources);
748 	TAILQ_INIT(&ch->io_wait_queue);
749 
750 	return 0;
751 }
752 
753 static void
754 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
755 {
756 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
757 	struct spdk_bdev_io *bdev_io;
758 
759 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
760 		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
761 	}
762 
763 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
764 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
765 	}
766 
767 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
768 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
769 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
770 		ch->per_thread_cache_count--;
771 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
772 	}
773 
774 	assert(ch->per_thread_cache_count == 0);
775 }
776 
777 static void
778 spdk_bdev_init_complete(int rc)
779 {
780 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
781 	void *cb_arg = g_init_cb_arg;
782 	struct spdk_bdev_module *m;
783 
784 	g_bdev_mgr.init_complete = true;
785 	g_init_cb_fn = NULL;
786 	g_init_cb_arg = NULL;
787 
788 	/*
789 	 * For modules that need to know when subsystem init is complete,
790 	 * inform them now.
791 	 */
792 	if (rc == 0) {
793 		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
794 			if (m->init_complete) {
795 				m->init_complete();
796 			}
797 		}
798 	}
799 
800 	cb_fn(cb_arg, rc);
801 }
802 
803 static void
804 spdk_bdev_module_action_complete(void)
805 {
806 	struct spdk_bdev_module *m;
807 
808 	/*
809 	 * Don't finish bdev subsystem initialization if
810 	 * module pre-initialization is still in progress, or
811 	 * the subsystem been already initialized.
812 	 */
813 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
814 		return;
815 	}
816 
817 	/*
818 	 * Check all bdev modules for inits/examinations in progress. If any
819 	 * exist, return immediately since we cannot finish bdev subsystem
820 	 * initialization until all are completed.
821 	 */
822 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
823 		if (m->internal.action_in_progress > 0) {
824 			return;
825 		}
826 	}
827 
828 	/*
829 	 * Modules already finished initialization - now that all
830 	 * the bdev modules have finished their asynchronous I/O
831 	 * processing, the entire bdev layer can be marked as complete.
832 	 */
833 	spdk_bdev_init_complete(0);
834 }
835 
836 static void
837 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
838 {
839 	assert(module->internal.action_in_progress > 0);
840 	module->internal.action_in_progress--;
841 	spdk_bdev_module_action_complete();
842 }
843 
844 void
845 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
846 {
847 	spdk_bdev_module_action_done(module);
848 }
849 
850 void
851 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
852 {
853 	spdk_bdev_module_action_done(module);
854 }
855 
856 /** The last initialized bdev module */
857 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
858 
859 static int
860 spdk_bdev_modules_init(void)
861 {
862 	struct spdk_bdev_module *module;
863 	int rc = 0;
864 
865 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
866 		g_resume_bdev_module = module;
867 		rc = module->module_init();
868 		if (rc != 0) {
869 			return rc;
870 		}
871 	}
872 
873 	g_resume_bdev_module = NULL;
874 	return 0;
875 }
876 
877 
878 static void
879 spdk_bdev_init_failed_complete(void *cb_arg)
880 {
881 	spdk_bdev_init_complete(-1);
882 }
883 
884 static void
885 spdk_bdev_init_failed(void *cb_arg)
886 {
887 	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
888 }
889 
890 void
891 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
892 {
893 	struct spdk_conf_section *sp;
894 	struct spdk_bdev_opts bdev_opts;
895 	int32_t bdev_io_pool_size, bdev_io_cache_size;
896 	int cache_size;
897 	int rc = 0;
898 	char mempool_name[32];
899 
900 	assert(cb_fn != NULL);
901 
902 	sp = spdk_conf_find_section(NULL, "Bdev");
903 	if (sp != NULL) {
904 		spdk_bdev_get_opts(&bdev_opts);
905 
906 		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
907 		if (bdev_io_pool_size >= 0) {
908 			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
909 		}
910 
911 		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
912 		if (bdev_io_cache_size >= 0) {
913 			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
914 		}
915 
916 		if (spdk_bdev_set_opts(&bdev_opts)) {
917 			spdk_bdev_init_complete(-1);
918 			return;
919 		}
920 
921 		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
922 	}
923 
924 	g_init_cb_fn = cb_fn;
925 	g_init_cb_arg = cb_arg;
926 
927 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
928 
929 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
930 				  g_bdev_opts.bdev_io_pool_size,
931 				  sizeof(struct spdk_bdev_io) +
932 				  spdk_bdev_module_get_max_ctx_size(),
933 				  0,
934 				  SPDK_ENV_SOCKET_ID_ANY);
935 
936 	if (g_bdev_mgr.bdev_io_pool == NULL) {
937 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
938 		spdk_bdev_init_complete(-1);
939 		return;
940 	}
941 
942 	/**
943 	 * Ensure no more than half of the total buffers end up local caches, by
944 	 *   using spdk_thread_get_count() to determine how many local caches we need
945 	 *   to account for.
946 	 */
947 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
948 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
949 
950 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
951 				    BUF_SMALL_POOL_SIZE,
952 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT,
953 				    cache_size,
954 				    SPDK_ENV_SOCKET_ID_ANY);
955 	if (!g_bdev_mgr.buf_small_pool) {
956 		SPDK_ERRLOG("create rbuf small pool failed\n");
957 		spdk_bdev_init_complete(-1);
958 		return;
959 	}
960 
961 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
962 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
963 
964 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
965 				    BUF_LARGE_POOL_SIZE,
966 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + SPDK_BDEV_POOL_ALIGNMENT,
967 				    cache_size,
968 				    SPDK_ENV_SOCKET_ID_ANY);
969 	if (!g_bdev_mgr.buf_large_pool) {
970 		SPDK_ERRLOG("create rbuf large pool failed\n");
971 		spdk_bdev_init_complete(-1);
972 		return;
973 	}
974 
975 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
976 				 NULL);
977 	if (!g_bdev_mgr.zero_buffer) {
978 		SPDK_ERRLOG("create bdev zero buffer failed\n");
979 		spdk_bdev_init_complete(-1);
980 		return;
981 	}
982 
983 #ifdef SPDK_CONFIG_VTUNE
984 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
985 #endif
986 
987 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
988 				spdk_bdev_mgmt_channel_destroy,
989 				sizeof(struct spdk_bdev_mgmt_channel),
990 				"bdev_mgr");
991 
992 	rc = spdk_bdev_modules_init();
993 	g_bdev_mgr.module_init_complete = true;
994 	if (rc != 0) {
995 		SPDK_ERRLOG("bdev modules init failed\n");
996 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
997 		return;
998 	}
999 
1000 	spdk_bdev_module_action_complete();
1001 }
1002 
1003 static void
1004 spdk_bdev_mgr_unregister_cb(void *io_device)
1005 {
1006 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
1007 
1008 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
1009 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
1010 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
1011 			    g_bdev_opts.bdev_io_pool_size);
1012 	}
1013 
1014 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
1015 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
1016 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
1017 			    BUF_SMALL_POOL_SIZE);
1018 		assert(false);
1019 	}
1020 
1021 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
1022 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
1023 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
1024 			    BUF_LARGE_POOL_SIZE);
1025 		assert(false);
1026 	}
1027 
1028 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
1029 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
1030 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
1031 	spdk_dma_free(g_bdev_mgr.zero_buffer);
1032 
1033 	cb_fn(g_fini_cb_arg);
1034 	g_fini_cb_fn = NULL;
1035 	g_fini_cb_arg = NULL;
1036 	g_bdev_mgr.init_complete = false;
1037 	g_bdev_mgr.module_init_complete = false;
1038 }
1039 
1040 static void
1041 spdk_bdev_module_finish_iter(void *arg)
1042 {
1043 	struct spdk_bdev_module *bdev_module;
1044 
1045 	/* Start iterating from the last touched module */
1046 	if (!g_resume_bdev_module) {
1047 		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
1048 	} else {
1049 		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
1050 					 internal.tailq);
1051 	}
1052 
1053 	while (bdev_module) {
1054 		if (bdev_module->async_fini) {
1055 			/* Save our place so we can resume later. We must
1056 			 * save the variable here, before calling module_fini()
1057 			 * below, because in some cases the module may immediately
1058 			 * call spdk_bdev_module_finish_done() and re-enter
1059 			 * this function to continue iterating. */
1060 			g_resume_bdev_module = bdev_module;
1061 		}
1062 
1063 		if (bdev_module->module_fini) {
1064 			bdev_module->module_fini();
1065 		}
1066 
1067 		if (bdev_module->async_fini) {
1068 			return;
1069 		}
1070 
1071 		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
1072 					 internal.tailq);
1073 	}
1074 
1075 	g_resume_bdev_module = NULL;
1076 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
1077 }
1078 
1079 void
1080 spdk_bdev_module_finish_done(void)
1081 {
1082 	if (spdk_get_thread() != g_fini_thread) {
1083 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
1084 	} else {
1085 		spdk_bdev_module_finish_iter(NULL);
1086 	}
1087 }
1088 
1089 static void
1090 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
1091 {
1092 	struct spdk_bdev *bdev = cb_arg;
1093 
1094 	if (bdeverrno && bdev) {
1095 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
1096 			     bdev->name);
1097 
1098 		/*
1099 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
1100 		 *  bdev; try to continue by manually removing this bdev from the list and continue
1101 		 *  with the next bdev in the list.
1102 		 */
1103 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
1104 	}
1105 
1106 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
1107 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
1108 		/*
1109 		 * Bdev module finish need to be deferred as we might be in the middle of some context
1110 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
1111 		 * after returning.
1112 		 */
1113 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
1114 		return;
1115 	}
1116 
1117 	/*
1118 	 * Unregister last unclaimed bdev in the list, to ensure that bdev subsystem
1119 	 * shutdown proceeds top-down. The goal is to give virtual bdevs an opportunity
1120 	 * to detect clean shutdown as opposed to run-time hot removal of the underlying
1121 	 * base bdevs.
1122 	 *
1123 	 * Also, walk the list in the reverse order.
1124 	 */
1125 	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
1126 	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
1127 		if (bdev->internal.claim_module != NULL) {
1128 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Skipping claimed bdev '%s'(<-'%s').\n",
1129 				      bdev->name, bdev->internal.claim_module->name);
1130 			continue;
1131 		}
1132 
1133 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
1134 		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
1135 		return;
1136 	}
1137 
1138 	/*
1139 	 * If any bdev fails to unclaim underlying bdev properly, we may face the
1140 	 * case of bdev list consisting of claimed bdevs only (if claims are managed
1141 	 * correctly, this would mean there's a loop in the claims graph which is
1142 	 * clearly impossible). Warn and unregister last bdev on the list then.
1143 	 */
1144 	for (bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
1145 	     bdev; bdev = TAILQ_PREV(bdev, spdk_bdev_list, internal.link)) {
1146 		SPDK_ERRLOG("Unregistering claimed bdev '%s'!\n", bdev->name);
1147 		spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
1148 		return;
1149 	}
1150 }
1151 
1152 void
1153 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
1154 {
1155 	struct spdk_bdev_module *m;
1156 
1157 	assert(cb_fn != NULL);
1158 
1159 	g_fini_thread = spdk_get_thread();
1160 
1161 	g_fini_cb_fn = cb_fn;
1162 	g_fini_cb_arg = cb_arg;
1163 
1164 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
1165 		if (m->fini_start) {
1166 			m->fini_start();
1167 		}
1168 	}
1169 
1170 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
1171 }
1172 
1173 static struct spdk_bdev_io *
1174 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
1175 {
1176 	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
1177 	struct spdk_bdev_io *bdev_io;
1178 
1179 	if (ch->per_thread_cache_count > 0) {
1180 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
1181 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
1182 		ch->per_thread_cache_count--;
1183 	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
1184 		/*
1185 		 * Don't try to look for bdev_ios in the global pool if there are
1186 		 * waiters on bdev_ios - we don't want this caller to jump the line.
1187 		 */
1188 		bdev_io = NULL;
1189 	} else {
1190 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
1191 	}
1192 
1193 	return bdev_io;
1194 }
1195 
1196 void
1197 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1198 {
1199 	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
1200 
1201 	assert(bdev_io != NULL);
1202 	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
1203 
1204 	if (bdev_io->internal.buf != NULL) {
1205 		spdk_bdev_io_put_buf(bdev_io);
1206 	}
1207 
1208 	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
1209 		ch->per_thread_cache_count++;
1210 		STAILQ_INSERT_HEAD(&ch->per_thread_cache, bdev_io, internal.buf_link);
1211 		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
1212 			struct spdk_bdev_io_wait_entry *entry;
1213 
1214 			entry = TAILQ_FIRST(&ch->io_wait_queue);
1215 			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
1216 			entry->cb_fn(entry->cb_arg);
1217 		}
1218 	} else {
1219 		/* We should never have a full cache with entries on the io wait queue. */
1220 		assert(TAILQ_EMPTY(&ch->io_wait_queue));
1221 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
1222 	}
1223 }
1224 
1225 static bool
1226 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
1227 {
1228 	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
1229 
1230 	switch (limit) {
1231 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
1232 		return true;
1233 	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
1234 	case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
1235 	case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
1236 		return false;
1237 	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
1238 	default:
1239 		return false;
1240 	}
1241 }
1242 
1243 static bool
1244 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
1245 {
1246 	switch (bdev_io->type) {
1247 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1248 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1249 	case SPDK_BDEV_IO_TYPE_READ:
1250 	case SPDK_BDEV_IO_TYPE_WRITE:
1251 		return true;
1252 	default:
1253 		return false;
1254 	}
1255 }
1256 
1257 static bool
1258 _spdk_bdev_is_read_io(struct spdk_bdev_io *bdev_io)
1259 {
1260 	switch (bdev_io->type) {
1261 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1262 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1263 		/* Bit 1 (0x2) set for read operation */
1264 		if (bdev_io->u.nvme_passthru.cmd.opc & SPDK_NVME_OPC_READ) {
1265 			return true;
1266 		} else {
1267 			return false;
1268 		}
1269 	case SPDK_BDEV_IO_TYPE_READ:
1270 		return true;
1271 	default:
1272 		return false;
1273 	}
1274 }
1275 
1276 static uint64_t
1277 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
1278 {
1279 	struct spdk_bdev	*bdev = bdev_io->bdev;
1280 
1281 	switch (bdev_io->type) {
1282 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1283 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1284 		return bdev_io->u.nvme_passthru.nbytes;
1285 	case SPDK_BDEV_IO_TYPE_READ:
1286 	case SPDK_BDEV_IO_TYPE_WRITE:
1287 		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
1288 	default:
1289 		return 0;
1290 	}
1291 }
1292 
1293 static bool
1294 _spdk_bdev_qos_rw_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
1295 {
1296 	if (limit->max_per_timeslice > 0 && limit->remaining_this_timeslice <= 0) {
1297 		return true;
1298 	} else {
1299 		return false;
1300 	}
1301 }
1302 
1303 static bool
1304 _spdk_bdev_qos_r_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
1305 {
1306 	if (_spdk_bdev_is_read_io(io) == false) {
1307 		return false;
1308 	}
1309 
1310 	return _spdk_bdev_qos_rw_queue_io(limit, io);
1311 }
1312 
1313 static bool
1314 _spdk_bdev_qos_w_queue_io(const struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
1315 {
1316 	if (_spdk_bdev_is_read_io(io) == true) {
1317 		return false;
1318 	}
1319 
1320 	return _spdk_bdev_qos_rw_queue_io(limit, io);
1321 }
1322 
1323 static void
1324 _spdk_bdev_qos_rw_iops_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
1325 {
1326 	limit->remaining_this_timeslice--;
1327 }
1328 
1329 static void
1330 _spdk_bdev_qos_rw_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
1331 {
1332 	limit->remaining_this_timeslice -= _spdk_bdev_get_io_size_in_byte(io);
1333 }
1334 
1335 static void
1336 _spdk_bdev_qos_r_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
1337 {
1338 	if (_spdk_bdev_is_read_io(io) == false) {
1339 		return;
1340 	}
1341 
1342 	return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
1343 }
1344 
1345 static void
1346 _spdk_bdev_qos_w_bps_update_quota(struct spdk_bdev_qos_limit *limit, struct spdk_bdev_io *io)
1347 {
1348 	if (_spdk_bdev_is_read_io(io) == true) {
1349 		return;
1350 	}
1351 
1352 	return _spdk_bdev_qos_rw_bps_update_quota(limit, io);
1353 }
1354 
1355 static void
1356 _spdk_bdev_qos_set_ops(struct spdk_bdev_qos *qos)
1357 {
1358 	int i;
1359 
1360 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1361 		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1362 			qos->rate_limits[i].queue_io = NULL;
1363 			qos->rate_limits[i].update_quota = NULL;
1364 			continue;
1365 		}
1366 
1367 		switch (i) {
1368 		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
1369 			qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
1370 			qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_iops_update_quota;
1371 			break;
1372 		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
1373 			qos->rate_limits[i].queue_io = _spdk_bdev_qos_rw_queue_io;
1374 			qos->rate_limits[i].update_quota = _spdk_bdev_qos_rw_bps_update_quota;
1375 			break;
1376 		case SPDK_BDEV_QOS_R_BPS_RATE_LIMIT:
1377 			qos->rate_limits[i].queue_io = _spdk_bdev_qos_r_queue_io;
1378 			qos->rate_limits[i].update_quota = _spdk_bdev_qos_r_bps_update_quota;
1379 			break;
1380 		case SPDK_BDEV_QOS_W_BPS_RATE_LIMIT:
1381 			qos->rate_limits[i].queue_io = _spdk_bdev_qos_w_queue_io;
1382 			qos->rate_limits[i].update_quota = _spdk_bdev_qos_w_bps_update_quota;
1383 			break;
1384 		default:
1385 			break;
1386 		}
1387 	}
1388 }
1389 
1390 static int
1391 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
1392 {
1393 	struct spdk_bdev_io		*bdev_io = NULL, *tmp = NULL;
1394 	struct spdk_bdev		*bdev = ch->bdev;
1395 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1396 	int				i, submitted_ios = 0;
1397 
1398 	TAILQ_FOREACH_SAFE(bdev_io, &qos->queued, internal.link, tmp) {
1399 		if (_spdk_bdev_qos_io_to_limit(bdev_io) == true) {
1400 			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1401 				if (!qos->rate_limits[i].queue_io) {
1402 					continue;
1403 				}
1404 
1405 				if (qos->rate_limits[i].queue_io(&qos->rate_limits[i],
1406 								 bdev_io) == true) {
1407 					return submitted_ios;
1408 				}
1409 			}
1410 			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1411 				if (!qos->rate_limits[i].update_quota) {
1412 					continue;
1413 				}
1414 
1415 				qos->rate_limits[i].update_quota(&qos->rate_limits[i], bdev_io);
1416 			}
1417 		}
1418 
1419 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
1420 		ch->io_outstanding++;
1421 		shared_resource->io_outstanding++;
1422 		bdev_io->internal.in_submit_request = true;
1423 		bdev->fn_table->submit_request(ch->channel, bdev_io);
1424 		bdev_io->internal.in_submit_request = false;
1425 		submitted_ios++;
1426 	}
1427 
1428 	return submitted_ios;
1429 }
1430 
1431 static void
1432 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
1433 {
1434 	int rc;
1435 
1436 	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
1437 	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
1438 	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
1439 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
1440 				     &bdev_io->internal.waitq_entry);
1441 	if (rc != 0) {
1442 		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
1443 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1444 		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1445 	}
1446 }
1447 
1448 static bool
1449 _spdk_bdev_io_type_can_split(uint8_t type)
1450 {
1451 	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
1452 	assert(type < SPDK_BDEV_NUM_IO_TYPES);
1453 
1454 	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
1455 	 * UNMAP could be split, but these types of I/O are typically much larger
1456 	 * in size (sometimes the size of the entire block device), and the bdev
1457 	 * module can more efficiently split these types of I/O.  Plus those types
1458 	 * of I/O do not have a payload, which makes the splitting process simpler.
1459 	 */
1460 	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
1461 		return true;
1462 	} else {
1463 		return false;
1464 	}
1465 }
1466 
1467 static bool
1468 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
1469 {
1470 	uint64_t start_stripe, end_stripe;
1471 	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;
1472 
1473 	if (io_boundary == 0) {
1474 		return false;
1475 	}
1476 
1477 	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
1478 		return false;
1479 	}
1480 
1481 	start_stripe = bdev_io->u.bdev.offset_blocks;
1482 	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
1483 	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
1484 	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
1485 		start_stripe >>= spdk_u32log2(io_boundary);
1486 		end_stripe >>= spdk_u32log2(io_boundary);
1487 	} else {
1488 		start_stripe /= io_boundary;
1489 		end_stripe /= io_boundary;
1490 	}
1491 	return (start_stripe != end_stripe);
1492 }
1493 
1494 static uint32_t
1495 _to_next_boundary(uint64_t offset, uint32_t boundary)
1496 {
1497 	return (boundary - (offset % boundary));
1498 }
1499 
1500 static void
1501 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
1502 
1503 static void
1504 _spdk_bdev_io_split_with_payload(void *_bdev_io)
1505 {
1506 	struct spdk_bdev_io *bdev_io = _bdev_io;
1507 	uint64_t current_offset, remaining;
1508 	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
1509 	struct iovec *parent_iov, *iov;
1510 	uint64_t parent_iov_offset, iov_len;
1511 	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
1512 	int rc;
1513 
1514 	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
1515 	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
1516 	blocklen = bdev_io->bdev->blocklen;
1517 	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
1518 	parent_iovcnt = bdev_io->u.bdev.iovcnt;
1519 
1520 	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
1521 		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
1522 		if (parent_iov_offset < parent_iov->iov_len) {
1523 			break;
1524 		}
1525 		parent_iov_offset -= parent_iov->iov_len;
1526 	}
1527 
1528 	child_iovcnt = 0;
1529 	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
1530 		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
1531 		to_next_boundary = spdk_min(remaining, to_next_boundary);
1532 		to_next_boundary_bytes = to_next_boundary * blocklen;
1533 		iov = &bdev_io->child_iov[child_iovcnt];
1534 		iovcnt = 0;
1535 		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
1536 		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
1537 			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
1538 			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
1539 			to_next_boundary_bytes -= iov_len;
1540 
1541 			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
1542 			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;
1543 
1544 			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
1545 				parent_iov_offset += iov_len;
1546 			} else {
1547 				parent_iovpos++;
1548 				parent_iov_offset = 0;
1549 			}
1550 			child_iovcnt++;
1551 			iovcnt++;
1552 		}
1553 
1554 		if (to_next_boundary_bytes > 0) {
1555 			/* We had to stop this child I/O early because we ran out of
1556 			 *  child_iov space.  Make sure the iovs collected are valid and
1557 			 *  then adjust to_next_boundary before starting the child I/O.
1558 			 */
1559 			if ((to_next_boundary_bytes % blocklen) != 0) {
1560 				SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n",
1561 					    to_next_boundary_bytes, blocklen);
1562 				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1563 				if (bdev_io->u.bdev.split_outstanding == 0) {
1564 					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1565 				}
1566 				return;
1567 			}
1568 			to_next_boundary -= to_next_boundary_bytes / blocklen;
1569 		}
1570 
1571 		bdev_io->u.bdev.split_outstanding++;
1572 
1573 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1574 			rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
1575 						    spdk_io_channel_from_ctx(bdev_io->internal.ch),
1576 						    iov, iovcnt, current_offset, to_next_boundary,
1577 						    _spdk_bdev_io_split_done, bdev_io);
1578 		} else {
1579 			rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
1580 						     spdk_io_channel_from_ctx(bdev_io->internal.ch),
1581 						     iov, iovcnt, current_offset, to_next_boundary,
1582 						     _spdk_bdev_io_split_done, bdev_io);
1583 		}
1584 
1585 		if (rc == 0) {
1586 			current_offset += to_next_boundary;
1587 			remaining -= to_next_boundary;
1588 			bdev_io->u.bdev.split_current_offset_blocks = current_offset;
1589 			bdev_io->u.bdev.split_remaining_num_blocks = remaining;
1590 		} else {
1591 			bdev_io->u.bdev.split_outstanding--;
1592 			if (rc == -ENOMEM) {
1593 				if (bdev_io->u.bdev.split_outstanding == 0) {
1594 					/* No I/O is outstanding. Hence we should wait here. */
1595 					_spdk_bdev_queue_io_wait_with_cb(bdev_io,
1596 									 _spdk_bdev_io_split_with_payload);
1597 				}
1598 			} else {
1599 				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1600 				if (bdev_io->u.bdev.split_outstanding == 0) {
1601 					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1602 				}
1603 			}
1604 
1605 			return;
1606 		}
1607 	}
1608 }
1609 
1610 static void
1611 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1612 {
1613 	struct spdk_bdev_io *parent_io = cb_arg;
1614 
1615 	spdk_bdev_free_io(bdev_io);
1616 
1617 	if (!success) {
1618 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1619 	}
1620 	parent_io->u.bdev.split_outstanding--;
1621 	if (parent_io->u.bdev.split_outstanding != 0) {
1622 		return;
1623 	}
1624 
1625 	/*
1626 	 * Parent I/O finishes when all blocks are consumed or there is any failure of
1627 	 * child I/O and no outstanding child I/O.
1628 	 */
1629 	if (parent_io->u.bdev.split_remaining_num_blocks == 0 ||
1630 	    parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) {
1631 		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
1632 				       parent_io->internal.caller_ctx);
1633 		return;
1634 	}
1635 
1636 	/*
1637 	 * Continue with the splitting process.  This function will complete the parent I/O if the
1638 	 * splitting is done.
1639 	 */
1640 	_spdk_bdev_io_split_with_payload(parent_io);
1641 }
1642 
1643 static void
1644 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
1645 {
1646 	assert(_spdk_bdev_io_type_can_split(bdev_io->type));
1647 
1648 	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
1649 	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
1650 	bdev_io->u.bdev.split_outstanding = 0;
1651 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
1652 
1653 	_spdk_bdev_io_split_with_payload(bdev_io);
1654 }
1655 
1656 static void
1657 _spdk_bdev_io_split_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
1658 			       bool success)
1659 {
1660 	if (!success) {
1661 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1662 		return;
1663 	}
1664 
1665 	_spdk_bdev_io_split(ch, bdev_io);
1666 }
1667 
1668 /* Explicitly mark this inline, since it's used as a function pointer and otherwise won't
1669  *  be inlined, at least on some compilers.
1670  */
1671 static inline void
1672 _spdk_bdev_io_submit(void *ctx)
1673 {
1674 	struct spdk_bdev_io *bdev_io = ctx;
1675 	struct spdk_bdev *bdev = bdev_io->bdev;
1676 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1677 	struct spdk_io_channel *ch = bdev_ch->channel;
1678 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1679 	uint64_t tsc;
1680 
1681 	tsc = spdk_get_ticks();
1682 	bdev_io->internal.submit_tsc = tsc;
1683 	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);
1684 	bdev_ch->io_outstanding++;
1685 	shared_resource->io_outstanding++;
1686 	bdev_io->internal.in_submit_request = true;
1687 	if (spdk_likely(bdev_ch->flags == 0)) {
1688 		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1689 			bdev->fn_table->submit_request(ch, bdev_io);
1690 		} else {
1691 			bdev_ch->io_outstanding--;
1692 			shared_resource->io_outstanding--;
1693 			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
1694 		}
1695 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1696 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1697 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1698 		bdev_ch->io_outstanding--;
1699 		shared_resource->io_outstanding--;
1700 		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
1701 		_spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos);
1702 	} else {
1703 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1704 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1705 	}
1706 	bdev_io->internal.in_submit_request = false;
1707 }
1708 
1709 static void
1710 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1711 {
1712 	struct spdk_bdev *bdev = bdev_io->bdev;
1713 	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1714 
1715 	assert(thread != NULL);
1716 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1717 
1718 	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
1719 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1720 			spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split_get_buf_cb,
1721 					     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
1722 		} else {
1723 			_spdk_bdev_io_split(NULL, bdev_io);
1724 		}
1725 		return;
1726 	}
1727 
1728 	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1729 		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
1730 			_spdk_bdev_io_submit(bdev_io);
1731 		} else {
1732 			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1733 			bdev_io->internal.ch = bdev->internal.qos->ch;
1734 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
1735 		}
1736 	} else {
1737 		_spdk_bdev_io_submit(bdev_io);
1738 	}
1739 }
1740 
1741 static void
1742 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1743 {
1744 	struct spdk_bdev *bdev = bdev_io->bdev;
1745 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1746 	struct spdk_io_channel *ch = bdev_ch->channel;
1747 
1748 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1749 
1750 	bdev_io->internal.in_submit_request = true;
1751 	bdev->fn_table->submit_request(ch, bdev_io);
1752 	bdev_io->internal.in_submit_request = false;
1753 }
1754 
1755 static void
1756 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1757 		  struct spdk_bdev *bdev, void *cb_arg,
1758 		  spdk_bdev_io_completion_cb cb)
1759 {
1760 	bdev_io->bdev = bdev;
1761 	bdev_io->internal.caller_ctx = cb_arg;
1762 	bdev_io->internal.cb = cb;
1763 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1764 	bdev_io->internal.in_submit_request = false;
1765 	bdev_io->internal.buf = NULL;
1766 	bdev_io->internal.io_submit_ch = NULL;
1767 	bdev_io->internal.orig_iovs = NULL;
1768 	bdev_io->internal.orig_iovcnt = 0;
1769 }
1770 
1771 static bool
1772 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1773 {
1774 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1775 }
1776 
1777 bool
1778 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1779 {
1780 	bool supported;
1781 
1782 	supported = _spdk_bdev_io_type_supported(bdev, io_type);
1783 
1784 	if (!supported) {
1785 		switch (io_type) {
1786 		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1787 			/* The bdev layer will emulate write zeroes as long as write is supported. */
1788 			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
1789 			break;
1790 		default:
1791 			break;
1792 		}
1793 	}
1794 
1795 	return supported;
1796 }
1797 
1798 int
1799 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1800 {
1801 	if (bdev->fn_table->dump_info_json) {
1802 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1803 	}
1804 
1805 	return 0;
1806 }
1807 
1808 static void
1809 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1810 {
1811 	uint32_t max_per_timeslice = 0;
1812 	int i;
1813 
1814 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1815 		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1816 			qos->rate_limits[i].max_per_timeslice = 0;
1817 			continue;
1818 		}
1819 
1820 		max_per_timeslice = qos->rate_limits[i].limit *
1821 				    SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC;
1822 
1823 		qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice,
1824 							qos->rate_limits[i].min_per_timeslice);
1825 
1826 		qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice;
1827 	}
1828 
1829 	_spdk_bdev_qos_set_ops(qos);
1830 }
1831 
1832 static int
1833 spdk_bdev_channel_poll_qos(void *arg)
1834 {
1835 	struct spdk_bdev_qos *qos = arg;
1836 	uint64_t now = spdk_get_ticks();
1837 	int i;
1838 
1839 	if (now < (qos->last_timeslice + qos->timeslice_size)) {
1840 		/* We received our callback earlier than expected - return
1841 		 *  immediately and wait to do accounting until at least one
1842 		 *  timeslice has actually expired.  This should never happen
1843 		 *  with a well-behaved timer implementation.
1844 		 */
1845 		return 0;
1846 	}
1847 
1848 	/* Reset for next round of rate limiting */
1849 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1850 		/* We may have allowed the IOs or bytes to slightly overrun in the last
1851 		 * timeslice. remaining_this_timeslice is signed, so if it's negative
1852 		 * here, we'll account for the overrun so that the next timeslice will
1853 		 * be appropriately reduced.
1854 		 */
1855 		if (qos->rate_limits[i].remaining_this_timeslice > 0) {
1856 			qos->rate_limits[i].remaining_this_timeslice = 0;
1857 		}
1858 	}
1859 
1860 	while (now >= (qos->last_timeslice + qos->timeslice_size)) {
1861 		qos->last_timeslice += qos->timeslice_size;
1862 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1863 			qos->rate_limits[i].remaining_this_timeslice +=
1864 				qos->rate_limits[i].max_per_timeslice;
1865 		}
1866 	}
1867 
1868 	return _spdk_bdev_qos_io_submit(qos->ch, qos);
1869 }
1870 
1871 static void
1872 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1873 {
1874 	struct spdk_bdev_shared_resource *shared_resource;
1875 
1876 	spdk_put_io_channel(ch->channel);
1877 
1878 	shared_resource = ch->shared_resource;
1879 
1880 	assert(ch->io_outstanding == 0);
1881 	assert(shared_resource->ref > 0);
1882 	shared_resource->ref--;
1883 	if (shared_resource->ref == 0) {
1884 		assert(shared_resource->io_outstanding == 0);
1885 		TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1886 		spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1887 		free(shared_resource);
1888 	}
1889 }
1890 
1891 /* Caller must hold bdev->internal.mutex. */
1892 static void
1893 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1894 {
1895 	struct spdk_bdev_qos	*qos = bdev->internal.qos;
1896 	int			i;
1897 
1898 	/* Rate limiting on this bdev enabled */
1899 	if (qos) {
1900 		if (qos->ch == NULL) {
1901 			struct spdk_io_channel *io_ch;
1902 
1903 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1904 				      bdev->name, spdk_get_thread());
1905 
1906 			/* No qos channel has been selected, so set one up */
1907 
1908 			/* Take another reference to ch */
1909 			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1910 			assert(io_ch != NULL);
1911 			qos->ch = ch;
1912 
1913 			qos->thread = spdk_io_channel_get_thread(io_ch);
1914 
1915 			TAILQ_INIT(&qos->queued);
1916 
1917 			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1918 				if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
1919 					qos->rate_limits[i].min_per_timeslice =
1920 						SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE;
1921 				} else {
1922 					qos->rate_limits[i].min_per_timeslice =
1923 						SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE;
1924 				}
1925 
1926 				if (qos->rate_limits[i].limit == 0) {
1927 					qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
1928 				}
1929 			}
1930 			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1931 			qos->timeslice_size =
1932 				SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
1933 			qos->last_timeslice = spdk_get_ticks();
1934 			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1935 							   qos,
1936 							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1937 		}
1938 
1939 		ch->flags |= BDEV_CH_QOS_ENABLED;
1940 	}
1941 }
1942 
1943 static int
1944 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1945 {
1946 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1947 	struct spdk_bdev_channel	*ch = ctx_buf;
1948 	struct spdk_io_channel		*mgmt_io_ch;
1949 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1950 	struct spdk_bdev_shared_resource *shared_resource;
1951 
1952 	ch->bdev = bdev;
1953 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1954 	if (!ch->channel) {
1955 		return -1;
1956 	}
1957 
1958 	assert(ch->histogram == NULL);
1959 	if (bdev->internal.histogram_enabled) {
1960 		ch->histogram = spdk_histogram_data_alloc();
1961 		if (ch->histogram == NULL) {
1962 			SPDK_ERRLOG("Could not allocate histogram\n");
1963 		}
1964 	}
1965 
1966 	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1967 	if (!mgmt_io_ch) {
1968 		spdk_put_io_channel(ch->channel);
1969 		return -1;
1970 	}
1971 
1972 	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1973 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1974 		if (shared_resource->shared_ch == ch->channel) {
1975 			spdk_put_io_channel(mgmt_io_ch);
1976 			shared_resource->ref++;
1977 			break;
1978 		}
1979 	}
1980 
1981 	if (shared_resource == NULL) {
1982 		shared_resource = calloc(1, sizeof(*shared_resource));
1983 		if (shared_resource == NULL) {
1984 			spdk_put_io_channel(ch->channel);
1985 			spdk_put_io_channel(mgmt_io_ch);
1986 			return -1;
1987 		}
1988 
1989 		shared_resource->mgmt_ch = mgmt_ch;
1990 		shared_resource->io_outstanding = 0;
1991 		TAILQ_INIT(&shared_resource->nomem_io);
1992 		shared_resource->nomem_threshold = 0;
1993 		shared_resource->shared_ch = ch->channel;
1994 		shared_resource->ref = 1;
1995 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1996 	}
1997 
1998 	memset(&ch->stat, 0, sizeof(ch->stat));
1999 	ch->stat.ticks_rate = spdk_get_ticks_hz();
2000 	ch->io_outstanding = 0;
2001 	TAILQ_INIT(&ch->queued_resets);
2002 	ch->flags = 0;
2003 	ch->shared_resource = shared_resource;
2004 
2005 #ifdef SPDK_CONFIG_VTUNE
2006 	{
2007 		char *name;
2008 		__itt_init_ittlib(NULL, 0);
2009 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
2010 		if (!name) {
2011 			_spdk_bdev_channel_destroy_resource(ch);
2012 			return -1;
2013 		}
2014 		ch->handle = __itt_string_handle_create(name);
2015 		free(name);
2016 		ch->start_tsc = spdk_get_ticks();
2017 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
2018 		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
2019 	}
2020 #endif
2021 
2022 	pthread_mutex_lock(&bdev->internal.mutex);
2023 	_spdk_bdev_enable_qos(bdev, ch);
2024 	pthread_mutex_unlock(&bdev->internal.mutex);
2025 
2026 	return 0;
2027 }
2028 
2029 /*
2030  * Abort I/O that are waiting on a data buffer.  These types of I/O are
2031  *  linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
2032  */
2033 static void
2034 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
2035 {
2036 	bdev_io_stailq_t tmp;
2037 	struct spdk_bdev_io *bdev_io;
2038 
2039 	STAILQ_INIT(&tmp);
2040 
2041 	while (!STAILQ_EMPTY(queue)) {
2042 		bdev_io = STAILQ_FIRST(queue);
2043 		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
2044 		if (bdev_io->internal.ch == ch) {
2045 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2046 		} else {
2047 			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
2048 		}
2049 	}
2050 
2051 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
2052 }
2053 
2054 /*
2055  * Abort I/O that are queued waiting for submission.  These types of I/O are
2056  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
2057  */
2058 static void
2059 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
2060 {
2061 	struct spdk_bdev_io *bdev_io, *tmp;
2062 
2063 	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
2064 		if (bdev_io->internal.ch == ch) {
2065 			TAILQ_REMOVE(queue, bdev_io, internal.link);
2066 			/*
2067 			 * spdk_bdev_io_complete() assumes that the completed I/O had
2068 			 *  been submitted to the bdev module.  Since in this case it
2069 			 *  hadn't, bump io_outstanding to account for the decrement
2070 			 *  that spdk_bdev_io_complete() will do.
2071 			 */
2072 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
2073 				ch->io_outstanding++;
2074 				ch->shared_resource->io_outstanding++;
2075 			}
2076 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2077 		}
2078 	}
2079 }
2080 
2081 static void
2082 spdk_bdev_qos_channel_destroy(void *cb_arg)
2083 {
2084 	struct spdk_bdev_qos *qos = cb_arg;
2085 
2086 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
2087 	spdk_poller_unregister(&qos->poller);
2088 
2089 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
2090 
2091 	free(qos);
2092 }
2093 
2094 static int
2095 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
2096 {
2097 	int i;
2098 
2099 	/*
2100 	 * Cleanly shutting down the QoS poller is tricky, because
2101 	 * during the asynchronous operation the user could open
2102 	 * a new descriptor and create a new channel, spawning
2103 	 * a new QoS poller.
2104 	 *
2105 	 * The strategy is to create a new QoS structure here and swap it
2106 	 * in. The shutdown path then continues to refer to the old one
2107 	 * until it completes and then releases it.
2108 	 */
2109 	struct spdk_bdev_qos *new_qos, *old_qos;
2110 
2111 	old_qos = bdev->internal.qos;
2112 
2113 	new_qos = calloc(1, sizeof(*new_qos));
2114 	if (!new_qos) {
2115 		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
2116 		return -ENOMEM;
2117 	}
2118 
2119 	/* Copy the old QoS data into the newly allocated structure */
2120 	memcpy(new_qos, old_qos, sizeof(*new_qos));
2121 
2122 	/* Zero out the key parts of the QoS structure */
2123 	new_qos->ch = NULL;
2124 	new_qos->thread = NULL;
2125 	new_qos->poller = NULL;
2126 	TAILQ_INIT(&new_qos->queued);
2127 	/*
2128 	 * The limit member of spdk_bdev_qos_limit structure is not zeroed.
2129 	 * It will be used later for the new QoS structure.
2130 	 */
2131 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
2132 		new_qos->rate_limits[i].remaining_this_timeslice = 0;
2133 		new_qos->rate_limits[i].min_per_timeslice = 0;
2134 		new_qos->rate_limits[i].max_per_timeslice = 0;
2135 	}
2136 
2137 	bdev->internal.qos = new_qos;
2138 
2139 	if (old_qos->thread == NULL) {
2140 		free(old_qos);
2141 	} else {
2142 		spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
2143 				     old_qos);
2144 	}
2145 
2146 	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
2147 	 * been destroyed yet. The destruction path will end up waiting for the final
2148 	 * channel to be put before it releases resources. */
2149 
2150 	return 0;
2151 }
2152 
2153 static void
2154 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
2155 {
2156 	total->bytes_read += add->bytes_read;
2157 	total->num_read_ops += add->num_read_ops;
2158 	total->bytes_written += add->bytes_written;
2159 	total->num_write_ops += add->num_write_ops;
2160 	total->bytes_unmapped += add->bytes_unmapped;
2161 	total->num_unmap_ops += add->num_unmap_ops;
2162 	total->read_latency_ticks += add->read_latency_ticks;
2163 	total->write_latency_ticks += add->write_latency_ticks;
2164 	total->unmap_latency_ticks += add->unmap_latency_ticks;
2165 }
2166 
2167 static void
2168 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
2169 {
2170 	struct spdk_bdev_channel	*ch = ctx_buf;
2171 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
2172 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
2173 
2174 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
2175 		      spdk_get_thread());
2176 
2177 	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
2178 	pthread_mutex_lock(&ch->bdev->internal.mutex);
2179 	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
2180 	pthread_mutex_unlock(&ch->bdev->internal.mutex);
2181 
2182 	mgmt_ch = shared_resource->mgmt_ch;
2183 
2184 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
2185 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
2186 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
2187 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
2188 
2189 	if (ch->histogram) {
2190 		spdk_histogram_data_free(ch->histogram);
2191 	}
2192 
2193 	_spdk_bdev_channel_destroy_resource(ch);
2194 }
2195 
2196 int
2197 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
2198 {
2199 	struct spdk_bdev_alias *tmp;
2200 
2201 	if (alias == NULL) {
2202 		SPDK_ERRLOG("Empty alias passed\n");
2203 		return -EINVAL;
2204 	}
2205 
2206 	if (spdk_bdev_get_by_name(alias)) {
2207 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
2208 		return -EEXIST;
2209 	}
2210 
2211 	tmp = calloc(1, sizeof(*tmp));
2212 	if (tmp == NULL) {
2213 		SPDK_ERRLOG("Unable to allocate alias\n");
2214 		return -ENOMEM;
2215 	}
2216 
2217 	tmp->alias = strdup(alias);
2218 	if (tmp->alias == NULL) {
2219 		free(tmp);
2220 		SPDK_ERRLOG("Unable to allocate alias\n");
2221 		return -ENOMEM;
2222 	}
2223 
2224 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
2225 
2226 	return 0;
2227 }
2228 
2229 int
2230 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
2231 {
2232 	struct spdk_bdev_alias *tmp;
2233 
2234 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
2235 		if (strcmp(alias, tmp->alias) == 0) {
2236 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
2237 			free(tmp->alias);
2238 			free(tmp);
2239 			return 0;
2240 		}
2241 	}
2242 
2243 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias);
2244 
2245 	return -ENOENT;
2246 }
2247 
2248 void
2249 spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
2250 {
2251 	struct spdk_bdev_alias *p, *tmp;
2252 
2253 	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
2254 		TAILQ_REMOVE(&bdev->aliases, p, tailq);
2255 		free(p->alias);
2256 		free(p);
2257 	}
2258 }
2259 
2260 struct spdk_io_channel *
2261 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
2262 {
2263 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
2264 }
2265 
2266 const char *
2267 spdk_bdev_get_name(const struct spdk_bdev *bdev)
2268 {
2269 	return bdev->name;
2270 }
2271 
2272 const char *
2273 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
2274 {
2275 	return bdev->product_name;
2276 }
2277 
2278 const struct spdk_bdev_aliases_list *
2279 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
2280 {
2281 	return &bdev->aliases;
2282 }
2283 
2284 uint32_t
2285 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
2286 {
2287 	return bdev->blocklen;
2288 }
2289 
2290 uint64_t
2291 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
2292 {
2293 	return bdev->blockcnt;
2294 }
2295 
2296 const char *
2297 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type)
2298 {
2299 	return qos_rpc_type[type];
2300 }
2301 
2302 void
2303 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
2304 {
2305 	int i;
2306 
2307 	memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
2308 
2309 	pthread_mutex_lock(&bdev->internal.mutex);
2310 	if (bdev->internal.qos) {
2311 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
2312 			if (bdev->internal.qos->rate_limits[i].limit !=
2313 			    SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
2314 				limits[i] = bdev->internal.qos->rate_limits[i].limit;
2315 				if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) {
2316 					/* Change from Byte to Megabyte which is user visible. */
2317 					limits[i] = limits[i] / 1024 / 1024;
2318 				}
2319 			}
2320 		}
2321 	}
2322 	pthread_mutex_unlock(&bdev->internal.mutex);
2323 }
2324 
2325 size_t
2326 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
2327 {
2328 	return 1 << bdev->required_alignment;
2329 }
2330 
2331 uint32_t
2332 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
2333 {
2334 	return bdev->optimal_io_boundary;
2335 }
2336 
2337 bool
2338 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
2339 {
2340 	return bdev->write_cache;
2341 }
2342 
2343 const struct spdk_uuid *
2344 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
2345 {
2346 	return &bdev->uuid;
2347 }
2348 
2349 uint32_t
2350 spdk_bdev_get_md_size(const struct spdk_bdev *bdev)
2351 {
2352 	return bdev->md_len;
2353 }
2354 
2355 bool
2356 spdk_bdev_is_md_interleaved(const struct spdk_bdev *bdev)
2357 {
2358 	return (bdev->md_len != 0) && bdev->md_interleave;
2359 }
2360 
2361 uint32_t
2362 spdk_bdev_get_data_block_size(const struct spdk_bdev *bdev)
2363 {
2364 	if (spdk_bdev_is_md_interleaved(bdev)) {
2365 		return bdev->blocklen - bdev->md_len;
2366 	} else {
2367 		return bdev->blocklen;
2368 	}
2369 }
2370 
2371 enum spdk_dif_type spdk_bdev_get_dif_type(const struct spdk_bdev *bdev)
2372 {
2373 	if (bdev->md_len != 0) {
2374 		return bdev->dif_type;
2375 	} else {
2376 		return SPDK_DIF_DISABLE;
2377 	}
2378 }
2379 
2380 bool
2381 spdk_bdev_is_dif_head_of_md(const struct spdk_bdev *bdev)
2382 {
2383 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
2384 		return bdev->dif_is_head_of_md;
2385 	} else {
2386 		return false;
2387 	}
2388 }
2389 
2390 bool
2391 spdk_bdev_is_dif_check_enabled(const struct spdk_bdev *bdev,
2392 			       enum spdk_dif_check_type check_type)
2393 {
2394 	if (spdk_bdev_get_dif_type(bdev) == SPDK_DIF_DISABLE) {
2395 		return false;
2396 	}
2397 
2398 	switch (check_type) {
2399 	case SPDK_DIF_CHECK_TYPE_REFTAG:
2400 		return (bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK) != 0;
2401 	case SPDK_DIF_CHECK_TYPE_APPTAG:
2402 		return (bdev->dif_check_flags & SPDK_DIF_FLAGS_APPTAG_CHECK) != 0;
2403 	case SPDK_DIF_CHECK_TYPE_GUARD:
2404 		return (bdev->dif_check_flags & SPDK_DIF_FLAGS_GUARD_CHECK) != 0;
2405 	default:
2406 		return false;
2407 	}
2408 }
2409 
2410 uint64_t
2411 spdk_bdev_get_qd(const struct spdk_bdev *bdev)
2412 {
2413 	return bdev->internal.measured_queue_depth;
2414 }
2415 
2416 uint64_t
2417 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev)
2418 {
2419 	return bdev->internal.period;
2420 }
2421 
2422 uint64_t
2423 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev)
2424 {
2425 	return bdev->internal.weighted_io_time;
2426 }
2427 
2428 uint64_t
2429 spdk_bdev_get_io_time(const struct spdk_bdev *bdev)
2430 {
2431 	return bdev->internal.io_time;
2432 }
2433 
2434 static void
2435 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status)
2436 {
2437 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
2438 
2439 	bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth;
2440 
2441 	if (bdev->internal.measured_queue_depth) {
2442 		bdev->internal.io_time += bdev->internal.period;
2443 		bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth;
2444 	}
2445 }
2446 
2447 static void
2448 _calculate_measured_qd(struct spdk_io_channel_iter *i)
2449 {
2450 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
2451 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
2452 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch);
2453 
2454 	bdev->internal.temporary_queue_depth += ch->io_outstanding;
2455 	spdk_for_each_channel_continue(i, 0);
2456 }
2457 
2458 static int
2459 spdk_bdev_calculate_measured_queue_depth(void *ctx)
2460 {
2461 	struct spdk_bdev *bdev = ctx;
2462 	bdev->internal.temporary_queue_depth = 0;
2463 	spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev,
2464 			      _calculate_measured_qd_cpl);
2465 	return 0;
2466 }
2467 
2468 void
2469 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period)
2470 {
2471 	bdev->internal.period = period;
2472 
2473 	if (bdev->internal.qd_poller != NULL) {
2474 		spdk_poller_unregister(&bdev->internal.qd_poller);
2475 		bdev->internal.measured_queue_depth = UINT64_MAX;
2476 	}
2477 
2478 	if (period != 0) {
2479 		bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev,
2480 					   period);
2481 	}
2482 }
2483 
2484 int
2485 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
2486 {
2487 	int ret;
2488 
2489 	pthread_mutex_lock(&bdev->internal.mutex);
2490 
2491 	/* bdev has open descriptors */
2492 	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
2493 	    bdev->blockcnt > size) {
2494 		ret = -EBUSY;
2495 	} else {
2496 		bdev->blockcnt = size;
2497 		ret = 0;
2498 	}
2499 
2500 	pthread_mutex_unlock(&bdev->internal.mutex);
2501 
2502 	return ret;
2503 }
2504 
2505 /*
2506  * Convert I/O offset and length from bytes to blocks.
2507  *
2508  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
2509  */
2510 static uint64_t
2511 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
2512 			  uint64_t num_bytes, uint64_t *num_blocks)
2513 {
2514 	uint32_t block_size = bdev->blocklen;
2515 	uint8_t shift_cnt;
2516 
2517 	/* Avoid expensive div operations if possible. These spdk_u32 functions are very cheap. */
2518 	if (spdk_likely(spdk_u32_is_pow2(block_size))) {
2519 		shift_cnt = spdk_u32log2(block_size);
2520 		*offset_blocks = offset_bytes >> shift_cnt;
2521 		*num_blocks = num_bytes >> shift_cnt;
2522 		return (offset_bytes - (*offset_blocks << shift_cnt)) |
2523 		       (num_bytes - (*num_blocks << shift_cnt));
2524 	} else {
2525 		*offset_blocks = offset_bytes / block_size;
2526 		*num_blocks = num_bytes / block_size;
2527 		return (offset_bytes % block_size) | (num_bytes % block_size);
2528 	}
2529 }
2530 
2531 static bool
2532 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
2533 {
2534 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
2535 	 * has been an overflow and hence the offset has been wrapped around */
2536 	if (offset_blocks + num_blocks < offset_blocks) {
2537 		return false;
2538 	}
2539 
2540 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
2541 	if (offset_blocks + num_blocks > bdev->blockcnt) {
2542 		return false;
2543 	}
2544 
2545 	return true;
2546 }
2547 
2548 int
2549 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2550 	       void *buf, uint64_t offset, uint64_t nbytes,
2551 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
2552 {
2553 	uint64_t offset_blocks, num_blocks;
2554 
2555 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2556 		return -EINVAL;
2557 	}
2558 
2559 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2560 }
2561 
2562 int
2563 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2564 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2565 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
2566 {
2567 	struct spdk_bdev *bdev = desc->bdev;
2568 	struct spdk_bdev_io *bdev_io;
2569 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2570 
2571 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2572 		return -EINVAL;
2573 	}
2574 
2575 	bdev_io = spdk_bdev_get_io(channel);
2576 	if (!bdev_io) {
2577 		return -ENOMEM;
2578 	}
2579 
2580 	bdev_io->internal.ch = channel;
2581 	bdev_io->internal.desc = desc;
2582 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2583 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2584 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2585 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2586 	bdev_io->u.bdev.iovcnt = 1;
2587 	bdev_io->u.bdev.num_blocks = num_blocks;
2588 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2589 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2590 
2591 	spdk_bdev_io_submit(bdev_io);
2592 	return 0;
2593 }
2594 
2595 int
2596 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2597 		struct iovec *iov, int iovcnt,
2598 		uint64_t offset, uint64_t nbytes,
2599 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2600 {
2601 	uint64_t offset_blocks, num_blocks;
2602 
2603 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2604 		return -EINVAL;
2605 	}
2606 
2607 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2608 }
2609 
2610 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2611 			   struct iovec *iov, int iovcnt,
2612 			   uint64_t offset_blocks, uint64_t num_blocks,
2613 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2614 {
2615 	struct spdk_bdev *bdev = desc->bdev;
2616 	struct spdk_bdev_io *bdev_io;
2617 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2618 
2619 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2620 		return -EINVAL;
2621 	}
2622 
2623 	bdev_io = spdk_bdev_get_io(channel);
2624 	if (!bdev_io) {
2625 		return -ENOMEM;
2626 	}
2627 
2628 	bdev_io->internal.ch = channel;
2629 	bdev_io->internal.desc = desc;
2630 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2631 	bdev_io->u.bdev.iovs = iov;
2632 	bdev_io->u.bdev.iovcnt = iovcnt;
2633 	bdev_io->u.bdev.num_blocks = num_blocks;
2634 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2635 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2636 
2637 	spdk_bdev_io_submit(bdev_io);
2638 	return 0;
2639 }
2640 
2641 int
2642 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2643 		void *buf, uint64_t offset, uint64_t nbytes,
2644 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2645 {
2646 	uint64_t offset_blocks, num_blocks;
2647 
2648 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2649 		return -EINVAL;
2650 	}
2651 
2652 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2653 }
2654 
2655 int
2656 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2657 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2658 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2659 {
2660 	struct spdk_bdev *bdev = desc->bdev;
2661 	struct spdk_bdev_io *bdev_io;
2662 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2663 
2664 	if (!desc->write) {
2665 		return -EBADF;
2666 	}
2667 
2668 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2669 		return -EINVAL;
2670 	}
2671 
2672 	bdev_io = spdk_bdev_get_io(channel);
2673 	if (!bdev_io) {
2674 		return -ENOMEM;
2675 	}
2676 
2677 	bdev_io->internal.ch = channel;
2678 	bdev_io->internal.desc = desc;
2679 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2680 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2681 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2682 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2683 	bdev_io->u.bdev.iovcnt = 1;
2684 	bdev_io->u.bdev.num_blocks = num_blocks;
2685 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2686 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2687 
2688 	spdk_bdev_io_submit(bdev_io);
2689 	return 0;
2690 }
2691 
2692 int
2693 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2694 		 struct iovec *iov, int iovcnt,
2695 		 uint64_t offset, uint64_t len,
2696 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
2697 {
2698 	uint64_t offset_blocks, num_blocks;
2699 
2700 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2701 		return -EINVAL;
2702 	}
2703 
2704 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2705 }
2706 
2707 int
2708 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2709 			struct iovec *iov, int iovcnt,
2710 			uint64_t offset_blocks, uint64_t num_blocks,
2711 			spdk_bdev_io_completion_cb cb, void *cb_arg)
2712 {
2713 	struct spdk_bdev *bdev = desc->bdev;
2714 	struct spdk_bdev_io *bdev_io;
2715 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2716 
2717 	if (!desc->write) {
2718 		return -EBADF;
2719 	}
2720 
2721 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2722 		return -EINVAL;
2723 	}
2724 
2725 	bdev_io = spdk_bdev_get_io(channel);
2726 	if (!bdev_io) {
2727 		return -ENOMEM;
2728 	}
2729 
2730 	bdev_io->internal.ch = channel;
2731 	bdev_io->internal.desc = desc;
2732 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2733 	bdev_io->u.bdev.iovs = iov;
2734 	bdev_io->u.bdev.iovcnt = iovcnt;
2735 	bdev_io->u.bdev.num_blocks = num_blocks;
2736 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2737 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2738 
2739 	spdk_bdev_io_submit(bdev_io);
2740 	return 0;
2741 }
2742 
2743 int
2744 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2745 		       uint64_t offset, uint64_t len,
2746 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2747 {
2748 	uint64_t offset_blocks, num_blocks;
2749 
2750 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2751 		return -EINVAL;
2752 	}
2753 
2754 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2755 }
2756 
2757 int
2758 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2759 			      uint64_t offset_blocks, uint64_t num_blocks,
2760 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2761 {
2762 	struct spdk_bdev *bdev = desc->bdev;
2763 	struct spdk_bdev_io *bdev_io;
2764 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2765 
2766 	if (!desc->write) {
2767 		return -EBADF;
2768 	}
2769 
2770 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2771 		return -EINVAL;
2772 	}
2773 
2774 	bdev_io = spdk_bdev_get_io(channel);
2775 
2776 	if (!bdev_io) {
2777 		return -ENOMEM;
2778 	}
2779 
2780 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
2781 	bdev_io->internal.ch = channel;
2782 	bdev_io->internal.desc = desc;
2783 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2784 	bdev_io->u.bdev.num_blocks = num_blocks;
2785 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2786 
2787 	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
2788 		spdk_bdev_io_submit(bdev_io);
2789 		return 0;
2790 	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
2791 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
2792 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks;
2793 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks;
2794 		_spdk_bdev_write_zero_buffer_next(bdev_io);
2795 		return 0;
2796 	} else {
2797 		spdk_bdev_free_io(bdev_io);
2798 		return -ENOTSUP;
2799 	}
2800 }
2801 
2802 int
2803 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2804 		uint64_t offset, uint64_t nbytes,
2805 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2806 {
2807 	uint64_t offset_blocks, num_blocks;
2808 
2809 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2810 		return -EINVAL;
2811 	}
2812 
2813 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2814 }
2815 
2816 int
2817 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2818 		       uint64_t offset_blocks, uint64_t num_blocks,
2819 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2820 {
2821 	struct spdk_bdev *bdev = desc->bdev;
2822 	struct spdk_bdev_io *bdev_io;
2823 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2824 
2825 	if (!desc->write) {
2826 		return -EBADF;
2827 	}
2828 
2829 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2830 		return -EINVAL;
2831 	}
2832 
2833 	if (num_blocks == 0) {
2834 		SPDK_ERRLOG("Can't unmap 0 bytes\n");
2835 		return -EINVAL;
2836 	}
2837 
2838 	bdev_io = spdk_bdev_get_io(channel);
2839 	if (!bdev_io) {
2840 		return -ENOMEM;
2841 	}
2842 
2843 	bdev_io->internal.ch = channel;
2844 	bdev_io->internal.desc = desc;
2845 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
2846 
2847 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2848 	bdev_io->u.bdev.iovs[0].iov_base = NULL;
2849 	bdev_io->u.bdev.iovs[0].iov_len = 0;
2850 	bdev_io->u.bdev.iovcnt = 1;
2851 
2852 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2853 	bdev_io->u.bdev.num_blocks = num_blocks;
2854 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2855 
2856 	spdk_bdev_io_submit(bdev_io);
2857 	return 0;
2858 }
2859 
2860 int
2861 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2862 		uint64_t offset, uint64_t length,
2863 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2864 {
2865 	uint64_t offset_blocks, num_blocks;
2866 
2867 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
2868 		return -EINVAL;
2869 	}
2870 
2871 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2872 }
2873 
2874 int
2875 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2876 		       uint64_t offset_blocks, uint64_t num_blocks,
2877 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2878 {
2879 	struct spdk_bdev *bdev = desc->bdev;
2880 	struct spdk_bdev_io *bdev_io;
2881 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2882 
2883 	if (!desc->write) {
2884 		return -EBADF;
2885 	}
2886 
2887 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2888 		return -EINVAL;
2889 	}
2890 
2891 	bdev_io = spdk_bdev_get_io(channel);
2892 	if (!bdev_io) {
2893 		return -ENOMEM;
2894 	}
2895 
2896 	bdev_io->internal.ch = channel;
2897 	bdev_io->internal.desc = desc;
2898 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
2899 	bdev_io->u.bdev.iovs = NULL;
2900 	bdev_io->u.bdev.iovcnt = 0;
2901 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2902 	bdev_io->u.bdev.num_blocks = num_blocks;
2903 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2904 
2905 	spdk_bdev_io_submit(bdev_io);
2906 	return 0;
2907 }
2908 
2909 static void
2910 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2911 {
2912 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2913 	struct spdk_bdev_io *bdev_io;
2914 
2915 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
2916 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link);
2917 	spdk_bdev_io_submit_reset(bdev_io);
2918 }
2919 
2920 static void
2921 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2922 {
2923 	struct spdk_io_channel		*ch;
2924 	struct spdk_bdev_channel	*channel;
2925 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
2926 	struct spdk_bdev_shared_resource *shared_resource;
2927 	bdev_io_tailq_t			tmp_queued;
2928 
2929 	TAILQ_INIT(&tmp_queued);
2930 
2931 	ch = spdk_io_channel_iter_get_channel(i);
2932 	channel = spdk_io_channel_get_ctx(ch);
2933 	shared_resource = channel->shared_resource;
2934 	mgmt_channel = shared_resource->mgmt_ch;
2935 
2936 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2937 
2938 	if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2939 		/* The QoS object is always valid and readable while
2940 		 * the channel flag is set, so the lock here should not
2941 		 * be necessary. We're not in the fast path though, so
2942 		 * just take it anyway. */
2943 		pthread_mutex_lock(&channel->bdev->internal.mutex);
2944 		if (channel->bdev->internal.qos->ch == channel) {
2945 			TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link);
2946 		}
2947 		pthread_mutex_unlock(&channel->bdev->internal.mutex);
2948 	}
2949 
2950 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2951 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2952 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2953 	_spdk_bdev_abort_queued_io(&tmp_queued, channel);
2954 
2955 	spdk_for_each_channel_continue(i, 0);
2956 }
2957 
2958 static void
2959 _spdk_bdev_start_reset(void *ctx)
2960 {
2961 	struct spdk_bdev_channel *ch = ctx;
2962 
2963 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2964 			      ch, _spdk_bdev_reset_dev);
2965 }
2966 
2967 static void
2968 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2969 {
2970 	struct spdk_bdev *bdev = ch->bdev;
2971 
2972 	assert(!TAILQ_EMPTY(&ch->queued_resets));
2973 
2974 	pthread_mutex_lock(&bdev->internal.mutex);
2975 	if (bdev->internal.reset_in_progress == NULL) {
2976 		bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2977 		/*
2978 		 * Take a channel reference for the target bdev for the life of this
2979 		 *  reset.  This guards against the channel getting destroyed while
2980 		 *  spdk_for_each_channel() calls related to this reset IO are in
2981 		 *  progress.  We will release the reference when this reset is
2982 		 *  completed.
2983 		 */
2984 		bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2985 		_spdk_bdev_start_reset(ch);
2986 	}
2987 	pthread_mutex_unlock(&bdev->internal.mutex);
2988 }
2989 
2990 int
2991 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2992 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2993 {
2994 	struct spdk_bdev *bdev = desc->bdev;
2995 	struct spdk_bdev_io *bdev_io;
2996 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2997 
2998 	bdev_io = spdk_bdev_get_io(channel);
2999 	if (!bdev_io) {
3000 		return -ENOMEM;
3001 	}
3002 
3003 	bdev_io->internal.ch = channel;
3004 	bdev_io->internal.desc = desc;
3005 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
3006 	bdev_io->u.reset.ch_ref = NULL;
3007 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
3008 
3009 	pthread_mutex_lock(&bdev->internal.mutex);
3010 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link);
3011 	pthread_mutex_unlock(&bdev->internal.mutex);
3012 
3013 	_spdk_bdev_channel_start_reset(channel);
3014 
3015 	return 0;
3016 }
3017 
3018 void
3019 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
3020 		      struct spdk_bdev_io_stat *stat)
3021 {
3022 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
3023 
3024 	*stat = channel->stat;
3025 }
3026 
3027 static void
3028 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
3029 {
3030 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3031 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
3032 
3033 	bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
3034 			    bdev_iostat_ctx->cb_arg, 0);
3035 	free(bdev_iostat_ctx);
3036 }
3037 
3038 static void
3039 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
3040 {
3041 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
3042 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3043 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
3044 
3045 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat);
3046 	spdk_for_each_channel_continue(i, 0);
3047 }
3048 
3049 void
3050 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
3051 			  spdk_bdev_get_device_stat_cb cb, void *cb_arg)
3052 {
3053 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
3054 
3055 	assert(bdev != NULL);
3056 	assert(stat != NULL);
3057 	assert(cb != NULL);
3058 
3059 	bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
3060 	if (bdev_iostat_ctx == NULL) {
3061 		SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
3062 		cb(bdev, stat, cb_arg, -ENOMEM);
3063 		return;
3064 	}
3065 
3066 	bdev_iostat_ctx->stat = stat;
3067 	bdev_iostat_ctx->cb = cb;
3068 	bdev_iostat_ctx->cb_arg = cb_arg;
3069 
3070 	/* Start with the statistics from previously deleted channels. */
3071 	pthread_mutex_lock(&bdev->internal.mutex);
3072 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat);
3073 	pthread_mutex_unlock(&bdev->internal.mutex);
3074 
3075 	/* Then iterate and add the statistics from each existing channel. */
3076 	spdk_for_each_channel(__bdev_to_io_dev(bdev),
3077 			      _spdk_bdev_get_each_channel_stat,
3078 			      bdev_iostat_ctx,
3079 			      _spdk_bdev_get_device_stat_done);
3080 }
3081 
3082 int
3083 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
3084 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
3085 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
3086 {
3087 	struct spdk_bdev *bdev = desc->bdev;
3088 	struct spdk_bdev_io *bdev_io;
3089 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
3090 
3091 	if (!desc->write) {
3092 		return -EBADF;
3093 	}
3094 
3095 	bdev_io = spdk_bdev_get_io(channel);
3096 	if (!bdev_io) {
3097 		return -ENOMEM;
3098 	}
3099 
3100 	bdev_io->internal.ch = channel;
3101 	bdev_io->internal.desc = desc;
3102 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
3103 	bdev_io->u.nvme_passthru.cmd = *cmd;
3104 	bdev_io->u.nvme_passthru.buf = buf;
3105 	bdev_io->u.nvme_passthru.nbytes = nbytes;
3106 	bdev_io->u.nvme_passthru.md_buf = NULL;
3107 	bdev_io->u.nvme_passthru.md_len = 0;
3108 
3109 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
3110 
3111 	spdk_bdev_io_submit(bdev_io);
3112 	return 0;
3113 }
3114 
3115 int
3116 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
3117 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
3118 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
3119 {
3120 	struct spdk_bdev *bdev = desc->bdev;
3121 	struct spdk_bdev_io *bdev_io;
3122 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
3123 
3124 	if (!desc->write) {
3125 		/*
3126 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
3127 		 *  to easily determine if the command is a read or write, but for now just
3128 		 *  do not allow io_passthru with a read-only descriptor.
3129 		 */
3130 		return -EBADF;
3131 	}
3132 
3133 	bdev_io = spdk_bdev_get_io(channel);
3134 	if (!bdev_io) {
3135 		return -ENOMEM;
3136 	}
3137 
3138 	bdev_io->internal.ch = channel;
3139 	bdev_io->internal.desc = desc;
3140 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
3141 	bdev_io->u.nvme_passthru.cmd = *cmd;
3142 	bdev_io->u.nvme_passthru.buf = buf;
3143 	bdev_io->u.nvme_passthru.nbytes = nbytes;
3144 	bdev_io->u.nvme_passthru.md_buf = NULL;
3145 	bdev_io->u.nvme_passthru.md_len = 0;
3146 
3147 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
3148 
3149 	spdk_bdev_io_submit(bdev_io);
3150 	return 0;
3151 }
3152 
3153 int
3154 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
3155 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
3156 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
3157 {
3158 	struct spdk_bdev *bdev = desc->bdev;
3159 	struct spdk_bdev_io *bdev_io;
3160 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
3161 
3162 	if (!desc->write) {
3163 		/*
3164 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
3165 		 *  to easily determine if the command is a read or write, but for now just
3166 		 *  do not allow io_passthru with a read-only descriptor.
3167 		 */
3168 		return -EBADF;
3169 	}
3170 
3171 	bdev_io = spdk_bdev_get_io(channel);
3172 	if (!bdev_io) {
3173 		return -ENOMEM;
3174 	}
3175 
3176 	bdev_io->internal.ch = channel;
3177 	bdev_io->internal.desc = desc;
3178 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
3179 	bdev_io->u.nvme_passthru.cmd = *cmd;
3180 	bdev_io->u.nvme_passthru.buf = buf;
3181 	bdev_io->u.nvme_passthru.nbytes = nbytes;
3182 	bdev_io->u.nvme_passthru.md_buf = md_buf;
3183 	bdev_io->u.nvme_passthru.md_len = md_len;
3184 
3185 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
3186 
3187 	spdk_bdev_io_submit(bdev_io);
3188 	return 0;
3189 }
3190 
3191 int
3192 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
3193 			struct spdk_bdev_io_wait_entry *entry)
3194 {
3195 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
3196 	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
3197 
3198 	if (bdev != entry->bdev) {
3199 		SPDK_ERRLOG("bdevs do not match\n");
3200 		return -EINVAL;
3201 	}
3202 
3203 	if (mgmt_ch->per_thread_cache_count > 0) {
3204 		SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n");
3205 		return -EINVAL;
3206 	}
3207 
3208 	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
3209 	return 0;
3210 }
3211 
3212 static void
3213 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
3214 {
3215 	struct spdk_bdev *bdev = bdev_ch->bdev;
3216 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
3217 	struct spdk_bdev_io *bdev_io;
3218 
3219 	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
3220 		/*
3221 		 * Allow some more I/O to complete before retrying the nomem_io queue.
3222 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
3223 		 *  the context of a completion, because the resources for the I/O are
3224 		 *  not released until control returns to the bdev poller.  Also, we
3225 		 *  may require several small I/O to complete before a larger I/O
3226 		 *  (that requires splitting) can be submitted.
3227 		 */
3228 		return;
3229 	}
3230 
3231 	while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
3232 		bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
3233 		TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link);
3234 		bdev_io->internal.ch->io_outstanding++;
3235 		shared_resource->io_outstanding++;
3236 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
3237 		bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
3238 		if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
3239 			break;
3240 		}
3241 	}
3242 }
3243 
3244 static inline void
3245 _spdk_bdev_io_complete(void *ctx)
3246 {
3247 	struct spdk_bdev_io *bdev_io = ctx;
3248 	uint64_t tsc, tsc_diff;
3249 
3250 	if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
3251 		/*
3252 		 * Send the completion to the thread that originally submitted the I/O,
3253 		 * which may not be the current thread in the case of QoS.
3254 		 */
3255 		if (bdev_io->internal.io_submit_ch) {
3256 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
3257 			bdev_io->internal.io_submit_ch = NULL;
3258 		}
3259 
3260 		/*
3261 		 * Defer completion to avoid potential infinite recursion if the
3262 		 * user's completion callback issues a new I/O.
3263 		 */
3264 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
3265 				     _spdk_bdev_io_complete, bdev_io);
3266 		return;
3267 	}
3268 
3269 	tsc = spdk_get_ticks();
3270 	tsc_diff = tsc - bdev_io->internal.submit_tsc;
3271 	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0);
3272 
3273 	if (bdev_io->internal.ch->histogram) {
3274 		spdk_histogram_data_tally(bdev_io->internal.ch->histogram, tsc_diff);
3275 	}
3276 
3277 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
3278 		switch (bdev_io->type) {
3279 		case SPDK_BDEV_IO_TYPE_READ:
3280 			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
3281 			bdev_io->internal.ch->stat.num_read_ops++;
3282 			bdev_io->internal.ch->stat.read_latency_ticks += tsc_diff;
3283 			break;
3284 		case SPDK_BDEV_IO_TYPE_WRITE:
3285 			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
3286 			bdev_io->internal.ch->stat.num_write_ops++;
3287 			bdev_io->internal.ch->stat.write_latency_ticks += tsc_diff;
3288 			break;
3289 		case SPDK_BDEV_IO_TYPE_UNMAP:
3290 			bdev_io->internal.ch->stat.bytes_unmapped += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
3291 			bdev_io->internal.ch->stat.num_unmap_ops++;
3292 			bdev_io->internal.ch->stat.unmap_latency_ticks += tsc_diff;
3293 		default:
3294 			break;
3295 		}
3296 	}
3297 
3298 #ifdef SPDK_CONFIG_VTUNE
3299 	uint64_t now_tsc = spdk_get_ticks();
3300 	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
3301 		uint64_t data[5];
3302 
3303 		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
3304 		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
3305 		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
3306 		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
3307 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
3308 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
3309 
3310 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
3311 				   __itt_metadata_u64, 5, data);
3312 
3313 		bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
3314 		bdev_io->internal.ch->start_tsc = now_tsc;
3315 	}
3316 #endif
3317 
3318 	assert(bdev_io->internal.cb != NULL);
3319 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
3320 
3321 	bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
3322 			     bdev_io->internal.caller_ctx);
3323 }
3324 
3325 static void
3326 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
3327 {
3328 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
3329 
3330 	if (bdev_io->u.reset.ch_ref != NULL) {
3331 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
3332 		bdev_io->u.reset.ch_ref = NULL;
3333 	}
3334 
3335 	_spdk_bdev_io_complete(bdev_io);
3336 }
3337 
3338 static void
3339 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
3340 {
3341 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
3342 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
3343 
3344 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
3345 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
3346 		_spdk_bdev_channel_start_reset(ch);
3347 	}
3348 
3349 	spdk_for_each_channel_continue(i, 0);
3350 }
3351 
3352 void
3353 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
3354 {
3355 	struct spdk_bdev *bdev = bdev_io->bdev;
3356 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
3357 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
3358 
3359 	bdev_io->internal.status = status;
3360 
3361 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
3362 		bool unlock_channels = false;
3363 
3364 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
3365 			SPDK_ERRLOG("NOMEM returned for reset\n");
3366 		}
3367 		pthread_mutex_lock(&bdev->internal.mutex);
3368 		if (bdev_io == bdev->internal.reset_in_progress) {
3369 			bdev->internal.reset_in_progress = NULL;
3370 			unlock_channels = true;
3371 		}
3372 		pthread_mutex_unlock(&bdev->internal.mutex);
3373 
3374 		if (unlock_channels) {
3375 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
3376 					      bdev_io, _spdk_bdev_reset_complete);
3377 			return;
3378 		}
3379 	} else {
3380 		if (spdk_unlikely(bdev_io->internal.orig_iovcnt > 0)) {
3381 			_bdev_io_unset_bounce_buf(bdev_io);
3382 		}
3383 
3384 		assert(bdev_ch->io_outstanding > 0);
3385 		assert(shared_resource->io_outstanding > 0);
3386 		bdev_ch->io_outstanding--;
3387 		shared_resource->io_outstanding--;
3388 
3389 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
3390 			TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link);
3391 			/*
3392 			 * Wait for some of the outstanding I/O to complete before we
3393 			 *  retry any of the nomem_io.  Normally we will wait for
3394 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
3395 			 *  depth channels we will instead wait for half to complete.
3396 			 */
3397 			shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
3398 							   (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
3399 			return;
3400 		}
3401 
3402 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
3403 			_spdk_bdev_ch_retry_io(bdev_ch);
3404 		}
3405 	}
3406 
3407 	_spdk_bdev_io_complete(bdev_io);
3408 }
3409 
3410 void
3411 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
3412 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
3413 {
3414 	if (sc == SPDK_SCSI_STATUS_GOOD) {
3415 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3416 	} else {
3417 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
3418 		bdev_io->internal.error.scsi.sc = sc;
3419 		bdev_io->internal.error.scsi.sk = sk;
3420 		bdev_io->internal.error.scsi.asc = asc;
3421 		bdev_io->internal.error.scsi.ascq = ascq;
3422 	}
3423 
3424 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
3425 }
3426 
3427 void
3428 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
3429 			     int *sc, int *sk, int *asc, int *ascq)
3430 {
3431 	assert(sc != NULL);
3432 	assert(sk != NULL);
3433 	assert(asc != NULL);
3434 	assert(ascq != NULL);
3435 
3436 	switch (bdev_io->internal.status) {
3437 	case SPDK_BDEV_IO_STATUS_SUCCESS:
3438 		*sc = SPDK_SCSI_STATUS_GOOD;
3439 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
3440 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
3441 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
3442 		break;
3443 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
3444 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
3445 		break;
3446 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
3447 		*sc = bdev_io->internal.error.scsi.sc;
3448 		*sk = bdev_io->internal.error.scsi.sk;
3449 		*asc = bdev_io->internal.error.scsi.asc;
3450 		*ascq = bdev_io->internal.error.scsi.ascq;
3451 		break;
3452 	default:
3453 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
3454 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
3455 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
3456 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
3457 		break;
3458 	}
3459 }
3460 
3461 void
3462 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
3463 {
3464 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
3465 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3466 	} else {
3467 		bdev_io->internal.error.nvme.sct = sct;
3468 		bdev_io->internal.error.nvme.sc = sc;
3469 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
3470 	}
3471 
3472 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
3473 }
3474 
3475 void
3476 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
3477 {
3478 	assert(sct != NULL);
3479 	assert(sc != NULL);
3480 
3481 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
3482 		*sct = bdev_io->internal.error.nvme.sct;
3483 		*sc = bdev_io->internal.error.nvme.sc;
3484 	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
3485 		*sct = SPDK_NVME_SCT_GENERIC;
3486 		*sc = SPDK_NVME_SC_SUCCESS;
3487 	} else {
3488 		*sct = SPDK_NVME_SCT_GENERIC;
3489 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
3490 	}
3491 }
3492 
3493 struct spdk_thread *
3494 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
3495 {
3496 	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
3497 }
3498 
3499 struct spdk_io_channel *
3500 spdk_bdev_io_get_io_channel(struct spdk_bdev_io *bdev_io)
3501 {
3502 	return bdev_io->internal.ch->channel;
3503 }
3504 
3505 static void
3506 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits)
3507 {
3508 	uint64_t	min_qos_set;
3509 	int		i;
3510 
3511 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3512 		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3513 			break;
3514 		}
3515 	}
3516 
3517 	if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3518 		SPDK_ERRLOG("Invalid rate limits set.\n");
3519 		return;
3520 	}
3521 
3522 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3523 		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3524 			continue;
3525 		}
3526 
3527 		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
3528 			min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
3529 		} else {
3530 			min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
3531 		}
3532 
3533 		if (limits[i] == 0 || limits[i] % min_qos_set) {
3534 			SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n",
3535 				    limits[i], bdev->name, min_qos_set);
3536 			SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name);
3537 			return;
3538 		}
3539 	}
3540 
3541 	if (!bdev->internal.qos) {
3542 		bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3543 		if (!bdev->internal.qos) {
3544 			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3545 			return;
3546 		}
3547 	}
3548 
3549 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3550 		bdev->internal.qos->rate_limits[i].limit = limits[i];
3551 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n",
3552 			      bdev->name, i, limits[i]);
3553 	}
3554 
3555 	return;
3556 }
3557 
3558 static void
3559 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
3560 {
3561 	struct spdk_conf_section	*sp = NULL;
3562 	const char			*val = NULL;
3563 	int				i = 0, j = 0;
3564 	uint64_t			limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
3565 	bool				config_qos = false;
3566 
3567 	sp = spdk_conf_find_section(NULL, "QoS");
3568 	if (!sp) {
3569 		return;
3570 	}
3571 
3572 	while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3573 		limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
3574 
3575 		i = 0;
3576 		while (true) {
3577 			val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0);
3578 			if (!val) {
3579 				break;
3580 			}
3581 
3582 			if (strcmp(bdev->name, val) != 0) {
3583 				i++;
3584 				continue;
3585 			}
3586 
3587 			val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1);
3588 			if (val) {
3589 				if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) {
3590 					limits[j] = strtoull(val, NULL, 10);
3591 				} else {
3592 					limits[j] = strtoull(val, NULL, 10) * 1024 * 1024;
3593 				}
3594 				config_qos = true;
3595 			}
3596 
3597 			break;
3598 		}
3599 
3600 		j++;
3601 	}
3602 
3603 	if (config_qos == true) {
3604 		_spdk_bdev_qos_config_limit(bdev, limits);
3605 	}
3606 
3607 	return;
3608 }
3609 
3610 static int
3611 spdk_bdev_init(struct spdk_bdev *bdev)
3612 {
3613 	char *bdev_name;
3614 
3615 	assert(bdev->module != NULL);
3616 
3617 	if (!bdev->name) {
3618 		SPDK_ERRLOG("Bdev name is NULL\n");
3619 		return -EINVAL;
3620 	}
3621 
3622 	if (spdk_bdev_get_by_name(bdev->name)) {
3623 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
3624 		return -EEXIST;
3625 	}
3626 
3627 	/* Users often register their own I/O devices using the bdev name. In
3628 	 * order to avoid conflicts, prepend bdev_. */
3629 	bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name);
3630 	if (!bdev_name) {
3631 		SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n");
3632 		return -ENOMEM;
3633 	}
3634 
3635 	bdev->internal.status = SPDK_BDEV_STATUS_READY;
3636 	bdev->internal.measured_queue_depth = UINT64_MAX;
3637 	bdev->internal.claim_module = NULL;
3638 	bdev->internal.qd_poller = NULL;
3639 	bdev->internal.qos = NULL;
3640 
3641 	if (spdk_bdev_get_buf_align(bdev) > 1) {
3642 		if (bdev->split_on_optimal_io_boundary) {
3643 			bdev->optimal_io_boundary = spdk_min(bdev->optimal_io_boundary,
3644 							     SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen);
3645 		} else {
3646 			bdev->split_on_optimal_io_boundary = true;
3647 			bdev->optimal_io_boundary = SPDK_BDEV_LARGE_BUF_MAX_SIZE / bdev->blocklen;
3648 		}
3649 	}
3650 
3651 	TAILQ_INIT(&bdev->internal.open_descs);
3652 
3653 	TAILQ_INIT(&bdev->aliases);
3654 
3655 	bdev->internal.reset_in_progress = NULL;
3656 
3657 	_spdk_bdev_qos_config(bdev);
3658 
3659 	spdk_io_device_register(__bdev_to_io_dev(bdev),
3660 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
3661 				sizeof(struct spdk_bdev_channel),
3662 				bdev_name);
3663 
3664 	free(bdev_name);
3665 
3666 	pthread_mutex_init(&bdev->internal.mutex, NULL);
3667 	return 0;
3668 }
3669 
3670 static void
3671 spdk_bdev_destroy_cb(void *io_device)
3672 {
3673 	int			rc;
3674 	struct spdk_bdev	*bdev;
3675 	spdk_bdev_unregister_cb	cb_fn;
3676 	void			*cb_arg;
3677 
3678 	bdev = __bdev_from_io_dev(io_device);
3679 	cb_fn = bdev->internal.unregister_cb;
3680 	cb_arg = bdev->internal.unregister_ctx;
3681 
3682 	rc = bdev->fn_table->destruct(bdev->ctxt);
3683 	if (rc < 0) {
3684 		SPDK_ERRLOG("destruct failed\n");
3685 	}
3686 	if (rc <= 0 && cb_fn != NULL) {
3687 		cb_fn(cb_arg, rc);
3688 	}
3689 }
3690 
3691 
3692 static void
3693 spdk_bdev_fini(struct spdk_bdev *bdev)
3694 {
3695 	pthread_mutex_destroy(&bdev->internal.mutex);
3696 
3697 	free(bdev->internal.qos);
3698 
3699 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
3700 }
3701 
3702 static void
3703 spdk_bdev_start(struct spdk_bdev *bdev)
3704 {
3705 	struct spdk_bdev_module *module;
3706 	uint32_t action;
3707 
3708 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
3709 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);
3710 
3711 	/* Examine configuration before initializing I/O */
3712 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3713 		if (module->examine_config) {
3714 			action = module->internal.action_in_progress;
3715 			module->internal.action_in_progress++;
3716 			module->examine_config(bdev);
3717 			if (action != module->internal.action_in_progress) {
3718 				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
3719 					    module->name);
3720 			}
3721 		}
3722 	}
3723 
3724 	if (bdev->internal.claim_module) {
3725 		return;
3726 	}
3727 
3728 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3729 		if (module->examine_disk) {
3730 			module->internal.action_in_progress++;
3731 			module->examine_disk(bdev);
3732 		}
3733 	}
3734 }
3735 
3736 int
3737 spdk_bdev_register(struct spdk_bdev *bdev)
3738 {
3739 	int rc = spdk_bdev_init(bdev);
3740 
3741 	if (rc == 0) {
3742 		spdk_bdev_start(bdev);
3743 	}
3744 
3745 	return rc;
3746 }
3747 
3748 int
3749 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
3750 {
3751 	int rc;
3752 
3753 	rc = spdk_bdev_init(vbdev);
3754 	if (rc) {
3755 		return rc;
3756 	}
3757 
3758 	spdk_bdev_start(vbdev);
3759 	return 0;
3760 }
3761 
3762 void
3763 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
3764 {
3765 	if (bdev->internal.unregister_cb != NULL) {
3766 		bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
3767 	}
3768 }
3769 
3770 static void
3771 _remove_notify(void *arg)
3772 {
3773 	struct spdk_bdev_desc *desc = arg;
3774 
3775 	desc->remove_scheduled = false;
3776 
3777 	if (desc->closed) {
3778 		free(desc);
3779 	} else {
3780 		desc->remove_cb(desc->remove_ctx);
3781 	}
3782 }
3783 
3784 void
3785 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
3786 {
3787 	struct spdk_bdev_desc	*desc, *tmp;
3788 	bool			do_destruct = true;
3789 	struct spdk_thread	*thread;
3790 
3791 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
3792 
3793 	thread = spdk_get_thread();
3794 	if (!thread) {
3795 		/* The user called this from a non-SPDK thread. */
3796 		if (cb_fn != NULL) {
3797 			cb_fn(cb_arg, -ENOTSUP);
3798 		}
3799 		return;
3800 	}
3801 
3802 	pthread_mutex_lock(&bdev->internal.mutex);
3803 
3804 	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
3805 	bdev->internal.unregister_cb = cb_fn;
3806 	bdev->internal.unregister_ctx = cb_arg;
3807 
3808 	TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
3809 		if (desc->remove_cb) {
3810 			do_destruct = false;
3811 			/*
3812 			 * Defer invocation of the remove_cb to a separate message that will
3813 			 *  run later on its thread.  This ensures this context unwinds and
3814 			 *  we don't recursively unregister this bdev again if the remove_cb
3815 			 *  immediately closes its descriptor.
3816 			 */
3817 			if (!desc->remove_scheduled) {
3818 				/* Avoid scheduling removal of the same descriptor multiple times. */
3819 				desc->remove_scheduled = true;
3820 				spdk_thread_send_msg(desc->thread, _remove_notify, desc);
3821 			}
3822 		}
3823 	}
3824 
3825 	if (!do_destruct) {
3826 		pthread_mutex_unlock(&bdev->internal.mutex);
3827 		return;
3828 	}
3829 
3830 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
3831 	pthread_mutex_unlock(&bdev->internal.mutex);
3832 
3833 	spdk_bdev_fini(bdev);
3834 }
3835 
3836 int
3837 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
3838 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
3839 {
3840 	struct spdk_bdev_desc *desc;
3841 	struct spdk_thread *thread;
3842 	struct set_qos_limit_ctx *ctx;
3843 
3844 	thread = spdk_get_thread();
3845 	if (!thread) {
3846 		SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n");
3847 		return -ENOTSUP;
3848 	}
3849 
3850 	desc = calloc(1, sizeof(*desc));
3851 	if (desc == NULL) {
3852 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
3853 		return -ENOMEM;
3854 	}
3855 
3856 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3857 		      spdk_get_thread());
3858 
3859 	desc->bdev = bdev;
3860 	desc->thread = thread;
3861 	desc->remove_cb = remove_cb;
3862 	desc->remove_ctx = remove_ctx;
3863 	desc->write = write;
3864 	*_desc = desc;
3865 
3866 	pthread_mutex_lock(&bdev->internal.mutex);
3867 
3868 	if (write && bdev->internal.claim_module) {
3869 		SPDK_ERRLOG("Could not open %s - %s module already claimed it\n",
3870 			    bdev->name, bdev->internal.claim_module->name);
3871 		pthread_mutex_unlock(&bdev->internal.mutex);
3872 		free(desc);
3873 		*_desc = NULL;
3874 		return -EPERM;
3875 	}
3876 
3877 	/* Enable QoS */
3878 	if (bdev->internal.qos && bdev->internal.qos->thread == NULL) {
3879 		ctx = calloc(1, sizeof(*ctx));
3880 		if (ctx == NULL) {
3881 			SPDK_ERRLOG("Failed to allocate memory for QoS context\n");
3882 			pthread_mutex_unlock(&bdev->internal.mutex);
3883 			free(desc);
3884 			*_desc = NULL;
3885 			return -ENOMEM;
3886 		}
3887 		ctx->bdev = bdev;
3888 		spdk_for_each_channel(__bdev_to_io_dev(bdev),
3889 				      _spdk_bdev_enable_qos_msg, ctx,
3890 				      _spdk_bdev_enable_qos_done);
3891 	}
3892 
3893 	TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
3894 
3895 	pthread_mutex_unlock(&bdev->internal.mutex);
3896 
3897 	return 0;
3898 }
3899 
3900 void
3901 spdk_bdev_close(struct spdk_bdev_desc *desc)
3902 {
3903 	struct spdk_bdev *bdev = desc->bdev;
3904 	bool do_unregister = false;
3905 
3906 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3907 		      spdk_get_thread());
3908 
3909 	assert(desc->thread == spdk_get_thread());
3910 
3911 	pthread_mutex_lock(&bdev->internal.mutex);
3912 
3913 	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);
3914 
3915 	desc->closed = true;
3916 
3917 	if (!desc->remove_scheduled) {
3918 		free(desc);
3919 	}
3920 
3921 	/* If no more descriptors, kill QoS channel */
3922 	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3923 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
3924 			      bdev->name, spdk_get_thread());
3925 
3926 		if (spdk_bdev_qos_destroy(bdev)) {
3927 			/* There isn't anything we can do to recover here. Just let the
3928 			 * old QoS poller keep running. The QoS handling won't change
3929 			 * cores when the user allocates a new channel, but it won't break. */
3930 			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3931 		}
3932 	}
3933 
3934 	spdk_bdev_set_qd_sampling_period(bdev, 0);
3935 
3936 	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3937 		do_unregister = true;
3938 	}
3939 	pthread_mutex_unlock(&bdev->internal.mutex);
3940 
3941 	if (do_unregister == true) {
3942 		spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
3943 	}
3944 }
3945 
3946 int
3947 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3948 			    struct spdk_bdev_module *module)
3949 {
3950 	if (bdev->internal.claim_module != NULL) {
3951 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3952 			    bdev->internal.claim_module->name);
3953 		return -EPERM;
3954 	}
3955 
3956 	if (desc && !desc->write) {
3957 		desc->write = true;
3958 	}
3959 
3960 	bdev->internal.claim_module = module;
3961 	return 0;
3962 }
3963 
3964 void
3965 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3966 {
3967 	assert(bdev->internal.claim_module != NULL);
3968 	bdev->internal.claim_module = NULL;
3969 }
3970 
3971 struct spdk_bdev *
3972 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3973 {
3974 	return desc->bdev;
3975 }
3976 
3977 void
3978 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3979 {
3980 	struct iovec *iovs;
3981 	int iovcnt;
3982 
3983 	if (bdev_io == NULL) {
3984 		return;
3985 	}
3986 
3987 	switch (bdev_io->type) {
3988 	case SPDK_BDEV_IO_TYPE_READ:
3989 		iovs = bdev_io->u.bdev.iovs;
3990 		iovcnt = bdev_io->u.bdev.iovcnt;
3991 		break;
3992 	case SPDK_BDEV_IO_TYPE_WRITE:
3993 		iovs = bdev_io->u.bdev.iovs;
3994 		iovcnt = bdev_io->u.bdev.iovcnt;
3995 		break;
3996 	default:
3997 		iovs = NULL;
3998 		iovcnt = 0;
3999 		break;
4000 	}
4001 
4002 	if (iovp) {
4003 		*iovp = iovs;
4004 	}
4005 	if (iovcntp) {
4006 		*iovcntp = iovcnt;
4007 	}
4008 }
4009 
4010 void
4011 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
4012 {
4013 
4014 	if (spdk_bdev_module_list_find(bdev_module->name)) {
4015 		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
4016 		assert(false);
4017 	}
4018 
4019 	if (bdev_module->async_init) {
4020 		bdev_module->internal.action_in_progress = 1;
4021 	}
4022 
4023 	/*
4024 	 * Modules with examine callbacks must be initialized first, so they are
4025 	 *  ready to handle examine callbacks from later modules that will
4026 	 *  register physical bdevs.
4027 	 */
4028 	if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
4029 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
4030 	} else {
4031 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
4032 	}
4033 }
4034 
4035 struct spdk_bdev_module *
4036 spdk_bdev_module_list_find(const char *name)
4037 {
4038 	struct spdk_bdev_module *bdev_module;
4039 
4040 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
4041 		if (strcmp(name, bdev_module->name) == 0) {
4042 			break;
4043 		}
4044 	}
4045 
4046 	return bdev_module;
4047 }
4048 
4049 static void
4050 _spdk_bdev_write_zero_buffer_next(void *_bdev_io)
4051 {
4052 	struct spdk_bdev_io *bdev_io = _bdev_io;
4053 	uint64_t num_bytes, num_blocks;
4054 	int rc;
4055 
4056 	num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) *
4057 			     bdev_io->u.bdev.split_remaining_num_blocks,
4058 			     ZERO_BUFFER_SIZE);
4059 	num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev);
4060 
4061 	rc = spdk_bdev_write_blocks(bdev_io->internal.desc,
4062 				    spdk_io_channel_from_ctx(bdev_io->internal.ch),
4063 				    g_bdev_mgr.zero_buffer,
4064 				    bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
4065 				    _spdk_bdev_write_zero_buffer_done, bdev_io);
4066 	if (rc == 0) {
4067 		bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
4068 		bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
4069 	} else if (rc == -ENOMEM) {
4070 		_spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next);
4071 	} else {
4072 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
4073 		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
4074 	}
4075 }
4076 
4077 static void
4078 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
4079 {
4080 	struct spdk_bdev_io *parent_io = cb_arg;
4081 
4082 	spdk_bdev_free_io(bdev_io);
4083 
4084 	if (!success) {
4085 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
4086 		parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
4087 		return;
4088 	}
4089 
4090 	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
4091 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
4092 		parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
4093 		return;
4094 	}
4095 
4096 	_spdk_bdev_write_zero_buffer_next(parent_io);
4097 }
4098 
4099 static void
4100 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
4101 {
4102 	pthread_mutex_lock(&ctx->bdev->internal.mutex);
4103 	ctx->bdev->internal.qos_mod_in_progress = false;
4104 	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
4105 
4106 	if (ctx->cb_fn) {
4107 		ctx->cb_fn(ctx->cb_arg, status);
4108 	}
4109 	free(ctx);
4110 }
4111 
4112 static void
4113 _spdk_bdev_disable_qos_done(void *cb_arg)
4114 {
4115 	struct set_qos_limit_ctx *ctx = cb_arg;
4116 	struct spdk_bdev *bdev = ctx->bdev;
4117 	struct spdk_bdev_io *bdev_io;
4118 	struct spdk_bdev_qos *qos;
4119 
4120 	pthread_mutex_lock(&bdev->internal.mutex);
4121 	qos = bdev->internal.qos;
4122 	bdev->internal.qos = NULL;
4123 	pthread_mutex_unlock(&bdev->internal.mutex);
4124 
4125 	while (!TAILQ_EMPTY(&qos->queued)) {
4126 		/* Send queued I/O back to their original thread for resubmission. */
4127 		bdev_io = TAILQ_FIRST(&qos->queued);
4128 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
4129 
4130 		if (bdev_io->internal.io_submit_ch) {
4131 			/*
4132 			 * Channel was changed when sending it to the QoS thread - change it back
4133 			 *  before sending it back to the original thread.
4134 			 */
4135 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
4136 			bdev_io->internal.io_submit_ch = NULL;
4137 		}
4138 
4139 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
4140 				     _spdk_bdev_io_submit, bdev_io);
4141 	}
4142 
4143 	if (qos->thread != NULL) {
4144 		spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
4145 		spdk_poller_unregister(&qos->poller);
4146 	}
4147 
4148 	free(qos);
4149 
4150 	_spdk_bdev_set_qos_limit_done(ctx, 0);
4151 }
4152 
4153 static void
4154 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
4155 {
4156 	void *io_device = spdk_io_channel_iter_get_io_device(i);
4157 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
4158 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
4159 	struct spdk_thread *thread;
4160 
4161 	pthread_mutex_lock(&bdev->internal.mutex);
4162 	thread = bdev->internal.qos->thread;
4163 	pthread_mutex_unlock(&bdev->internal.mutex);
4164 
4165 	if (thread != NULL) {
4166 		spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
4167 	} else {
4168 		_spdk_bdev_disable_qos_done(ctx);
4169 	}
4170 }
4171 
4172 static void
4173 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
4174 {
4175 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
4176 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
4177 
4178 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
4179 
4180 	spdk_for_each_channel_continue(i, 0);
4181 }
4182 
4183 static void
4184 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
4185 {
4186 	struct set_qos_limit_ctx *ctx = cb_arg;
4187 	struct spdk_bdev *bdev = ctx->bdev;
4188 
4189 	pthread_mutex_lock(&bdev->internal.mutex);
4190 	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
4191 	pthread_mutex_unlock(&bdev->internal.mutex);
4192 
4193 	_spdk_bdev_set_qos_limit_done(ctx, 0);
4194 }
4195 
4196 static void
4197 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
4198 {
4199 	void *io_device = spdk_io_channel_iter_get_io_device(i);
4200 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
4201 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
4202 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
4203 
4204 	pthread_mutex_lock(&bdev->internal.mutex);
4205 	_spdk_bdev_enable_qos(bdev, bdev_ch);
4206 	pthread_mutex_unlock(&bdev->internal.mutex);
4207 	spdk_for_each_channel_continue(i, 0);
4208 }
4209 
4210 static void
4211 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
4212 {
4213 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
4214 
4215 	_spdk_bdev_set_qos_limit_done(ctx, status);
4216 }
4217 
4218 static void
4219 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
4220 {
4221 	int i;
4222 
4223 	assert(bdev->internal.qos != NULL);
4224 
4225 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
4226 		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
4227 			bdev->internal.qos->rate_limits[i].limit = limits[i];
4228 
4229 			if (limits[i] == 0) {
4230 				bdev->internal.qos->rate_limits[i].limit =
4231 					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
4232 			}
4233 		}
4234 	}
4235 }
4236 
4237 void
4238 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
4239 			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
4240 {
4241 	struct set_qos_limit_ctx	*ctx;
4242 	uint32_t			limit_set_complement;
4243 	uint64_t			min_limit_per_sec;
4244 	int				i;
4245 	bool				disable_rate_limit = true;
4246 
4247 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
4248 		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
4249 			continue;
4250 		}
4251 
4252 		if (limits[i] > 0) {
4253 			disable_rate_limit = false;
4254 		}
4255 
4256 		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
4257 			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
4258 		} else {
4259 			/* Change from megabyte to byte rate limit */
4260 			limits[i] = limits[i] * 1024 * 1024;
4261 			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
4262 		}
4263 
4264 		limit_set_complement = limits[i] % min_limit_per_sec;
4265 		if (limit_set_complement) {
4266 			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
4267 				    limits[i], min_limit_per_sec);
4268 			limits[i] += min_limit_per_sec - limit_set_complement;
4269 			SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]);
4270 		}
4271 	}
4272 
4273 	ctx = calloc(1, sizeof(*ctx));
4274 	if (ctx == NULL) {
4275 		cb_fn(cb_arg, -ENOMEM);
4276 		return;
4277 	}
4278 
4279 	ctx->cb_fn = cb_fn;
4280 	ctx->cb_arg = cb_arg;
4281 	ctx->bdev = bdev;
4282 
4283 	pthread_mutex_lock(&bdev->internal.mutex);
4284 	if (bdev->internal.qos_mod_in_progress) {
4285 		pthread_mutex_unlock(&bdev->internal.mutex);
4286 		free(ctx);
4287 		cb_fn(cb_arg, -EAGAIN);
4288 		return;
4289 	}
4290 	bdev->internal.qos_mod_in_progress = true;
4291 
4292 	if (disable_rate_limit == true && bdev->internal.qos) {
4293 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
4294 			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
4295 			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
4296 			     bdev->internal.qos->rate_limits[i].limit !=
4297 			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
4298 				disable_rate_limit = false;
4299 				break;
4300 			}
4301 		}
4302 	}
4303 
4304 	if (disable_rate_limit == false) {
4305 		if (bdev->internal.qos == NULL) {
4306 			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
4307 			if (!bdev->internal.qos) {
4308 				pthread_mutex_unlock(&bdev->internal.mutex);
4309 				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
4310 				free(ctx);
4311 				cb_fn(cb_arg, -ENOMEM);
4312 				return;
4313 			}
4314 		}
4315 
4316 		if (bdev->internal.qos->thread == NULL) {
4317 			/* Enabling */
4318 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
4319 
4320 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
4321 					      _spdk_bdev_enable_qos_msg, ctx,
4322 					      _spdk_bdev_enable_qos_done);
4323 		} else {
4324 			/* Updating */
4325 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
4326 
4327 			spdk_thread_send_msg(bdev->internal.qos->thread,
4328 					     _spdk_bdev_update_qos_rate_limit_msg, ctx);
4329 		}
4330 	} else {
4331 		if (bdev->internal.qos != NULL) {
4332 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
4333 
4334 			/* Disabling */
4335 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
4336 					      _spdk_bdev_disable_qos_msg, ctx,
4337 					      _spdk_bdev_disable_qos_msg_done);
4338 		} else {
4339 			pthread_mutex_unlock(&bdev->internal.mutex);
4340 			_spdk_bdev_set_qos_limit_done(ctx, 0);
4341 			return;
4342 		}
4343 	}
4344 
4345 	pthread_mutex_unlock(&bdev->internal.mutex);
4346 }
4347 
4348 struct spdk_bdev_histogram_ctx {
4349 	spdk_bdev_histogram_status_cb cb_fn;
4350 	void *cb_arg;
4351 	struct spdk_bdev *bdev;
4352 	int status;
4353 };
4354 
4355 static void
4356 _spdk_bdev_histogram_disable_channel_cb(struct spdk_io_channel_iter *i, int status)
4357 {
4358 	struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
4359 
4360 	pthread_mutex_lock(&ctx->bdev->internal.mutex);
4361 	ctx->bdev->internal.histogram_in_progress = false;
4362 	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
4363 	ctx->cb_fn(ctx->cb_arg, ctx->status);
4364 	free(ctx);
4365 }
4366 
4367 static void
4368 _spdk_bdev_histogram_disable_channel(struct spdk_io_channel_iter *i)
4369 {
4370 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
4371 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
4372 
4373 	if (ch->histogram != NULL) {
4374 		spdk_histogram_data_free(ch->histogram);
4375 		ch->histogram = NULL;
4376 	}
4377 	spdk_for_each_channel_continue(i, 0);
4378 }
4379 
4380 static void
4381 _spdk_bdev_histogram_enable_channel_cb(struct spdk_io_channel_iter *i, int status)
4382 {
4383 	struct spdk_bdev_histogram_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
4384 
4385 	if (status != 0) {
4386 		ctx->status = status;
4387 		ctx->bdev->internal.histogram_enabled = false;
4388 		spdk_for_each_channel(__bdev_to_io_dev(ctx->bdev), _spdk_bdev_histogram_disable_channel, ctx,
4389 				      _spdk_bdev_histogram_disable_channel_cb);
4390 	} else {
4391 		pthread_mutex_lock(&ctx->bdev->internal.mutex);
4392 		ctx->bdev->internal.histogram_in_progress = false;
4393 		pthread_mutex_unlock(&ctx->bdev->internal.mutex);
4394 		ctx->cb_fn(ctx->cb_arg, ctx->status);
4395 		free(ctx);
4396 	}
4397 }
4398 
4399 static void
4400 _spdk_bdev_histogram_enable_channel(struct spdk_io_channel_iter *i)
4401 {
4402 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
4403 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
4404 	int status = 0;
4405 
4406 	if (ch->histogram == NULL) {
4407 		ch->histogram = spdk_histogram_data_alloc();
4408 		if (ch->histogram == NULL) {
4409 			status = -ENOMEM;
4410 		}
4411 	}
4412 
4413 	spdk_for_each_channel_continue(i, status);
4414 }
4415 
4416 void
4417 spdk_bdev_histogram_enable(struct spdk_bdev *bdev, spdk_bdev_histogram_status_cb cb_fn,
4418 			   void *cb_arg, bool enable)
4419 {
4420 	struct spdk_bdev_histogram_ctx *ctx;
4421 
4422 	ctx = calloc(1, sizeof(struct spdk_bdev_histogram_ctx));
4423 	if (ctx == NULL) {
4424 		cb_fn(cb_arg, -ENOMEM);
4425 		return;
4426 	}
4427 
4428 	ctx->bdev = bdev;
4429 	ctx->status = 0;
4430 	ctx->cb_fn = cb_fn;
4431 	ctx->cb_arg = cb_arg;
4432 
4433 	pthread_mutex_lock(&bdev->internal.mutex);
4434 	if (bdev->internal.histogram_in_progress) {
4435 		pthread_mutex_unlock(&bdev->internal.mutex);
4436 		free(ctx);
4437 		cb_fn(cb_arg, -EAGAIN);
4438 		return;
4439 	}
4440 
4441 	bdev->internal.histogram_in_progress = true;
4442 	pthread_mutex_unlock(&bdev->internal.mutex);
4443 
4444 	bdev->internal.histogram_enabled = enable;
4445 
4446 	if (enable) {
4447 		/* Allocate histogram for each channel */
4448 		spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_enable_channel, ctx,
4449 				      _spdk_bdev_histogram_enable_channel_cb);
4450 	} else {
4451 		spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_disable_channel, ctx,
4452 				      _spdk_bdev_histogram_disable_channel_cb);
4453 	}
4454 }
4455 
4456 struct spdk_bdev_histogram_data_ctx {
4457 	spdk_bdev_histogram_data_cb cb_fn;
4458 	void *cb_arg;
4459 	struct spdk_bdev *bdev;
4460 	/** merged histogram data from all channels */
4461 	struct spdk_histogram_data	*histogram;
4462 };
4463 
4464 static void
4465 _spdk_bdev_histogram_get_channel_cb(struct spdk_io_channel_iter *i, int status)
4466 {
4467 	struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
4468 
4469 	ctx->cb_fn(ctx->cb_arg, status, ctx->histogram);
4470 	free(ctx);
4471 }
4472 
4473 static void
4474 _spdk_bdev_histogram_get_channel(struct spdk_io_channel_iter *i)
4475 {
4476 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
4477 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
4478 	struct spdk_bdev_histogram_data_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
4479 	int status = 0;
4480 
4481 	if (ch->histogram == NULL) {
4482 		status = -EFAULT;
4483 	} else {
4484 		spdk_histogram_data_merge(ctx->histogram, ch->histogram);
4485 	}
4486 
4487 	spdk_for_each_channel_continue(i, status);
4488 }
4489 
4490 void
4491 spdk_bdev_histogram_get(struct spdk_bdev *bdev, struct spdk_histogram_data *histogram,
4492 			spdk_bdev_histogram_data_cb cb_fn,
4493 			void *cb_arg)
4494 {
4495 	struct spdk_bdev_histogram_data_ctx *ctx;
4496 
4497 	ctx = calloc(1, sizeof(struct spdk_bdev_histogram_data_ctx));
4498 	if (ctx == NULL) {
4499 		cb_fn(cb_arg, -ENOMEM, NULL);
4500 		return;
4501 	}
4502 
4503 	ctx->bdev = bdev;
4504 	ctx->cb_fn = cb_fn;
4505 	ctx->cb_arg = cb_arg;
4506 
4507 	ctx->histogram = histogram;
4508 
4509 	spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_histogram_get_channel, ctx,
4510 			      _spdk_bdev_histogram_get_channel_cb);
4511 }
4512 
4513 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
4514 
4515 SPDK_TRACE_REGISTER_FN(bdev_trace, "bdev", TRACE_GROUP_BDEV)
4516 {
4517 	spdk_trace_register_owner(OWNER_BDEV, 'b');
4518 	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
4519 	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
4520 					OBJECT_BDEV_IO, 1, 0, "type:   ");
4521 	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
4522 					OBJECT_BDEV_IO, 0, 0, "");
4523 }
4524