xref: /spdk/lib/bdev/bdev.c (revision 65e5bbdc059f740cbbe8271ddec0929ea9b9624e)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/bdev.h"
37 #include "spdk/conf.h"
38 
39 #include "spdk/config.h"
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/thread.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 #include "spdk/trace.h"
49 
50 #include "spdk/bdev_module.h"
51 #include "spdk_internal/log.h"
52 #include "spdk/string.h"
53 
54 #ifdef SPDK_CONFIG_VTUNE
55 #include "ittnotify.h"
56 #include "ittnotify_types.h"
57 int __itt_init_ittlib(const char *, __itt_group_id);
58 #endif
59 
60 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
61 #define SPDK_BDEV_IO_CACHE_SIZE			256
62 #define BUF_SMALL_POOL_SIZE			8192
63 #define BUF_LARGE_POOL_SIZE			1024
64 #define NOMEM_THRESHOLD_COUNT			8
65 #define ZERO_BUFFER_SIZE			0x100000
66 
67 #define OWNER_BDEV		0x2
68 
69 #define OBJECT_BDEV_IO		0x2
70 
71 #define TRACE_GROUP_BDEV	0x3
72 #define TRACE_BDEV_IO_START	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x0)
73 #define TRACE_BDEV_IO_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BDEV, 0x1)
74 
75 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
76 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
77 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
78 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
79 #define SPDK_BDEV_QOS_MIN_BYTES_PER_SEC		(10 * 1024 * 1024)
80 #define SPDK_BDEV_QOS_LIMIT_NOT_DEFINED		UINT64_MAX
81 
82 static const char *qos_conf_type[] = {"Limit_IOPS", "Limit_BPS"};
83 static const char *qos_rpc_type[] = {"rw_ios_per_sec", "rw_mbytes_per_sec"};
84 
85 TAILQ_HEAD(spdk_bdev_list, spdk_bdev);
86 
87 struct spdk_bdev_mgr {
88 	struct spdk_mempool *bdev_io_pool;
89 
90 	struct spdk_mempool *buf_small_pool;
91 	struct spdk_mempool *buf_large_pool;
92 
93 	void *zero_buffer;
94 
95 	TAILQ_HEAD(bdev_module_list, spdk_bdev_module) bdev_modules;
96 
97 	struct spdk_bdev_list bdevs;
98 
99 	bool init_complete;
100 	bool module_init_complete;
101 
102 #ifdef SPDK_CONFIG_VTUNE
103 	__itt_domain	*domain;
104 #endif
105 };
106 
107 static struct spdk_bdev_mgr g_bdev_mgr = {
108 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
109 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
110 	.init_complete = false,
111 	.module_init_complete = false,
112 };
113 
114 static struct spdk_bdev_opts	g_bdev_opts = {
115 	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
116 	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
117 };
118 
119 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
120 static void			*g_init_cb_arg = NULL;
121 
122 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
123 static void			*g_fini_cb_arg = NULL;
124 static struct spdk_thread	*g_fini_thread = NULL;
125 
126 struct spdk_bdev_qos_limit {
127 	/** IOs or bytes allowed per second (i.e., 1s). */
128 	uint64_t limit;
129 
130 	/** Remaining IOs or bytes allowed in current timeslice (e.g., 1ms).
131 	 *  For remaining bytes, allowed to run negative if an I/O is submitted when
132 	 *  some bytes are remaining, but the I/O is bigger than that amount. The
133 	 *  excess will be deducted from the next timeslice.
134 	 */
135 	int64_t remaining_this_timeslice;
136 
137 	/** Minimum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
138 	uint32_t min_per_timeslice;
139 
140 	/** Maximum allowed IOs or bytes to be issued in one timeslice (e.g., 1ms). */
141 	uint32_t max_per_timeslice;
142 };
143 
144 struct spdk_bdev_qos {
145 	/** Types of structure of rate limits. */
146 	struct spdk_bdev_qos_limit rate_limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
147 
148 	/** The channel that all I/O are funneled through. */
149 	struct spdk_bdev_channel *ch;
150 
151 	/** The thread on which the poller is running. */
152 	struct spdk_thread *thread;
153 
154 	/** Queue of I/O waiting to be issued. */
155 	bdev_io_tailq_t queued;
156 
157 	/** Size of a timeslice in tsc ticks. */
158 	uint64_t timeslice_size;
159 
160 	/** Timestamp of start of last timeslice. */
161 	uint64_t last_timeslice;
162 
163 	/** Poller that processes queued I/O commands each time slice. */
164 	struct spdk_poller *poller;
165 };
166 
167 struct spdk_bdev_mgmt_channel {
168 	bdev_io_stailq_t need_buf_small;
169 	bdev_io_stailq_t need_buf_large;
170 
171 	/*
172 	 * Each thread keeps a cache of bdev_io - this allows
173 	 *  bdev threads which are *not* DPDK threads to still
174 	 *  benefit from a per-thread bdev_io cache.  Without
175 	 *  this, non-DPDK threads fetching from the mempool
176 	 *  incur a cmpxchg on get and put.
177 	 */
178 	bdev_io_stailq_t per_thread_cache;
179 	uint32_t	per_thread_cache_count;
180 	uint32_t	bdev_io_cache_size;
181 
182 	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
183 	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
184 };
185 
186 /*
187  * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
188  * will queue here their IO that awaits retry. It makes it possible to retry sending
189  * IO to one bdev after IO from other bdev completes.
190  */
191 struct spdk_bdev_shared_resource {
192 	/* The bdev management channel */
193 	struct spdk_bdev_mgmt_channel *mgmt_ch;
194 
195 	/*
196 	 * Count of I/O submitted to bdev module and waiting for completion.
197 	 * Incremented before submit_request() is called on an spdk_bdev_io.
198 	 */
199 	uint64_t		io_outstanding;
200 
201 	/*
202 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
203 	 *  on this channel.
204 	 */
205 	bdev_io_tailq_t		nomem_io;
206 
207 	/*
208 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
209 	 */
210 	uint64_t		nomem_threshold;
211 
212 	/* I/O channel allocated by a bdev module */
213 	struct spdk_io_channel	*shared_ch;
214 
215 	/* Refcount of bdev channels using this resource */
216 	uint32_t		ref;
217 
218 	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
219 };
220 
221 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
222 #define BDEV_CH_QOS_ENABLED		(1 << 1)
223 
224 struct spdk_bdev_channel {
225 	struct spdk_bdev	*bdev;
226 
227 	/* The channel for the underlying device */
228 	struct spdk_io_channel	*channel;
229 
230 	/* Per io_device per thread data */
231 	struct spdk_bdev_shared_resource *shared_resource;
232 
233 	struct spdk_bdev_io_stat stat;
234 
235 	/*
236 	 * Count of I/O submitted through this channel and waiting for completion.
237 	 * Incremented before submit_request() is called on an spdk_bdev_io.
238 	 */
239 	uint64_t		io_outstanding;
240 
241 	bdev_io_tailq_t		queued_resets;
242 
243 	uint32_t		flags;
244 
245 #ifdef SPDK_CONFIG_VTUNE
246 	uint64_t		start_tsc;
247 	uint64_t		interval_tsc;
248 	__itt_string_handle	*handle;
249 	struct spdk_bdev_io_stat prev_stat;
250 #endif
251 
252 };
253 
254 struct spdk_bdev_desc {
255 	struct spdk_bdev		*bdev;
256 	struct spdk_thread		*thread;
257 	spdk_bdev_remove_cb_t		remove_cb;
258 	void				*remove_ctx;
259 	bool				remove_scheduled;
260 	bool				closed;
261 	bool				write;
262 	TAILQ_ENTRY(spdk_bdev_desc)	link;
263 };
264 
265 struct spdk_bdev_iostat_ctx {
266 	struct spdk_bdev_io_stat *stat;
267 	spdk_bdev_get_device_stat_cb cb;
268 	void *cb_arg;
269 };
270 
271 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
272 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
273 
274 static void _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success,
275 		void *cb_arg);
276 static void _spdk_bdev_write_zero_buffer_next(void *_bdev_io);
277 
278 void
279 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
280 {
281 	*opts = g_bdev_opts;
282 }
283 
284 int
285 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
286 {
287 	uint32_t min_pool_size;
288 
289 	/*
290 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
291 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
292 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
293 	 */
294 	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
295 	if (opts->bdev_io_pool_size < min_pool_size) {
296 		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
297 			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
298 			    spdk_thread_get_count());
299 		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
300 		return -1;
301 	}
302 
303 	g_bdev_opts = *opts;
304 	return 0;
305 }
306 
307 struct spdk_bdev *
308 spdk_bdev_first(void)
309 {
310 	struct spdk_bdev *bdev;
311 
312 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
313 	if (bdev) {
314 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
315 	}
316 
317 	return bdev;
318 }
319 
320 struct spdk_bdev *
321 spdk_bdev_next(struct spdk_bdev *prev)
322 {
323 	struct spdk_bdev *bdev;
324 
325 	bdev = TAILQ_NEXT(prev, internal.link);
326 	if (bdev) {
327 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
328 	}
329 
330 	return bdev;
331 }
332 
333 static struct spdk_bdev *
334 _bdev_next_leaf(struct spdk_bdev *bdev)
335 {
336 	while (bdev != NULL) {
337 		if (bdev->internal.claim_module == NULL) {
338 			return bdev;
339 		} else {
340 			bdev = TAILQ_NEXT(bdev, internal.link);
341 		}
342 	}
343 
344 	return bdev;
345 }
346 
347 struct spdk_bdev *
348 spdk_bdev_first_leaf(void)
349 {
350 	struct spdk_bdev *bdev;
351 
352 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
353 
354 	if (bdev) {
355 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
356 	}
357 
358 	return bdev;
359 }
360 
361 struct spdk_bdev *
362 spdk_bdev_next_leaf(struct spdk_bdev *prev)
363 {
364 	struct spdk_bdev *bdev;
365 
366 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, internal.link));
367 
368 	if (bdev) {
369 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
370 	}
371 
372 	return bdev;
373 }
374 
375 struct spdk_bdev *
376 spdk_bdev_get_by_name(const char *bdev_name)
377 {
378 	struct spdk_bdev_alias *tmp;
379 	struct spdk_bdev *bdev = spdk_bdev_first();
380 
381 	while (bdev != NULL) {
382 		if (strcmp(bdev_name, bdev->name) == 0) {
383 			return bdev;
384 		}
385 
386 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
387 			if (strcmp(bdev_name, tmp->alias) == 0) {
388 				return bdev;
389 			}
390 		}
391 
392 		bdev = spdk_bdev_next(bdev);
393 	}
394 
395 	return NULL;
396 }
397 
398 void
399 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf, size_t len)
400 {
401 	struct iovec *iovs;
402 
403 	iovs = bdev_io->u.bdev.iovs;
404 
405 	assert(iovs != NULL);
406 	assert(bdev_io->u.bdev.iovcnt >= 1);
407 
408 	iovs[0].iov_base = buf;
409 	iovs[0].iov_len = len;
410 }
411 
412 static void
413 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
414 {
415 	struct spdk_mempool *pool;
416 	struct spdk_bdev_io *tmp;
417 	void *buf, *aligned_buf;
418 	bdev_io_stailq_t *stailq;
419 	struct spdk_bdev_mgmt_channel *ch;
420 
421 	assert(bdev_io->u.bdev.iovcnt == 1);
422 
423 	buf = bdev_io->internal.buf;
424 	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
425 
426 	bdev_io->internal.buf = NULL;
427 
428 	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
429 		pool = g_bdev_mgr.buf_small_pool;
430 		stailq = &ch->need_buf_small;
431 	} else {
432 		pool = g_bdev_mgr.buf_large_pool;
433 		stailq = &ch->need_buf_large;
434 	}
435 
436 	if (STAILQ_EMPTY(stailq)) {
437 		spdk_mempool_put(pool, buf);
438 	} else {
439 		tmp = STAILQ_FIRST(stailq);
440 
441 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
442 		spdk_bdev_io_set_buf(tmp, aligned_buf, tmp->internal.buf_len);
443 
444 		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
445 		tmp->internal.buf = buf;
446 		tmp->internal.get_buf_cb(tmp->internal.ch->channel, tmp);
447 	}
448 }
449 
450 void
451 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
452 {
453 	struct spdk_mempool *pool;
454 	bdev_io_stailq_t *stailq;
455 	void *buf, *aligned_buf;
456 	struct spdk_bdev_mgmt_channel *mgmt_ch;
457 
458 	assert(cb != NULL);
459 	assert(bdev_io->u.bdev.iovs != NULL);
460 
461 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
462 		/* Buffer already present */
463 		cb(bdev_io->internal.ch->channel, bdev_io);
464 		return;
465 	}
466 
467 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
468 	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
469 
470 	bdev_io->internal.buf_len = len;
471 	bdev_io->internal.get_buf_cb = cb;
472 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
473 		pool = g_bdev_mgr.buf_small_pool;
474 		stailq = &mgmt_ch->need_buf_small;
475 	} else {
476 		pool = g_bdev_mgr.buf_large_pool;
477 		stailq = &mgmt_ch->need_buf_large;
478 	}
479 
480 	buf = spdk_mempool_get(pool);
481 
482 	if (!buf) {
483 		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
484 	} else {
485 		aligned_buf = (void *)(((uintptr_t)buf + 511) & ~511UL);
486 		spdk_bdev_io_set_buf(bdev_io, aligned_buf, len);
487 
488 		bdev_io->internal.buf = buf;
489 		bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
490 	}
491 }
492 
493 static int
494 spdk_bdev_module_get_max_ctx_size(void)
495 {
496 	struct spdk_bdev_module *bdev_module;
497 	int max_bdev_module_size = 0;
498 
499 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
500 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
501 			max_bdev_module_size = bdev_module->get_ctx_size();
502 		}
503 	}
504 
505 	return max_bdev_module_size;
506 }
507 
508 void
509 spdk_bdev_config_text(FILE *fp)
510 {
511 	struct spdk_bdev_module *bdev_module;
512 
513 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
514 		if (bdev_module->config_text) {
515 			bdev_module->config_text(fp);
516 		}
517 	}
518 }
519 
520 static void
521 spdk_bdev_qos_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
522 {
523 	int i;
524 	struct spdk_bdev_qos *qos = bdev->internal.qos;
525 	uint64_t limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES];
526 
527 	if (!qos) {
528 		return;
529 	}
530 
531 	spdk_bdev_get_qos_rate_limits(bdev, limits);
532 
533 	spdk_json_write_object_begin(w);
534 	spdk_json_write_named_string(w, "method", "set_bdev_qos_limit");
535 	spdk_json_write_name(w, "params");
536 
537 	spdk_json_write_object_begin(w);
538 	spdk_json_write_named_string(w, "name", bdev->name);
539 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
540 		if (limits[i] > 0) {
541 			spdk_json_write_named_uint64(w, qos_rpc_type[i], limits[i]);
542 		}
543 	}
544 	spdk_json_write_object_end(w);
545 
546 	spdk_json_write_object_end(w);
547 }
548 
549 void
550 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
551 {
552 	struct spdk_bdev_module *bdev_module;
553 	struct spdk_bdev *bdev;
554 
555 	assert(w != NULL);
556 
557 	spdk_json_write_array_begin(w);
558 
559 	spdk_json_write_object_begin(w);
560 	spdk_json_write_named_string(w, "method", "set_bdev_options");
561 	spdk_json_write_name(w, "params");
562 	spdk_json_write_object_begin(w);
563 	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
564 	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
565 	spdk_json_write_object_end(w);
566 	spdk_json_write_object_end(w);
567 
568 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
569 		if (bdev_module->config_json) {
570 			bdev_module->config_json(w);
571 		}
572 	}
573 
574 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, internal.link) {
575 		spdk_bdev_qos_config_json(bdev, w);
576 
577 		if (bdev->fn_table->write_config_json) {
578 			bdev->fn_table->write_config_json(bdev, w);
579 		}
580 	}
581 
582 	spdk_json_write_array_end(w);
583 }
584 
585 static int
586 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
587 {
588 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
589 	struct spdk_bdev_io *bdev_io;
590 	uint32_t i;
591 
592 	STAILQ_INIT(&ch->need_buf_small);
593 	STAILQ_INIT(&ch->need_buf_large);
594 
595 	STAILQ_INIT(&ch->per_thread_cache);
596 	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
597 
598 	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
599 	ch->per_thread_cache_count = 0;
600 	for (i = 0; i < ch->bdev_io_cache_size; i++) {
601 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
602 		assert(bdev_io != NULL);
603 		ch->per_thread_cache_count++;
604 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
605 	}
606 
607 	TAILQ_INIT(&ch->shared_resources);
608 	TAILQ_INIT(&ch->io_wait_queue);
609 
610 	return 0;
611 }
612 
613 static void
614 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
615 {
616 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
617 	struct spdk_bdev_io *bdev_io;
618 
619 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
620 		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
621 	}
622 
623 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
624 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
625 	}
626 
627 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
628 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
629 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
630 		ch->per_thread_cache_count--;
631 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
632 	}
633 
634 	assert(ch->per_thread_cache_count == 0);
635 }
636 
637 static void
638 spdk_bdev_init_complete(int rc)
639 {
640 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
641 	void *cb_arg = g_init_cb_arg;
642 	struct spdk_bdev_module *m;
643 
644 	g_bdev_mgr.init_complete = true;
645 	g_init_cb_fn = NULL;
646 	g_init_cb_arg = NULL;
647 
648 	/*
649 	 * For modules that need to know when subsystem init is complete,
650 	 * inform them now.
651 	 */
652 	if (rc == 0) {
653 		TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
654 			if (m->init_complete) {
655 				m->init_complete();
656 			}
657 		}
658 	}
659 
660 	cb_fn(cb_arg, rc);
661 }
662 
663 static void
664 spdk_bdev_module_action_complete(void)
665 {
666 	struct spdk_bdev_module *m;
667 
668 	/*
669 	 * Don't finish bdev subsystem initialization if
670 	 * module pre-initialization is still in progress, or
671 	 * the subsystem been already initialized.
672 	 */
673 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
674 		return;
675 	}
676 
677 	/*
678 	 * Check all bdev modules for inits/examinations in progress. If any
679 	 * exist, return immediately since we cannot finish bdev subsystem
680 	 * initialization until all are completed.
681 	 */
682 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
683 		if (m->internal.action_in_progress > 0) {
684 			return;
685 		}
686 	}
687 
688 	/*
689 	 * Modules already finished initialization - now that all
690 	 * the bdev modules have finished their asynchronous I/O
691 	 * processing, the entire bdev layer can be marked as complete.
692 	 */
693 	spdk_bdev_init_complete(0);
694 }
695 
696 static void
697 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
698 {
699 	assert(module->internal.action_in_progress > 0);
700 	module->internal.action_in_progress--;
701 	spdk_bdev_module_action_complete();
702 }
703 
704 void
705 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
706 {
707 	spdk_bdev_module_action_done(module);
708 }
709 
710 void
711 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
712 {
713 	spdk_bdev_module_action_done(module);
714 }
715 
716 /** The last initialized bdev module */
717 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
718 
719 static int
720 spdk_bdev_modules_init(void)
721 {
722 	struct spdk_bdev_module *module;
723 	int rc = 0;
724 
725 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
726 		g_resume_bdev_module = module;
727 		rc = module->module_init();
728 		if (rc != 0) {
729 			return rc;
730 		}
731 	}
732 
733 	g_resume_bdev_module = NULL;
734 	return 0;
735 }
736 
737 
738 static void
739 spdk_bdev_init_failed_complete(void *cb_arg)
740 {
741 	spdk_bdev_init_complete(-1);
742 }
743 
744 static void
745 spdk_bdev_init_failed(void *cb_arg)
746 {
747 	spdk_bdev_finish(spdk_bdev_init_failed_complete, NULL);
748 }
749 
750 void
751 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
752 {
753 	struct spdk_conf_section *sp;
754 	struct spdk_bdev_opts bdev_opts;
755 	int32_t bdev_io_pool_size, bdev_io_cache_size;
756 	int cache_size;
757 	int rc = 0;
758 	char mempool_name[32];
759 
760 	assert(cb_fn != NULL);
761 
762 	sp = spdk_conf_find_section(NULL, "Bdev");
763 	if (sp != NULL) {
764 		spdk_bdev_get_opts(&bdev_opts);
765 
766 		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
767 		if (bdev_io_pool_size >= 0) {
768 			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
769 		}
770 
771 		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
772 		if (bdev_io_cache_size >= 0) {
773 			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
774 		}
775 
776 		if (spdk_bdev_set_opts(&bdev_opts)) {
777 			spdk_bdev_init_complete(-1);
778 			return;
779 		}
780 
781 		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
782 	}
783 
784 	g_init_cb_fn = cb_fn;
785 	g_init_cb_arg = cb_arg;
786 
787 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
788 
789 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
790 				  g_bdev_opts.bdev_io_pool_size,
791 				  sizeof(struct spdk_bdev_io) +
792 				  spdk_bdev_module_get_max_ctx_size(),
793 				  0,
794 				  SPDK_ENV_SOCKET_ID_ANY);
795 
796 	if (g_bdev_mgr.bdev_io_pool == NULL) {
797 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
798 		spdk_bdev_init_complete(-1);
799 		return;
800 	}
801 
802 	/**
803 	 * Ensure no more than half of the total buffers end up local caches, by
804 	 *   using spdk_thread_get_count() to determine how many local caches we need
805 	 *   to account for.
806 	 */
807 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
808 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
809 
810 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
811 				    BUF_SMALL_POOL_SIZE,
812 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
813 				    cache_size,
814 				    SPDK_ENV_SOCKET_ID_ANY);
815 	if (!g_bdev_mgr.buf_small_pool) {
816 		SPDK_ERRLOG("create rbuf small pool failed\n");
817 		spdk_bdev_init_complete(-1);
818 		return;
819 	}
820 
821 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
822 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
823 
824 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
825 				    BUF_LARGE_POOL_SIZE,
826 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
827 				    cache_size,
828 				    SPDK_ENV_SOCKET_ID_ANY);
829 	if (!g_bdev_mgr.buf_large_pool) {
830 		SPDK_ERRLOG("create rbuf large pool failed\n");
831 		spdk_bdev_init_complete(-1);
832 		return;
833 	}
834 
835 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
836 				 NULL);
837 	if (!g_bdev_mgr.zero_buffer) {
838 		SPDK_ERRLOG("create bdev zero buffer failed\n");
839 		spdk_bdev_init_complete(-1);
840 		return;
841 	}
842 
843 #ifdef SPDK_CONFIG_VTUNE
844 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
845 #endif
846 
847 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
848 				spdk_bdev_mgmt_channel_destroy,
849 				sizeof(struct spdk_bdev_mgmt_channel),
850 				"bdev_mgr");
851 
852 	rc = spdk_bdev_modules_init();
853 	g_bdev_mgr.module_init_complete = true;
854 	if (rc != 0) {
855 		SPDK_ERRLOG("bdev modules init failed\n");
856 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_init_failed, NULL);
857 		return;
858 	}
859 
860 	spdk_bdev_module_action_complete();
861 }
862 
863 static void
864 spdk_bdev_mgr_unregister_cb(void *io_device)
865 {
866 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
867 
868 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
869 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
870 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
871 			    g_bdev_opts.bdev_io_pool_size);
872 	}
873 
874 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
875 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
876 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
877 			    BUF_SMALL_POOL_SIZE);
878 		assert(false);
879 	}
880 
881 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
882 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
883 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
884 			    BUF_LARGE_POOL_SIZE);
885 		assert(false);
886 	}
887 
888 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
889 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
890 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
891 	spdk_dma_free(g_bdev_mgr.zero_buffer);
892 
893 	cb_fn(g_fini_cb_arg);
894 	g_fini_cb_fn = NULL;
895 	g_fini_cb_arg = NULL;
896 	g_bdev_mgr.init_complete = false;
897 	g_bdev_mgr.module_init_complete = false;
898 }
899 
900 static void
901 spdk_bdev_module_finish_iter(void *arg)
902 {
903 	struct spdk_bdev_module *bdev_module;
904 
905 	/* Start iterating from the last touched module */
906 	if (!g_resume_bdev_module) {
907 		bdev_module = TAILQ_LAST(&g_bdev_mgr.bdev_modules, bdev_module_list);
908 	} else {
909 		bdev_module = TAILQ_PREV(g_resume_bdev_module, bdev_module_list,
910 					 internal.tailq);
911 	}
912 
913 	while (bdev_module) {
914 		if (bdev_module->async_fini) {
915 			/* Save our place so we can resume later. We must
916 			 * save the variable here, before calling module_fini()
917 			 * below, because in some cases the module may immediately
918 			 * call spdk_bdev_module_finish_done() and re-enter
919 			 * this function to continue iterating. */
920 			g_resume_bdev_module = bdev_module;
921 		}
922 
923 		if (bdev_module->module_fini) {
924 			bdev_module->module_fini();
925 		}
926 
927 		if (bdev_module->async_fini) {
928 			return;
929 		}
930 
931 		bdev_module = TAILQ_PREV(bdev_module, bdev_module_list,
932 					 internal.tailq);
933 	}
934 
935 	g_resume_bdev_module = NULL;
936 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
937 }
938 
939 void
940 spdk_bdev_module_finish_done(void)
941 {
942 	if (spdk_get_thread() != g_fini_thread) {
943 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
944 	} else {
945 		spdk_bdev_module_finish_iter(NULL);
946 	}
947 }
948 
949 static void
950 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
951 {
952 	struct spdk_bdev *bdev = cb_arg;
953 
954 	if (bdeverrno && bdev) {
955 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
956 			     bdev->name);
957 
958 		/*
959 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
960 		 *  bdev; try to continue by manually removing this bdev from the list and continue
961 		 *  with the next bdev in the list.
962 		 */
963 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
964 	}
965 
966 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
967 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
968 		/*
969 		 * Bdev module finish need to be deffered as we might be in the middle of some context
970 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
971 		 * after returning.
972 		 */
973 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
974 		return;
975 	}
976 
977 	/*
978 	 * Unregister the last bdev in the list.  The last bdev in the list should be a bdev
979 	 * that has no bdevs that depend on it.
980 	 */
981 	bdev = TAILQ_LAST(&g_bdev_mgr.bdevs, spdk_bdev_list);
982 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
983 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
984 }
985 
986 void
987 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
988 {
989 	struct spdk_bdev_module *m;
990 
991 	assert(cb_fn != NULL);
992 
993 	g_fini_thread = spdk_get_thread();
994 
995 	g_fini_cb_fn = cb_fn;
996 	g_fini_cb_arg = cb_arg;
997 
998 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, internal.tailq) {
999 		if (m->fini_start) {
1000 			m->fini_start();
1001 		}
1002 	}
1003 
1004 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
1005 }
1006 
1007 static struct spdk_bdev_io *
1008 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
1009 {
1010 	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
1011 	struct spdk_bdev_io *bdev_io;
1012 
1013 	if (ch->per_thread_cache_count > 0) {
1014 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
1015 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
1016 		ch->per_thread_cache_count--;
1017 	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
1018 		/*
1019 		 * Don't try to look for bdev_ios in the global pool if there are
1020 		 * waiters on bdev_ios - we don't want this caller to jump the line.
1021 		 */
1022 		bdev_io = NULL;
1023 	} else {
1024 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
1025 	}
1026 
1027 	return bdev_io;
1028 }
1029 
1030 void
1031 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1032 {
1033 	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
1034 
1035 	assert(bdev_io != NULL);
1036 	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
1037 
1038 	if (bdev_io->internal.buf != NULL) {
1039 		spdk_bdev_io_put_buf(bdev_io);
1040 	}
1041 
1042 	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
1043 		ch->per_thread_cache_count++;
1044 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
1045 		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
1046 			struct spdk_bdev_io_wait_entry *entry;
1047 
1048 			entry = TAILQ_FIRST(&ch->io_wait_queue);
1049 			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
1050 			entry->cb_fn(entry->cb_arg);
1051 		}
1052 	} else {
1053 		/* We should never have a full cache with entries on the io wait queue. */
1054 		assert(TAILQ_EMPTY(&ch->io_wait_queue));
1055 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
1056 	}
1057 }
1058 
1059 static bool
1060 _spdk_bdev_qos_is_iops_rate_limit(enum spdk_bdev_qos_rate_limit_type limit)
1061 {
1062 	assert(limit != SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
1063 
1064 	switch (limit) {
1065 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
1066 		return true;
1067 	case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
1068 		return false;
1069 	case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
1070 	default:
1071 		return false;
1072 	}
1073 }
1074 
1075 static bool
1076 _spdk_bdev_qos_io_to_limit(struct spdk_bdev_io *bdev_io)
1077 {
1078 	switch (bdev_io->type) {
1079 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1080 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1081 	case SPDK_BDEV_IO_TYPE_READ:
1082 	case SPDK_BDEV_IO_TYPE_WRITE:
1083 	case SPDK_BDEV_IO_TYPE_UNMAP:
1084 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1085 		return true;
1086 	default:
1087 		return false;
1088 	}
1089 }
1090 
1091 static uint64_t
1092 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
1093 {
1094 	struct spdk_bdev	*bdev = bdev_io->bdev;
1095 
1096 	switch (bdev_io->type) {
1097 	case SPDK_BDEV_IO_TYPE_NVME_IO:
1098 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
1099 		return bdev_io->u.nvme_passthru.nbytes;
1100 	case SPDK_BDEV_IO_TYPE_READ:
1101 	case SPDK_BDEV_IO_TYPE_WRITE:
1102 	case SPDK_BDEV_IO_TYPE_UNMAP:
1103 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1104 		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
1105 	default:
1106 		return 0;
1107 	}
1108 }
1109 
1110 static void
1111 _spdk_bdev_qos_update_per_io(struct spdk_bdev_qos *qos, uint64_t io_size_in_byte)
1112 {
1113 	int i;
1114 
1115 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1116 		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1117 			continue;
1118 		}
1119 
1120 		switch (i) {
1121 		case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
1122 			qos->rate_limits[i].remaining_this_timeslice--;
1123 			break;
1124 		case SPDK_BDEV_QOS_RW_BPS_RATE_LIMIT:
1125 			qos->rate_limits[i].remaining_this_timeslice -= io_size_in_byte;
1126 			break;
1127 		case SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES:
1128 		default:
1129 			break;
1130 		}
1131 	}
1132 }
1133 
1134 static void
1135 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch, struct spdk_bdev_qos *qos)
1136 {
1137 	struct spdk_bdev_io		*bdev_io = NULL;
1138 	struct spdk_bdev		*bdev = ch->bdev;
1139 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1140 	int				i;
1141 	bool				to_limit_io;
1142 	uint64_t			io_size_in_byte;
1143 
1144 	while (!TAILQ_EMPTY(&qos->queued)) {
1145 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1146 			if (qos->rate_limits[i].max_per_timeslice > 0 &&
1147 			    (qos->rate_limits[i].remaining_this_timeslice <= 0)) {
1148 				return;
1149 			}
1150 		}
1151 
1152 		bdev_io = TAILQ_FIRST(&qos->queued);
1153 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
1154 		ch->io_outstanding++;
1155 		shared_resource->io_outstanding++;
1156 		to_limit_io = _spdk_bdev_qos_io_to_limit(bdev_io);
1157 		if (to_limit_io == true) {
1158 			io_size_in_byte = _spdk_bdev_get_io_size_in_byte(bdev_io);
1159 			_spdk_bdev_qos_update_per_io(qos, io_size_in_byte);
1160 		}
1161 		bdev->fn_table->submit_request(ch->channel, bdev_io);
1162 	}
1163 }
1164 
1165 static void
1166 _spdk_bdev_queue_io_wait_with_cb(struct spdk_bdev_io *bdev_io, spdk_bdev_io_wait_cb cb_fn)
1167 {
1168 	int rc;
1169 
1170 	bdev_io->internal.waitq_entry.bdev = bdev_io->bdev;
1171 	bdev_io->internal.waitq_entry.cb_fn = cb_fn;
1172 	bdev_io->internal.waitq_entry.cb_arg = bdev_io;
1173 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, spdk_io_channel_from_ctx(bdev_io->internal.ch),
1174 				     &bdev_io->internal.waitq_entry);
1175 	if (rc != 0) {
1176 		SPDK_ERRLOG("Queue IO failed, rc=%d\n", rc);
1177 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1178 		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1179 	}
1180 }
1181 
1182 static bool
1183 _spdk_bdev_io_type_can_split(uint8_t type)
1184 {
1185 	assert(type != SPDK_BDEV_IO_TYPE_INVALID);
1186 	assert(type < SPDK_BDEV_NUM_IO_TYPES);
1187 
1188 	/* Only split READ and WRITE I/O.  Theoretically other types of I/O like
1189 	 * UNMAP could be split, but these types of I/O are typically much larger
1190 	 * in size (sometimes the size of the entire block device), and the bdev
1191 	 * module can more efficiently split these types of I/O.  Plus those types
1192 	 * of I/O do not have a payload, which makes the splitting process simpler.
1193 	 */
1194 	if (type == SPDK_BDEV_IO_TYPE_READ || type == SPDK_BDEV_IO_TYPE_WRITE) {
1195 		return true;
1196 	} else {
1197 		return false;
1198 	}
1199 }
1200 
1201 static bool
1202 _spdk_bdev_io_should_split(struct spdk_bdev_io *bdev_io)
1203 {
1204 	uint64_t start_stripe, end_stripe;
1205 	uint32_t io_boundary = bdev_io->bdev->optimal_io_boundary;
1206 
1207 	if (io_boundary == 0) {
1208 		return false;
1209 	}
1210 
1211 	if (!_spdk_bdev_io_type_can_split(bdev_io->type)) {
1212 		return false;
1213 	}
1214 
1215 	start_stripe = bdev_io->u.bdev.offset_blocks;
1216 	end_stripe = start_stripe + bdev_io->u.bdev.num_blocks - 1;
1217 	/* Avoid expensive div operations if possible.  These spdk_u32 functions are very cheap. */
1218 	if (spdk_likely(spdk_u32_is_pow2(io_boundary))) {
1219 		start_stripe >>= spdk_u32log2(io_boundary);
1220 		end_stripe >>= spdk_u32log2(io_boundary);
1221 	} else {
1222 		start_stripe /= io_boundary;
1223 		end_stripe /= io_boundary;
1224 	}
1225 	return (start_stripe != end_stripe);
1226 }
1227 
1228 static uint32_t
1229 _to_next_boundary(uint64_t offset, uint32_t boundary)
1230 {
1231 	return (boundary - (offset % boundary));
1232 }
1233 
1234 static void
1235 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
1236 
1237 static void
1238 _spdk_bdev_io_split_with_payload(void *_bdev_io)
1239 {
1240 	struct spdk_bdev_io *bdev_io = _bdev_io;
1241 	uint64_t current_offset, remaining;
1242 	uint32_t blocklen, to_next_boundary, to_next_boundary_bytes;
1243 	struct iovec *parent_iov, *iov;
1244 	uint64_t parent_iov_offset, iov_len;
1245 	uint32_t parent_iovpos, parent_iovcnt, child_iovcnt, iovcnt;
1246 	int rc;
1247 
1248 	remaining = bdev_io->u.bdev.split_remaining_num_blocks;
1249 	current_offset = bdev_io->u.bdev.split_current_offset_blocks;
1250 	blocklen = bdev_io->bdev->blocklen;
1251 	parent_iov_offset = (current_offset - bdev_io->u.bdev.offset_blocks) * blocklen;
1252 	parent_iovcnt = bdev_io->u.bdev.iovcnt;
1253 
1254 	for (parent_iovpos = 0; parent_iovpos < parent_iovcnt; parent_iovpos++) {
1255 		parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
1256 		if (parent_iov_offset < parent_iov->iov_len) {
1257 			break;
1258 		}
1259 		parent_iov_offset -= parent_iov->iov_len;
1260 	}
1261 
1262 	child_iovcnt = 0;
1263 	while (remaining > 0 && parent_iovpos < parent_iovcnt && child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
1264 		to_next_boundary = _to_next_boundary(current_offset, bdev_io->bdev->optimal_io_boundary);
1265 		to_next_boundary = spdk_min(remaining, to_next_boundary);
1266 		to_next_boundary_bytes = to_next_boundary * blocklen;
1267 		iov = &bdev_io->child_iov[child_iovcnt];
1268 		iovcnt = 0;
1269 		while (to_next_boundary_bytes > 0 && parent_iovpos < parent_iovcnt &&
1270 		       child_iovcnt < BDEV_IO_NUM_CHILD_IOV) {
1271 			parent_iov = &bdev_io->u.bdev.iovs[parent_iovpos];
1272 			iov_len = spdk_min(to_next_boundary_bytes, parent_iov->iov_len - parent_iov_offset);
1273 			to_next_boundary_bytes -= iov_len;
1274 
1275 			bdev_io->child_iov[child_iovcnt].iov_base = parent_iov->iov_base + parent_iov_offset;
1276 			bdev_io->child_iov[child_iovcnt].iov_len = iov_len;
1277 
1278 			if (iov_len < parent_iov->iov_len - parent_iov_offset) {
1279 				parent_iov_offset += iov_len;
1280 			} else {
1281 				parent_iovpos++;
1282 				parent_iov_offset = 0;
1283 			}
1284 			child_iovcnt++;
1285 			iovcnt++;
1286 		}
1287 
1288 		if (to_next_boundary_bytes > 0) {
1289 			/* We had to stop this child I/O early because we ran out of
1290 			 *  child_iov space.  Make sure the iovs collected are valid and
1291 			 *  then adjust to_next_boundary before starting the child I/O.
1292 			 */
1293 			if ((to_next_boundary_bytes % blocklen) != 0) {
1294 				SPDK_ERRLOG("Remaining %" PRIu32 " is not multiple of block size %" PRIu32 "\n",
1295 					    to_next_boundary_bytes, blocklen);
1296 				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1297 				if (bdev_io->u.bdev.split_outstanding == 0) {
1298 					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1299 				}
1300 				return;
1301 			}
1302 			to_next_boundary -= to_next_boundary_bytes / blocklen;
1303 		}
1304 
1305 		bdev_io->u.bdev.split_outstanding++;
1306 
1307 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1308 			rc = spdk_bdev_readv_blocks(bdev_io->internal.desc,
1309 						    spdk_io_channel_from_ctx(bdev_io->internal.ch),
1310 						    iov, iovcnt, current_offset, to_next_boundary,
1311 						    _spdk_bdev_io_split_done, bdev_io);
1312 		} else {
1313 			rc = spdk_bdev_writev_blocks(bdev_io->internal.desc,
1314 						     spdk_io_channel_from_ctx(bdev_io->internal.ch),
1315 						     iov, iovcnt, current_offset, to_next_boundary,
1316 						     _spdk_bdev_io_split_done, bdev_io);
1317 		}
1318 
1319 		if (rc == 0) {
1320 			current_offset += to_next_boundary;
1321 			remaining -= to_next_boundary;
1322 			bdev_io->u.bdev.split_current_offset_blocks = current_offset;
1323 			bdev_io->u.bdev.split_remaining_num_blocks = remaining;
1324 		} else {
1325 			bdev_io->u.bdev.split_outstanding--;
1326 			if (rc == -ENOMEM) {
1327 				if (bdev_io->u.bdev.split_outstanding == 0) {
1328 					/* No I/O is outstanding. Hence we should wait here. */
1329 					_spdk_bdev_queue_io_wait_with_cb(bdev_io,
1330 									 _spdk_bdev_io_split_with_payload);
1331 				}
1332 			} else {
1333 				bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1334 				if (bdev_io->u.bdev.split_outstanding == 0) {
1335 					bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
1336 				}
1337 			}
1338 
1339 			return;
1340 		}
1341 	}
1342 }
1343 
1344 static void
1345 _spdk_bdev_io_split_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1346 {
1347 	struct spdk_bdev_io *parent_io = cb_arg;
1348 
1349 	spdk_bdev_free_io(bdev_io);
1350 
1351 	if (!success) {
1352 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
1353 	}
1354 	parent_io->u.bdev.split_outstanding--;
1355 	if (parent_io->u.bdev.split_outstanding != 0) {
1356 		return;
1357 	}
1358 
1359 	/*
1360 	 * Parent I/O finishes when all blocks are consumed or there is any failure of
1361 	 * child I/O and no outstanding child I/O.
1362 	 */
1363 	if (parent_io->u.bdev.split_remaining_num_blocks == 0 ||
1364 	    parent_io->internal.status != SPDK_BDEV_IO_STATUS_SUCCESS) {
1365 		parent_io->internal.cb(parent_io, parent_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
1366 				       parent_io->internal.caller_ctx);
1367 		return;
1368 	}
1369 
1370 	/*
1371 	 * Continue with the splitting process.  This function will complete the parent I/O if the
1372 	 * splitting is done.
1373 	 */
1374 	_spdk_bdev_io_split_with_payload(parent_io);
1375 }
1376 
1377 static void
1378 _spdk_bdev_io_split(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
1379 {
1380 	assert(_spdk_bdev_io_type_can_split(bdev_io->type));
1381 
1382 	bdev_io->u.bdev.split_current_offset_blocks = bdev_io->u.bdev.offset_blocks;
1383 	bdev_io->u.bdev.split_remaining_num_blocks = bdev_io->u.bdev.num_blocks;
1384 	bdev_io->u.bdev.split_outstanding = 0;
1385 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
1386 
1387 	_spdk_bdev_io_split_with_payload(bdev_io);
1388 }
1389 
1390 static void
1391 _spdk_bdev_io_submit(void *ctx)
1392 {
1393 	struct spdk_bdev_io *bdev_io = ctx;
1394 	struct spdk_bdev *bdev = bdev_io->bdev;
1395 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1396 	struct spdk_io_channel *ch = bdev_ch->channel;
1397 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1398 	uint64_t tsc;
1399 
1400 	tsc = spdk_get_ticks();
1401 	bdev_io->internal.submit_tsc = tsc;
1402 	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_START, 0, 0, (uintptr_t)bdev_io, bdev_io->type);
1403 	bdev_ch->io_outstanding++;
1404 	shared_resource->io_outstanding++;
1405 	bdev_io->internal.in_submit_request = true;
1406 	if (spdk_likely(bdev_ch->flags == 0)) {
1407 		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1408 			bdev->fn_table->submit_request(ch, bdev_io);
1409 		} else {
1410 			bdev_ch->io_outstanding--;
1411 			shared_resource->io_outstanding--;
1412 			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, internal.link);
1413 		}
1414 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1415 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1416 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1417 		bdev_ch->io_outstanding--;
1418 		shared_resource->io_outstanding--;
1419 		TAILQ_INSERT_TAIL(&bdev->internal.qos->queued, bdev_io, internal.link);
1420 		_spdk_bdev_qos_io_submit(bdev_ch, bdev->internal.qos);
1421 	} else {
1422 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1423 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1424 	}
1425 	bdev_io->internal.in_submit_request = false;
1426 }
1427 
1428 static void
1429 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1430 {
1431 	struct spdk_bdev *bdev = bdev_io->bdev;
1432 	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1433 
1434 	assert(thread != NULL);
1435 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1436 
1437 	if (bdev->split_on_optimal_io_boundary && _spdk_bdev_io_should_split(bdev_io)) {
1438 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
1439 			spdk_bdev_io_get_buf(bdev_io, _spdk_bdev_io_split,
1440 					     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
1441 		} else {
1442 			_spdk_bdev_io_split(NULL, bdev_io);
1443 		}
1444 		return;
1445 	}
1446 
1447 	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1448 		if ((thread == bdev->internal.qos->thread) || !bdev->internal.qos->thread) {
1449 			_spdk_bdev_io_submit(bdev_io);
1450 		} else {
1451 			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1452 			bdev_io->internal.ch = bdev->internal.qos->ch;
1453 			spdk_thread_send_msg(bdev->internal.qos->thread, _spdk_bdev_io_submit, bdev_io);
1454 		}
1455 	} else {
1456 		_spdk_bdev_io_submit(bdev_io);
1457 	}
1458 }
1459 
1460 static void
1461 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1462 {
1463 	struct spdk_bdev *bdev = bdev_io->bdev;
1464 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1465 	struct spdk_io_channel *ch = bdev_ch->channel;
1466 
1467 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1468 
1469 	bdev_io->internal.in_submit_request = true;
1470 	bdev->fn_table->submit_request(ch, bdev_io);
1471 	bdev_io->internal.in_submit_request = false;
1472 }
1473 
1474 static void
1475 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1476 		  struct spdk_bdev *bdev, void *cb_arg,
1477 		  spdk_bdev_io_completion_cb cb)
1478 {
1479 	bdev_io->bdev = bdev;
1480 	bdev_io->internal.caller_ctx = cb_arg;
1481 	bdev_io->internal.cb = cb;
1482 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1483 	bdev_io->internal.in_submit_request = false;
1484 	bdev_io->internal.buf = NULL;
1485 	bdev_io->internal.io_submit_ch = NULL;
1486 }
1487 
1488 static bool
1489 _spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1490 {
1491 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1492 }
1493 
1494 bool
1495 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1496 {
1497 	bool supported;
1498 
1499 	supported = _spdk_bdev_io_type_supported(bdev, io_type);
1500 
1501 	if (!supported) {
1502 		switch (io_type) {
1503 		case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1504 			/* The bdev layer will emulate write zeroes as long as write is supported. */
1505 			supported = _spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE);
1506 			break;
1507 		default:
1508 			break;
1509 		}
1510 	}
1511 
1512 	return supported;
1513 }
1514 
1515 int
1516 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1517 {
1518 	if (bdev->fn_table->dump_info_json) {
1519 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1520 	}
1521 
1522 	return 0;
1523 }
1524 
1525 static void
1526 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1527 {
1528 	uint32_t max_per_timeslice = 0;
1529 	int i;
1530 
1531 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1532 		if (qos->rate_limits[i].limit == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
1533 			qos->rate_limits[i].max_per_timeslice = 0;
1534 			continue;
1535 		}
1536 
1537 		max_per_timeslice = qos->rate_limits[i].limit *
1538 				    SPDK_BDEV_QOS_TIMESLICE_IN_USEC / SPDK_SEC_TO_USEC;
1539 
1540 		qos->rate_limits[i].max_per_timeslice = spdk_max(max_per_timeslice,
1541 							qos->rate_limits[i].min_per_timeslice);
1542 
1543 		qos->rate_limits[i].remaining_this_timeslice = qos->rate_limits[i].max_per_timeslice;
1544 	}
1545 }
1546 
1547 static int
1548 spdk_bdev_channel_poll_qos(void *arg)
1549 {
1550 	struct spdk_bdev_qos *qos = arg;
1551 	uint64_t now = spdk_get_ticks();
1552 	int i;
1553 
1554 	if (now < (qos->last_timeslice + qos->timeslice_size)) {
1555 		/* We received our callback earlier than expected - return
1556 		 *  immediately and wait to do accounting until at least one
1557 		 *  timeslice has actually expired.  This should never happen
1558 		 *  with a well-behaved timer implementation.
1559 		 */
1560 		return 0;
1561 	}
1562 
1563 	/* Reset for next round of rate limiting */
1564 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1565 		/* We may have allowed the IOs or bytes to slightly overrun in the last
1566 		 * timeslice. remaining_this_timeslice is signed, so if it's negative
1567 		 * here, we'll account for the overrun so that the next timeslice will
1568 		 * be appropriately reduced.
1569 		 */
1570 		if (qos->rate_limits[i].remaining_this_timeslice > 0) {
1571 			qos->rate_limits[i].remaining_this_timeslice = 0;
1572 		}
1573 	}
1574 
1575 	while (now >= (qos->last_timeslice + qos->timeslice_size)) {
1576 		qos->last_timeslice += qos->timeslice_size;
1577 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1578 			qos->rate_limits[i].remaining_this_timeslice +=
1579 				qos->rate_limits[i].max_per_timeslice;
1580 		}
1581 	}
1582 
1583 	_spdk_bdev_qos_io_submit(qos->ch, qos);
1584 
1585 	return -1;
1586 }
1587 
1588 static void
1589 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1590 {
1591 	struct spdk_bdev_shared_resource *shared_resource;
1592 
1593 	if (!ch) {
1594 		return;
1595 	}
1596 
1597 	if (ch->channel) {
1598 		spdk_put_io_channel(ch->channel);
1599 	}
1600 
1601 	assert(ch->io_outstanding == 0);
1602 
1603 	shared_resource = ch->shared_resource;
1604 	if (shared_resource) {
1605 		assert(ch->io_outstanding == 0);
1606 		assert(shared_resource->ref > 0);
1607 		shared_resource->ref--;
1608 		if (shared_resource->ref == 0) {
1609 			assert(shared_resource->io_outstanding == 0);
1610 			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1611 			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1612 			free(shared_resource);
1613 		}
1614 	}
1615 }
1616 
1617 /* Caller must hold bdev->internal.mutex. */
1618 static void
1619 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1620 {
1621 	struct spdk_bdev_qos	*qos = bdev->internal.qos;
1622 	int			i;
1623 
1624 	/* Rate limiting on this bdev enabled */
1625 	if (qos) {
1626 		if (qos->ch == NULL) {
1627 			struct spdk_io_channel *io_ch;
1628 
1629 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1630 				      bdev->name, spdk_get_thread());
1631 
1632 			/* No qos channel has been selected, so set one up */
1633 
1634 			/* Take another reference to ch */
1635 			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1636 			qos->ch = ch;
1637 
1638 			qos->thread = spdk_io_channel_get_thread(io_ch);
1639 
1640 			TAILQ_INIT(&qos->queued);
1641 
1642 			for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1643 				if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
1644 					qos->rate_limits[i].min_per_timeslice =
1645 						SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE;
1646 				} else {
1647 					qos->rate_limits[i].min_per_timeslice =
1648 						SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE;
1649 				}
1650 
1651 				if (qos->rate_limits[i].limit == 0) {
1652 					qos->rate_limits[i].limit = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
1653 				}
1654 			}
1655 			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1656 			qos->timeslice_size =
1657 				SPDK_BDEV_QOS_TIMESLICE_IN_USEC * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
1658 			qos->last_timeslice = spdk_get_ticks();
1659 			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1660 							   qos,
1661 							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1662 		}
1663 
1664 		ch->flags |= BDEV_CH_QOS_ENABLED;
1665 	}
1666 }
1667 
1668 static int
1669 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1670 {
1671 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1672 	struct spdk_bdev_channel	*ch = ctx_buf;
1673 	struct spdk_io_channel		*mgmt_io_ch;
1674 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1675 	struct spdk_bdev_shared_resource *shared_resource;
1676 
1677 	ch->bdev = bdev;
1678 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1679 	if (!ch->channel) {
1680 		return -1;
1681 	}
1682 
1683 	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1684 	if (!mgmt_io_ch) {
1685 		return -1;
1686 	}
1687 
1688 	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1689 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1690 		if (shared_resource->shared_ch == ch->channel) {
1691 			spdk_put_io_channel(mgmt_io_ch);
1692 			shared_resource->ref++;
1693 			break;
1694 		}
1695 	}
1696 
1697 	if (shared_resource == NULL) {
1698 		shared_resource = calloc(1, sizeof(*shared_resource));
1699 		if (shared_resource == NULL) {
1700 			spdk_put_io_channel(mgmt_io_ch);
1701 			return -1;
1702 		}
1703 
1704 		shared_resource->mgmt_ch = mgmt_ch;
1705 		shared_resource->io_outstanding = 0;
1706 		TAILQ_INIT(&shared_resource->nomem_io);
1707 		shared_resource->nomem_threshold = 0;
1708 		shared_resource->shared_ch = ch->channel;
1709 		shared_resource->ref = 1;
1710 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1711 	}
1712 
1713 	memset(&ch->stat, 0, sizeof(ch->stat));
1714 	ch->stat.ticks_rate = spdk_get_ticks_hz();
1715 	ch->io_outstanding = 0;
1716 	TAILQ_INIT(&ch->queued_resets);
1717 	ch->flags = 0;
1718 	ch->shared_resource = shared_resource;
1719 
1720 #ifdef SPDK_CONFIG_VTUNE
1721 	{
1722 		char *name;
1723 		__itt_init_ittlib(NULL, 0);
1724 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1725 		if (!name) {
1726 			_spdk_bdev_channel_destroy_resource(ch);
1727 			return -1;
1728 		}
1729 		ch->handle = __itt_string_handle_create(name);
1730 		free(name);
1731 		ch->start_tsc = spdk_get_ticks();
1732 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1733 		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
1734 	}
1735 #endif
1736 
1737 	pthread_mutex_lock(&bdev->internal.mutex);
1738 	_spdk_bdev_enable_qos(bdev, ch);
1739 	pthread_mutex_unlock(&bdev->internal.mutex);
1740 
1741 	return 0;
1742 }
1743 
1744 /*
1745  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1746  *  linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
1747  */
1748 static void
1749 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1750 {
1751 	bdev_io_stailq_t tmp;
1752 	struct spdk_bdev_io *bdev_io;
1753 
1754 	STAILQ_INIT(&tmp);
1755 
1756 	while (!STAILQ_EMPTY(queue)) {
1757 		bdev_io = STAILQ_FIRST(queue);
1758 		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
1759 		if (bdev_io->internal.ch == ch) {
1760 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1761 		} else {
1762 			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
1763 		}
1764 	}
1765 
1766 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1767 }
1768 
1769 /*
1770  * Abort I/O that are queued waiting for submission.  These types of I/O are
1771  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1772  */
1773 static void
1774 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1775 {
1776 	struct spdk_bdev_io *bdev_io, *tmp;
1777 
1778 	TAILQ_FOREACH_SAFE(bdev_io, queue, internal.link, tmp) {
1779 		if (bdev_io->internal.ch == ch) {
1780 			TAILQ_REMOVE(queue, bdev_io, internal.link);
1781 			/*
1782 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1783 			 *  been submitted to the bdev module.  Since in this case it
1784 			 *  hadn't, bump io_outstanding to account for the decrement
1785 			 *  that spdk_bdev_io_complete() will do.
1786 			 */
1787 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1788 				ch->io_outstanding++;
1789 				ch->shared_resource->io_outstanding++;
1790 			}
1791 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1792 		}
1793 	}
1794 }
1795 
1796 static void
1797 spdk_bdev_qos_channel_destroy(void *cb_arg)
1798 {
1799 	struct spdk_bdev_qos *qos = cb_arg;
1800 
1801 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
1802 	spdk_poller_unregister(&qos->poller);
1803 
1804 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
1805 
1806 	free(qos);
1807 }
1808 
1809 static int
1810 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
1811 {
1812 	int i;
1813 
1814 	/*
1815 	 * Cleanly shutting down the QoS poller is tricky, because
1816 	 * during the asynchronous operation the user could open
1817 	 * a new descriptor and create a new channel, spawning
1818 	 * a new QoS poller.
1819 	 *
1820 	 * The strategy is to create a new QoS structure here and swap it
1821 	 * in. The shutdown path then continues to refer to the old one
1822 	 * until it completes and then releases it.
1823 	 */
1824 	struct spdk_bdev_qos *new_qos, *old_qos;
1825 
1826 	old_qos = bdev->internal.qos;
1827 
1828 	new_qos = calloc(1, sizeof(*new_qos));
1829 	if (!new_qos) {
1830 		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
1831 		return -ENOMEM;
1832 	}
1833 
1834 	/* Copy the old QoS data into the newly allocated structure */
1835 	memcpy(new_qos, old_qos, sizeof(*new_qos));
1836 
1837 	/* Zero out the key parts of the QoS structure */
1838 	new_qos->ch = NULL;
1839 	new_qos->thread = NULL;
1840 	new_qos->poller = NULL;
1841 	TAILQ_INIT(&new_qos->queued);
1842 	/*
1843 	 * The limit member of spdk_bdev_qos_limit structure is not zeroed.
1844 	 * It will be used later for the new QoS structure.
1845 	 */
1846 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
1847 		new_qos->rate_limits[i].remaining_this_timeslice = 0;
1848 		new_qos->rate_limits[i].min_per_timeslice = 0;
1849 		new_qos->rate_limits[i].max_per_timeslice = 0;
1850 	}
1851 
1852 	bdev->internal.qos = new_qos;
1853 
1854 	if (old_qos->thread == NULL) {
1855 		free(old_qos);
1856 	} else {
1857 		spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
1858 				     old_qos);
1859 	}
1860 
1861 	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
1862 	 * been destroyed yet. The destruction path will end up waiting for the final
1863 	 * channel to be put before it releases resources. */
1864 
1865 	return 0;
1866 }
1867 
1868 static void
1869 _spdk_bdev_io_stat_add(struct spdk_bdev_io_stat *total, struct spdk_bdev_io_stat *add)
1870 {
1871 	total->bytes_read += add->bytes_read;
1872 	total->num_read_ops += add->num_read_ops;
1873 	total->bytes_written += add->bytes_written;
1874 	total->num_write_ops += add->num_write_ops;
1875 	total->read_latency_ticks += add->read_latency_ticks;
1876 	total->write_latency_ticks += add->write_latency_ticks;
1877 }
1878 
1879 static void
1880 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1881 {
1882 	struct spdk_bdev_channel	*ch = ctx_buf;
1883 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1884 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1885 
1886 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
1887 		      spdk_get_thread());
1888 
1889 	/* This channel is going away, so add its statistics into the bdev so that they don't get lost. */
1890 	pthread_mutex_lock(&ch->bdev->internal.mutex);
1891 	_spdk_bdev_io_stat_add(&ch->bdev->internal.stat, &ch->stat);
1892 	pthread_mutex_unlock(&ch->bdev->internal.mutex);
1893 
1894 	mgmt_ch = shared_resource->mgmt_ch;
1895 
1896 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1897 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1898 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1899 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1900 
1901 	_spdk_bdev_channel_destroy_resource(ch);
1902 }
1903 
1904 int
1905 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1906 {
1907 	struct spdk_bdev_alias *tmp;
1908 
1909 	if (alias == NULL) {
1910 		SPDK_ERRLOG("Empty alias passed\n");
1911 		return -EINVAL;
1912 	}
1913 
1914 	if (spdk_bdev_get_by_name(alias)) {
1915 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1916 		return -EEXIST;
1917 	}
1918 
1919 	tmp = calloc(1, sizeof(*tmp));
1920 	if (tmp == NULL) {
1921 		SPDK_ERRLOG("Unable to allocate alias\n");
1922 		return -ENOMEM;
1923 	}
1924 
1925 	tmp->alias = strdup(alias);
1926 	if (tmp->alias == NULL) {
1927 		free(tmp);
1928 		SPDK_ERRLOG("Unable to allocate alias\n");
1929 		return -ENOMEM;
1930 	}
1931 
1932 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1933 
1934 	return 0;
1935 }
1936 
1937 int
1938 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1939 {
1940 	struct spdk_bdev_alias *tmp;
1941 
1942 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1943 		if (strcmp(alias, tmp->alias) == 0) {
1944 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1945 			free(tmp->alias);
1946 			free(tmp);
1947 			return 0;
1948 		}
1949 	}
1950 
1951 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias);
1952 
1953 	return -ENOENT;
1954 }
1955 
1956 void
1957 spdk_bdev_alias_del_all(struct spdk_bdev *bdev)
1958 {
1959 	struct spdk_bdev_alias *p, *tmp;
1960 
1961 	TAILQ_FOREACH_SAFE(p, &bdev->aliases, tailq, tmp) {
1962 		TAILQ_REMOVE(&bdev->aliases, p, tailq);
1963 		free(p->alias);
1964 		free(p);
1965 	}
1966 }
1967 
1968 struct spdk_io_channel *
1969 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1970 {
1971 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1972 }
1973 
1974 const char *
1975 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1976 {
1977 	return bdev->name;
1978 }
1979 
1980 const char *
1981 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1982 {
1983 	return bdev->product_name;
1984 }
1985 
1986 const struct spdk_bdev_aliases_list *
1987 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1988 {
1989 	return &bdev->aliases;
1990 }
1991 
1992 uint32_t
1993 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1994 {
1995 	return bdev->blocklen;
1996 }
1997 
1998 uint64_t
1999 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
2000 {
2001 	return bdev->blockcnt;
2002 }
2003 
2004 const char *
2005 spdk_bdev_get_qos_rpc_type(enum spdk_bdev_qos_rate_limit_type type)
2006 {
2007 	return qos_rpc_type[type];
2008 }
2009 
2010 void
2011 spdk_bdev_get_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
2012 {
2013 	int i;
2014 
2015 	memset(limits, 0, sizeof(*limits) * SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES);
2016 
2017 	pthread_mutex_lock(&bdev->internal.mutex);
2018 	if (bdev->internal.qos) {
2019 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
2020 			if (bdev->internal.qos->rate_limits[i].limit !=
2021 			    SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
2022 				limits[i] = bdev->internal.qos->rate_limits[i].limit;
2023 				if (_spdk_bdev_qos_is_iops_rate_limit(i) == false) {
2024 					/* Change from Byte to Megabyte which is user visible. */
2025 					limits[i] = limits[i] / 1024 / 1024;
2026 				}
2027 			}
2028 		}
2029 	}
2030 	pthread_mutex_unlock(&bdev->internal.mutex);
2031 }
2032 
2033 size_t
2034 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
2035 {
2036 	/* TODO: push this logic down to the bdev modules */
2037 	if (bdev->need_aligned_buffer) {
2038 		return bdev->blocklen;
2039 	}
2040 
2041 	return 1;
2042 }
2043 
2044 uint32_t
2045 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
2046 {
2047 	return bdev->optimal_io_boundary;
2048 }
2049 
2050 bool
2051 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
2052 {
2053 	return bdev->write_cache;
2054 }
2055 
2056 const struct spdk_uuid *
2057 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
2058 {
2059 	return &bdev->uuid;
2060 }
2061 
2062 uint64_t
2063 spdk_bdev_get_qd(const struct spdk_bdev *bdev)
2064 {
2065 	return bdev->internal.measured_queue_depth;
2066 }
2067 
2068 uint64_t
2069 spdk_bdev_get_qd_sampling_period(const struct spdk_bdev *bdev)
2070 {
2071 	return bdev->internal.period;
2072 }
2073 
2074 uint64_t
2075 spdk_bdev_get_weighted_io_time(const struct spdk_bdev *bdev)
2076 {
2077 	return bdev->internal.weighted_io_time;
2078 }
2079 
2080 uint64_t
2081 spdk_bdev_get_io_time(const struct spdk_bdev *bdev)
2082 {
2083 	return bdev->internal.io_time;
2084 }
2085 
2086 static void
2087 _calculate_measured_qd_cpl(struct spdk_io_channel_iter *i, int status)
2088 {
2089 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
2090 
2091 	bdev->internal.measured_queue_depth = bdev->internal.temporary_queue_depth;
2092 
2093 	if (bdev->internal.measured_queue_depth) {
2094 		bdev->internal.io_time += bdev->internal.period;
2095 		bdev->internal.weighted_io_time += bdev->internal.period * bdev->internal.measured_queue_depth;
2096 	}
2097 }
2098 
2099 static void
2100 _calculate_measured_qd(struct spdk_io_channel_iter *i)
2101 {
2102 	struct spdk_bdev *bdev = spdk_io_channel_iter_get_ctx(i);
2103 	struct spdk_io_channel *io_ch = spdk_io_channel_iter_get_channel(i);
2104 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(io_ch);
2105 
2106 	bdev->internal.temporary_queue_depth += ch->io_outstanding;
2107 	spdk_for_each_channel_continue(i, 0);
2108 }
2109 
2110 static int
2111 spdk_bdev_calculate_measured_queue_depth(void *ctx)
2112 {
2113 	struct spdk_bdev *bdev = ctx;
2114 	bdev->internal.temporary_queue_depth = 0;
2115 	spdk_for_each_channel(__bdev_to_io_dev(bdev), _calculate_measured_qd, bdev,
2116 			      _calculate_measured_qd_cpl);
2117 	return 0;
2118 }
2119 
2120 void
2121 spdk_bdev_set_qd_sampling_period(struct spdk_bdev *bdev, uint64_t period)
2122 {
2123 	bdev->internal.period = period;
2124 
2125 	if (bdev->internal.qd_poller != NULL) {
2126 		spdk_poller_unregister(&bdev->internal.qd_poller);
2127 		bdev->internal.measured_queue_depth = UINT64_MAX;
2128 	}
2129 
2130 	if (period != 0) {
2131 		bdev->internal.qd_poller = spdk_poller_register(spdk_bdev_calculate_measured_queue_depth, bdev,
2132 					   period);
2133 	}
2134 }
2135 
2136 int
2137 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
2138 {
2139 	int ret;
2140 
2141 	pthread_mutex_lock(&bdev->internal.mutex);
2142 
2143 	/* bdev has open descriptors */
2144 	if (!TAILQ_EMPTY(&bdev->internal.open_descs) &&
2145 	    bdev->blockcnt > size) {
2146 		ret = -EBUSY;
2147 	} else {
2148 		bdev->blockcnt = size;
2149 		ret = 0;
2150 	}
2151 
2152 	pthread_mutex_unlock(&bdev->internal.mutex);
2153 
2154 	return ret;
2155 }
2156 
2157 /*
2158  * Convert I/O offset and length from bytes to blocks.
2159  *
2160  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
2161  */
2162 static uint64_t
2163 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
2164 			  uint64_t num_bytes, uint64_t *num_blocks)
2165 {
2166 	uint32_t block_size = bdev->blocklen;
2167 
2168 	*offset_blocks = offset_bytes / block_size;
2169 	*num_blocks = num_bytes / block_size;
2170 
2171 	return (offset_bytes % block_size) | (num_bytes % block_size);
2172 }
2173 
2174 static bool
2175 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
2176 {
2177 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
2178 	 * has been an overflow and hence the offset has been wrapped around */
2179 	if (offset_blocks + num_blocks < offset_blocks) {
2180 		return false;
2181 	}
2182 
2183 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
2184 	if (offset_blocks + num_blocks > bdev->blockcnt) {
2185 		return false;
2186 	}
2187 
2188 	return true;
2189 }
2190 
2191 int
2192 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2193 	       void *buf, uint64_t offset, uint64_t nbytes,
2194 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
2195 {
2196 	uint64_t offset_blocks, num_blocks;
2197 
2198 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2199 		return -EINVAL;
2200 	}
2201 
2202 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2203 }
2204 
2205 int
2206 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2207 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2208 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
2209 {
2210 	struct spdk_bdev *bdev = desc->bdev;
2211 	struct spdk_bdev_io *bdev_io;
2212 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2213 
2214 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2215 		return -EINVAL;
2216 	}
2217 
2218 	bdev_io = spdk_bdev_get_io(channel);
2219 	if (!bdev_io) {
2220 		return -ENOMEM;
2221 	}
2222 
2223 	bdev_io->internal.ch = channel;
2224 	bdev_io->internal.desc = desc;
2225 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2226 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2227 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2228 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2229 	bdev_io->u.bdev.iovcnt = 1;
2230 	bdev_io->u.bdev.num_blocks = num_blocks;
2231 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2232 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2233 
2234 	spdk_bdev_io_submit(bdev_io);
2235 	return 0;
2236 }
2237 
2238 int
2239 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2240 		struct iovec *iov, int iovcnt,
2241 		uint64_t offset, uint64_t nbytes,
2242 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2243 {
2244 	uint64_t offset_blocks, num_blocks;
2245 
2246 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2247 		return -EINVAL;
2248 	}
2249 
2250 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2251 }
2252 
2253 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2254 			   struct iovec *iov, int iovcnt,
2255 			   uint64_t offset_blocks, uint64_t num_blocks,
2256 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2257 {
2258 	struct spdk_bdev *bdev = desc->bdev;
2259 	struct spdk_bdev_io *bdev_io;
2260 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2261 
2262 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2263 		return -EINVAL;
2264 	}
2265 
2266 	bdev_io = spdk_bdev_get_io(channel);
2267 	if (!bdev_io) {
2268 		return -ENOMEM;
2269 	}
2270 
2271 	bdev_io->internal.ch = channel;
2272 	bdev_io->internal.desc = desc;
2273 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
2274 	bdev_io->u.bdev.iovs = iov;
2275 	bdev_io->u.bdev.iovcnt = iovcnt;
2276 	bdev_io->u.bdev.num_blocks = num_blocks;
2277 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2278 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2279 
2280 	spdk_bdev_io_submit(bdev_io);
2281 	return 0;
2282 }
2283 
2284 int
2285 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2286 		void *buf, uint64_t offset, uint64_t nbytes,
2287 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2288 {
2289 	uint64_t offset_blocks, num_blocks;
2290 
2291 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2292 		return -EINVAL;
2293 	}
2294 
2295 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
2296 }
2297 
2298 int
2299 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2300 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
2301 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2302 {
2303 	struct spdk_bdev *bdev = desc->bdev;
2304 	struct spdk_bdev_io *bdev_io;
2305 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2306 
2307 	if (!desc->write) {
2308 		return -EBADF;
2309 	}
2310 
2311 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2312 		return -EINVAL;
2313 	}
2314 
2315 	bdev_io = spdk_bdev_get_io(channel);
2316 	if (!bdev_io) {
2317 		return -ENOMEM;
2318 	}
2319 
2320 	bdev_io->internal.ch = channel;
2321 	bdev_io->internal.desc = desc;
2322 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2323 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2324 	bdev_io->u.bdev.iovs[0].iov_base = buf;
2325 	bdev_io->u.bdev.iovs[0].iov_len = num_blocks * bdev->blocklen;
2326 	bdev_io->u.bdev.iovcnt = 1;
2327 	bdev_io->u.bdev.num_blocks = num_blocks;
2328 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2329 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2330 
2331 	spdk_bdev_io_submit(bdev_io);
2332 	return 0;
2333 }
2334 
2335 int
2336 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2337 		 struct iovec *iov, int iovcnt,
2338 		 uint64_t offset, uint64_t len,
2339 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
2340 {
2341 	uint64_t offset_blocks, num_blocks;
2342 
2343 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2344 		return -EINVAL;
2345 	}
2346 
2347 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
2348 }
2349 
2350 int
2351 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2352 			struct iovec *iov, int iovcnt,
2353 			uint64_t offset_blocks, uint64_t num_blocks,
2354 			spdk_bdev_io_completion_cb cb, void *cb_arg)
2355 {
2356 	struct spdk_bdev *bdev = desc->bdev;
2357 	struct spdk_bdev_io *bdev_io;
2358 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2359 
2360 	if (!desc->write) {
2361 		return -EBADF;
2362 	}
2363 
2364 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2365 		return -EINVAL;
2366 	}
2367 
2368 	bdev_io = spdk_bdev_get_io(channel);
2369 	if (!bdev_io) {
2370 		return -ENOMEM;
2371 	}
2372 
2373 	bdev_io->internal.ch = channel;
2374 	bdev_io->internal.desc = desc;
2375 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
2376 	bdev_io->u.bdev.iovs = iov;
2377 	bdev_io->u.bdev.iovcnt = iovcnt;
2378 	bdev_io->u.bdev.num_blocks = num_blocks;
2379 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2380 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2381 
2382 	spdk_bdev_io_submit(bdev_io);
2383 	return 0;
2384 }
2385 
2386 int
2387 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2388 		       uint64_t offset, uint64_t len,
2389 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2390 {
2391 	uint64_t offset_blocks, num_blocks;
2392 
2393 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
2394 		return -EINVAL;
2395 	}
2396 
2397 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2398 }
2399 
2400 int
2401 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2402 			      uint64_t offset_blocks, uint64_t num_blocks,
2403 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2404 {
2405 	struct spdk_bdev *bdev = desc->bdev;
2406 	struct spdk_bdev_io *bdev_io;
2407 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2408 
2409 	if (!desc->write) {
2410 		return -EBADF;
2411 	}
2412 
2413 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2414 		return -EINVAL;
2415 	}
2416 
2417 	bdev_io = spdk_bdev_get_io(channel);
2418 
2419 	if (!bdev_io) {
2420 		return -ENOMEM;
2421 	}
2422 
2423 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
2424 	bdev_io->internal.ch = channel;
2425 	bdev_io->internal.desc = desc;
2426 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2427 	bdev_io->u.bdev.num_blocks = num_blocks;
2428 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2429 
2430 	if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
2431 		spdk_bdev_io_submit(bdev_io);
2432 		return 0;
2433 	} else if (_spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE)) {
2434 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
2435 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks;
2436 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks;
2437 		_spdk_bdev_write_zero_buffer_next(bdev_io);
2438 		return 0;
2439 	} else {
2440 		spdk_bdev_free_io(bdev_io);
2441 		return -ENOTSUP;
2442 	}
2443 }
2444 
2445 int
2446 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2447 		uint64_t offset, uint64_t nbytes,
2448 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2449 {
2450 	uint64_t offset_blocks, num_blocks;
2451 
2452 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
2453 		return -EINVAL;
2454 	}
2455 
2456 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2457 }
2458 
2459 int
2460 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2461 		       uint64_t offset_blocks, uint64_t num_blocks,
2462 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2463 {
2464 	struct spdk_bdev *bdev = desc->bdev;
2465 	struct spdk_bdev_io *bdev_io;
2466 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2467 
2468 	if (!desc->write) {
2469 		return -EBADF;
2470 	}
2471 
2472 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2473 		return -EINVAL;
2474 	}
2475 
2476 	if (num_blocks == 0) {
2477 		SPDK_ERRLOG("Can't unmap 0 bytes\n");
2478 		return -EINVAL;
2479 	}
2480 
2481 	bdev_io = spdk_bdev_get_io(channel);
2482 	if (!bdev_io) {
2483 		return -ENOMEM;
2484 	}
2485 
2486 	bdev_io->internal.ch = channel;
2487 	bdev_io->internal.desc = desc;
2488 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
2489 
2490 	bdev_io->u.bdev.iovs = &bdev_io->iov;
2491 	bdev_io->u.bdev.iovs[0].iov_base = NULL;
2492 	bdev_io->u.bdev.iovs[0].iov_len = 0;
2493 	bdev_io->u.bdev.iovcnt = 1;
2494 
2495 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2496 	bdev_io->u.bdev.num_blocks = num_blocks;
2497 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2498 
2499 	spdk_bdev_io_submit(bdev_io);
2500 	return 0;
2501 }
2502 
2503 int
2504 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2505 		uint64_t offset, uint64_t length,
2506 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2507 {
2508 	uint64_t offset_blocks, num_blocks;
2509 
2510 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
2511 		return -EINVAL;
2512 	}
2513 
2514 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
2515 }
2516 
2517 int
2518 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2519 		       uint64_t offset_blocks, uint64_t num_blocks,
2520 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
2521 {
2522 	struct spdk_bdev *bdev = desc->bdev;
2523 	struct spdk_bdev_io *bdev_io;
2524 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2525 
2526 	if (!desc->write) {
2527 		return -EBADF;
2528 	}
2529 
2530 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
2531 		return -EINVAL;
2532 	}
2533 
2534 	bdev_io = spdk_bdev_get_io(channel);
2535 	if (!bdev_io) {
2536 		return -ENOMEM;
2537 	}
2538 
2539 	bdev_io->internal.ch = channel;
2540 	bdev_io->internal.desc = desc;
2541 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
2542 	bdev_io->u.bdev.iovs = NULL;
2543 	bdev_io->u.bdev.iovcnt = 0;
2544 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2545 	bdev_io->u.bdev.num_blocks = num_blocks;
2546 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2547 
2548 	spdk_bdev_io_submit(bdev_io);
2549 	return 0;
2550 }
2551 
2552 static void
2553 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2554 {
2555 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2556 	struct spdk_bdev_io *bdev_io;
2557 
2558 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
2559 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, internal.link);
2560 	spdk_bdev_io_submit_reset(bdev_io);
2561 }
2562 
2563 static void
2564 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2565 {
2566 	struct spdk_io_channel		*ch;
2567 	struct spdk_bdev_channel	*channel;
2568 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
2569 	struct spdk_bdev_shared_resource *shared_resource;
2570 	bdev_io_tailq_t			tmp_queued;
2571 
2572 	TAILQ_INIT(&tmp_queued);
2573 
2574 	ch = spdk_io_channel_iter_get_channel(i);
2575 	channel = spdk_io_channel_get_ctx(ch);
2576 	shared_resource = channel->shared_resource;
2577 	mgmt_channel = shared_resource->mgmt_ch;
2578 
2579 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2580 
2581 	if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2582 		/* The QoS object is always valid and readable while
2583 		 * the channel flag is set, so the lock here should not
2584 		 * be necessary. We're not in the fast path though, so
2585 		 * just take it anyway. */
2586 		pthread_mutex_lock(&channel->bdev->internal.mutex);
2587 		if (channel->bdev->internal.qos->ch == channel) {
2588 			TAILQ_SWAP(&channel->bdev->internal.qos->queued, &tmp_queued, spdk_bdev_io, internal.link);
2589 		}
2590 		pthread_mutex_unlock(&channel->bdev->internal.mutex);
2591 	}
2592 
2593 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2594 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2595 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2596 	_spdk_bdev_abort_queued_io(&tmp_queued, channel);
2597 
2598 	spdk_for_each_channel_continue(i, 0);
2599 }
2600 
2601 static void
2602 _spdk_bdev_start_reset(void *ctx)
2603 {
2604 	struct spdk_bdev_channel *ch = ctx;
2605 
2606 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2607 			      ch, _spdk_bdev_reset_dev);
2608 }
2609 
2610 static void
2611 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2612 {
2613 	struct spdk_bdev *bdev = ch->bdev;
2614 
2615 	assert(!TAILQ_EMPTY(&ch->queued_resets));
2616 
2617 	pthread_mutex_lock(&bdev->internal.mutex);
2618 	if (bdev->internal.reset_in_progress == NULL) {
2619 		bdev->internal.reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2620 		/*
2621 		 * Take a channel reference for the target bdev for the life of this
2622 		 *  reset.  This guards against the channel getting destroyed while
2623 		 *  spdk_for_each_channel() calls related to this reset IO are in
2624 		 *  progress.  We will release the reference when this reset is
2625 		 *  completed.
2626 		 */
2627 		bdev->internal.reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2628 		_spdk_bdev_start_reset(ch);
2629 	}
2630 	pthread_mutex_unlock(&bdev->internal.mutex);
2631 }
2632 
2633 int
2634 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2635 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2636 {
2637 	struct spdk_bdev *bdev = desc->bdev;
2638 	struct spdk_bdev_io *bdev_io;
2639 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2640 
2641 	bdev_io = spdk_bdev_get_io(channel);
2642 	if (!bdev_io) {
2643 		return -ENOMEM;
2644 	}
2645 
2646 	bdev_io->internal.ch = channel;
2647 	bdev_io->internal.desc = desc;
2648 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
2649 	bdev_io->u.reset.ch_ref = NULL;
2650 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2651 
2652 	pthread_mutex_lock(&bdev->internal.mutex);
2653 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, internal.link);
2654 	pthread_mutex_unlock(&bdev->internal.mutex);
2655 
2656 	_spdk_bdev_channel_start_reset(channel);
2657 
2658 	return 0;
2659 }
2660 
2661 void
2662 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2663 		      struct spdk_bdev_io_stat *stat)
2664 {
2665 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2666 
2667 	*stat = channel->stat;
2668 }
2669 
2670 static void
2671 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
2672 {
2673 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2674 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2675 
2676 	bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
2677 			    bdev_iostat_ctx->cb_arg, 0);
2678 	free(bdev_iostat_ctx);
2679 }
2680 
2681 static void
2682 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
2683 {
2684 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2685 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2686 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2687 
2688 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &channel->stat);
2689 	spdk_for_each_channel_continue(i, 0);
2690 }
2691 
2692 void
2693 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
2694 			  spdk_bdev_get_device_stat_cb cb, void *cb_arg)
2695 {
2696 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
2697 
2698 	assert(bdev != NULL);
2699 	assert(stat != NULL);
2700 	assert(cb != NULL);
2701 
2702 	bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
2703 	if (bdev_iostat_ctx == NULL) {
2704 		SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
2705 		cb(bdev, stat, cb_arg, -ENOMEM);
2706 		return;
2707 	}
2708 
2709 	bdev_iostat_ctx->stat = stat;
2710 	bdev_iostat_ctx->cb = cb;
2711 	bdev_iostat_ctx->cb_arg = cb_arg;
2712 
2713 	/* Start with the statistics from previously deleted channels. */
2714 	pthread_mutex_lock(&bdev->internal.mutex);
2715 	_spdk_bdev_io_stat_add(bdev_iostat_ctx->stat, &bdev->internal.stat);
2716 	pthread_mutex_unlock(&bdev->internal.mutex);
2717 
2718 	/* Then iterate and add the statistics from each existing channel. */
2719 	spdk_for_each_channel(__bdev_to_io_dev(bdev),
2720 			      _spdk_bdev_get_each_channel_stat,
2721 			      bdev_iostat_ctx,
2722 			      _spdk_bdev_get_device_stat_done);
2723 }
2724 
2725 int
2726 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2727 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2728 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2729 {
2730 	struct spdk_bdev *bdev = desc->bdev;
2731 	struct spdk_bdev_io *bdev_io;
2732 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2733 
2734 	if (!desc->write) {
2735 		return -EBADF;
2736 	}
2737 
2738 	bdev_io = spdk_bdev_get_io(channel);
2739 	if (!bdev_io) {
2740 		return -ENOMEM;
2741 	}
2742 
2743 	bdev_io->internal.ch = channel;
2744 	bdev_io->internal.desc = desc;
2745 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2746 	bdev_io->u.nvme_passthru.cmd = *cmd;
2747 	bdev_io->u.nvme_passthru.buf = buf;
2748 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2749 	bdev_io->u.nvme_passthru.md_buf = NULL;
2750 	bdev_io->u.nvme_passthru.md_len = 0;
2751 
2752 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2753 
2754 	spdk_bdev_io_submit(bdev_io);
2755 	return 0;
2756 }
2757 
2758 int
2759 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2760 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2761 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2762 {
2763 	struct spdk_bdev *bdev = desc->bdev;
2764 	struct spdk_bdev_io *bdev_io;
2765 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2766 
2767 	if (!desc->write) {
2768 		/*
2769 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2770 		 *  to easily determine if the command is a read or write, but for now just
2771 		 *  do not allow io_passthru with a read-only descriptor.
2772 		 */
2773 		return -EBADF;
2774 	}
2775 
2776 	bdev_io = spdk_bdev_get_io(channel);
2777 	if (!bdev_io) {
2778 		return -ENOMEM;
2779 	}
2780 
2781 	bdev_io->internal.ch = channel;
2782 	bdev_io->internal.desc = desc;
2783 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2784 	bdev_io->u.nvme_passthru.cmd = *cmd;
2785 	bdev_io->u.nvme_passthru.buf = buf;
2786 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2787 	bdev_io->u.nvme_passthru.md_buf = NULL;
2788 	bdev_io->u.nvme_passthru.md_len = 0;
2789 
2790 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2791 
2792 	spdk_bdev_io_submit(bdev_io);
2793 	return 0;
2794 }
2795 
2796 int
2797 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2798 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2799 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2800 {
2801 	struct spdk_bdev *bdev = desc->bdev;
2802 	struct spdk_bdev_io *bdev_io;
2803 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2804 
2805 	if (!desc->write) {
2806 		/*
2807 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2808 		 *  to easily determine if the command is a read or write, but for now just
2809 		 *  do not allow io_passthru with a read-only descriptor.
2810 		 */
2811 		return -EBADF;
2812 	}
2813 
2814 	bdev_io = spdk_bdev_get_io(channel);
2815 	if (!bdev_io) {
2816 		return -ENOMEM;
2817 	}
2818 
2819 	bdev_io->internal.ch = channel;
2820 	bdev_io->internal.desc = desc;
2821 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2822 	bdev_io->u.nvme_passthru.cmd = *cmd;
2823 	bdev_io->u.nvme_passthru.buf = buf;
2824 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2825 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2826 	bdev_io->u.nvme_passthru.md_len = md_len;
2827 
2828 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2829 
2830 	spdk_bdev_io_submit(bdev_io);
2831 	return 0;
2832 }
2833 
2834 int
2835 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2836 			struct spdk_bdev_io_wait_entry *entry)
2837 {
2838 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2839 	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
2840 
2841 	if (bdev != entry->bdev) {
2842 		SPDK_ERRLOG("bdevs do not match\n");
2843 		return -EINVAL;
2844 	}
2845 
2846 	if (mgmt_ch->per_thread_cache_count > 0) {
2847 		SPDK_ERRLOG("Cannot queue io_wait if spdk_bdev_io available in per-thread cache\n");
2848 		return -EINVAL;
2849 	}
2850 
2851 	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
2852 	return 0;
2853 }
2854 
2855 static void
2856 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2857 {
2858 	struct spdk_bdev *bdev = bdev_ch->bdev;
2859 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2860 	struct spdk_bdev_io *bdev_io;
2861 
2862 	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
2863 		/*
2864 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2865 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2866 		 *  the context of a completion, because the resources for the I/O are
2867 		 *  not released until control returns to the bdev poller.  Also, we
2868 		 *  may require several small I/O to complete before a larger I/O
2869 		 *  (that requires splitting) can be submitted.
2870 		 */
2871 		return;
2872 	}
2873 
2874 	while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
2875 		bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
2876 		TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, internal.link);
2877 		bdev_io->internal.ch->io_outstanding++;
2878 		shared_resource->io_outstanding++;
2879 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
2880 		bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
2881 		if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
2882 			break;
2883 		}
2884 	}
2885 }
2886 
2887 static inline void
2888 _spdk_bdev_io_complete(void *ctx)
2889 {
2890 	struct spdk_bdev_io *bdev_io = ctx;
2891 	uint64_t tsc;
2892 
2893 	if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
2894 		/*
2895 		 * Send the completion to the thread that originally submitted the I/O,
2896 		 * which may not be the current thread in the case of QoS.
2897 		 */
2898 		if (bdev_io->internal.io_submit_ch) {
2899 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
2900 			bdev_io->internal.io_submit_ch = NULL;
2901 		}
2902 
2903 		/*
2904 		 * Defer completion to avoid potential infinite recursion if the
2905 		 * user's completion callback issues a new I/O.
2906 		 */
2907 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
2908 				     _spdk_bdev_io_complete, bdev_io);
2909 		return;
2910 	}
2911 
2912 	tsc = spdk_get_ticks();
2913 	spdk_trace_record_tsc(tsc, TRACE_BDEV_IO_DONE, 0, 0, (uintptr_t)bdev_io, 0);
2914 
2915 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2916 		switch (bdev_io->type) {
2917 		case SPDK_BDEV_IO_TYPE_READ:
2918 			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2919 			bdev_io->internal.ch->stat.num_read_ops++;
2920 			bdev_io->internal.ch->stat.read_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
2921 			break;
2922 		case SPDK_BDEV_IO_TYPE_WRITE:
2923 			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2924 			bdev_io->internal.ch->stat.num_write_ops++;
2925 			bdev_io->internal.ch->stat.write_latency_ticks += (tsc - bdev_io->internal.submit_tsc);
2926 			break;
2927 		default:
2928 			break;
2929 		}
2930 	}
2931 
2932 #ifdef SPDK_CONFIG_VTUNE
2933 	uint64_t now_tsc = spdk_get_ticks();
2934 	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
2935 		uint64_t data[5];
2936 
2937 		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
2938 		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
2939 		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
2940 		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
2941 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2942 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
2943 
2944 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
2945 				   __itt_metadata_u64, 5, data);
2946 
2947 		bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
2948 		bdev_io->internal.ch->start_tsc = now_tsc;
2949 	}
2950 #endif
2951 
2952 	assert(bdev_io->internal.cb != NULL);
2953 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
2954 
2955 	bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
2956 			     bdev_io->internal.caller_ctx);
2957 }
2958 
2959 static void
2960 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2961 {
2962 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2963 
2964 	if (bdev_io->u.reset.ch_ref != NULL) {
2965 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2966 		bdev_io->u.reset.ch_ref = NULL;
2967 	}
2968 
2969 	_spdk_bdev_io_complete(bdev_io);
2970 }
2971 
2972 static void
2973 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2974 {
2975 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2976 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2977 
2978 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2979 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2980 		_spdk_bdev_channel_start_reset(ch);
2981 	}
2982 
2983 	spdk_for_each_channel_continue(i, 0);
2984 }
2985 
2986 void
2987 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2988 {
2989 	struct spdk_bdev *bdev = bdev_io->bdev;
2990 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
2991 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2992 
2993 	bdev_io->internal.status = status;
2994 
2995 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2996 		bool unlock_channels = false;
2997 
2998 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2999 			SPDK_ERRLOG("NOMEM returned for reset\n");
3000 		}
3001 		pthread_mutex_lock(&bdev->internal.mutex);
3002 		if (bdev_io == bdev->internal.reset_in_progress) {
3003 			bdev->internal.reset_in_progress = NULL;
3004 			unlock_channels = true;
3005 		}
3006 		pthread_mutex_unlock(&bdev->internal.mutex);
3007 
3008 		if (unlock_channels) {
3009 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
3010 					      bdev_io, _spdk_bdev_reset_complete);
3011 			return;
3012 		}
3013 	} else {
3014 		assert(bdev_ch->io_outstanding > 0);
3015 		assert(shared_resource->io_outstanding > 0);
3016 		bdev_ch->io_outstanding--;
3017 		shared_resource->io_outstanding--;
3018 
3019 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
3020 			TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, internal.link);
3021 			/*
3022 			 * Wait for some of the outstanding I/O to complete before we
3023 			 *  retry any of the nomem_io.  Normally we will wait for
3024 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
3025 			 *  depth channels we will instead wait for half to complete.
3026 			 */
3027 			shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
3028 							   (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
3029 			return;
3030 		}
3031 
3032 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
3033 			_spdk_bdev_ch_retry_io(bdev_ch);
3034 		}
3035 	}
3036 
3037 	_spdk_bdev_io_complete(bdev_io);
3038 }
3039 
3040 void
3041 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
3042 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
3043 {
3044 	if (sc == SPDK_SCSI_STATUS_GOOD) {
3045 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3046 	} else {
3047 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
3048 		bdev_io->internal.error.scsi.sc = sc;
3049 		bdev_io->internal.error.scsi.sk = sk;
3050 		bdev_io->internal.error.scsi.asc = asc;
3051 		bdev_io->internal.error.scsi.ascq = ascq;
3052 	}
3053 
3054 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
3055 }
3056 
3057 void
3058 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
3059 			     int *sc, int *sk, int *asc, int *ascq)
3060 {
3061 	assert(sc != NULL);
3062 	assert(sk != NULL);
3063 	assert(asc != NULL);
3064 	assert(ascq != NULL);
3065 
3066 	switch (bdev_io->internal.status) {
3067 	case SPDK_BDEV_IO_STATUS_SUCCESS:
3068 		*sc = SPDK_SCSI_STATUS_GOOD;
3069 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
3070 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
3071 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
3072 		break;
3073 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
3074 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
3075 		break;
3076 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
3077 		*sc = bdev_io->internal.error.scsi.sc;
3078 		*sk = bdev_io->internal.error.scsi.sk;
3079 		*asc = bdev_io->internal.error.scsi.asc;
3080 		*ascq = bdev_io->internal.error.scsi.ascq;
3081 		break;
3082 	default:
3083 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
3084 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
3085 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
3086 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
3087 		break;
3088 	}
3089 }
3090 
3091 void
3092 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
3093 {
3094 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
3095 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3096 	} else {
3097 		bdev_io->internal.error.nvme.sct = sct;
3098 		bdev_io->internal.error.nvme.sc = sc;
3099 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
3100 	}
3101 
3102 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
3103 }
3104 
3105 void
3106 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
3107 {
3108 	assert(sct != NULL);
3109 	assert(sc != NULL);
3110 
3111 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
3112 		*sct = bdev_io->internal.error.nvme.sct;
3113 		*sc = bdev_io->internal.error.nvme.sc;
3114 	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
3115 		*sct = SPDK_NVME_SCT_GENERIC;
3116 		*sc = SPDK_NVME_SC_SUCCESS;
3117 	} else {
3118 		*sct = SPDK_NVME_SCT_GENERIC;
3119 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
3120 	}
3121 }
3122 
3123 struct spdk_thread *
3124 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
3125 {
3126 	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
3127 }
3128 
3129 static void
3130 _spdk_bdev_qos_config_limit(struct spdk_bdev *bdev, uint64_t *limits)
3131 {
3132 	uint64_t	min_qos_set;
3133 	int		i;
3134 
3135 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3136 		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3137 			break;
3138 		}
3139 	}
3140 
3141 	if (i == SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3142 		SPDK_ERRLOG("Invalid rate limits set.\n");
3143 		return;
3144 	}
3145 
3146 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3147 		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3148 			continue;
3149 		}
3150 
3151 		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
3152 			min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
3153 		} else {
3154 			min_qos_set = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
3155 		}
3156 
3157 		if (limits[i] == 0 || limits[i] % min_qos_set) {
3158 			SPDK_ERRLOG("Assigned limit %" PRIu64 " on bdev %s is not multiple of %" PRIu64 "\n",
3159 				    limits[i], bdev->name, min_qos_set);
3160 			SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name);
3161 			return;
3162 		}
3163 	}
3164 
3165 	if (!bdev->internal.qos) {
3166 		bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3167 		if (!bdev->internal.qos) {
3168 			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3169 			return;
3170 		}
3171 	}
3172 
3173 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3174 		bdev->internal.qos->rate_limits[i].limit = limits[i];
3175 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%lu\n",
3176 			      bdev->name, i, limits[i]);
3177 	}
3178 
3179 	return;
3180 }
3181 
3182 static void
3183 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
3184 {
3185 	struct spdk_conf_section	*sp = NULL;
3186 	const char			*val = NULL;
3187 	int				i = 0, j = 0;
3188 	uint64_t			limits[SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES] = {};
3189 	bool				config_qos = false;
3190 
3191 	sp = spdk_conf_find_section(NULL, "QoS");
3192 	if (!sp) {
3193 		return;
3194 	}
3195 
3196 	while (j < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES) {
3197 		limits[j] = SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
3198 
3199 		i = 0;
3200 		while (true) {
3201 			val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 0);
3202 			if (!val) {
3203 				break;
3204 			}
3205 
3206 			if (strcmp(bdev->name, val) != 0) {
3207 				i++;
3208 				continue;
3209 			}
3210 
3211 			val = spdk_conf_section_get_nmval(sp, qos_conf_type[j], i, 1);
3212 			if (val) {
3213 				if (_spdk_bdev_qos_is_iops_rate_limit(j) == true) {
3214 					limits[j] = strtoull(val, NULL, 10);
3215 				} else {
3216 					limits[j] = strtoull(val, NULL, 10) * 1024 * 1024;
3217 				}
3218 				config_qos = true;
3219 			}
3220 
3221 			break;
3222 		}
3223 
3224 		j++;
3225 	}
3226 
3227 	if (config_qos == true) {
3228 		_spdk_bdev_qos_config_limit(bdev, limits);
3229 	}
3230 
3231 	return;
3232 }
3233 
3234 static int
3235 spdk_bdev_init(struct spdk_bdev *bdev)
3236 {
3237 	char *bdev_name;
3238 
3239 	assert(bdev->module != NULL);
3240 
3241 	if (!bdev->name) {
3242 		SPDK_ERRLOG("Bdev name is NULL\n");
3243 		return -EINVAL;
3244 	}
3245 
3246 	if (spdk_bdev_get_by_name(bdev->name)) {
3247 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
3248 		return -EEXIST;
3249 	}
3250 
3251 	/* Users often register their own I/O devices using the bdev name. In
3252 	 * order to avoid conflicts, prepend bdev_. */
3253 	bdev_name = spdk_sprintf_alloc("bdev_%s", bdev->name);
3254 	if (!bdev_name) {
3255 		SPDK_ERRLOG("Unable to allocate memory for internal bdev name.\n");
3256 		return -ENOMEM;
3257 	}
3258 
3259 	bdev->internal.status = SPDK_BDEV_STATUS_READY;
3260 	bdev->internal.measured_queue_depth = UINT64_MAX;
3261 	bdev->internal.claim_module = NULL;
3262 	bdev->internal.qd_poller = NULL;
3263 	bdev->internal.qos = NULL;
3264 
3265 	TAILQ_INIT(&bdev->internal.open_descs);
3266 
3267 	TAILQ_INIT(&bdev->aliases);
3268 
3269 	bdev->internal.reset_in_progress = NULL;
3270 
3271 	_spdk_bdev_qos_config(bdev);
3272 
3273 	spdk_io_device_register(__bdev_to_io_dev(bdev),
3274 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
3275 				sizeof(struct spdk_bdev_channel),
3276 				bdev_name);
3277 
3278 	free(bdev_name);
3279 
3280 	pthread_mutex_init(&bdev->internal.mutex, NULL);
3281 	return 0;
3282 }
3283 
3284 static void
3285 spdk_bdev_destroy_cb(void *io_device)
3286 {
3287 	int			rc;
3288 	struct spdk_bdev	*bdev;
3289 	spdk_bdev_unregister_cb	cb_fn;
3290 	void			*cb_arg;
3291 
3292 	bdev = __bdev_from_io_dev(io_device);
3293 	cb_fn = bdev->internal.unregister_cb;
3294 	cb_arg = bdev->internal.unregister_ctx;
3295 
3296 	rc = bdev->fn_table->destruct(bdev->ctxt);
3297 	if (rc < 0) {
3298 		SPDK_ERRLOG("destruct failed\n");
3299 	}
3300 	if (rc <= 0 && cb_fn != NULL) {
3301 		cb_fn(cb_arg, rc);
3302 	}
3303 }
3304 
3305 
3306 static void
3307 spdk_bdev_fini(struct spdk_bdev *bdev)
3308 {
3309 	pthread_mutex_destroy(&bdev->internal.mutex);
3310 
3311 	free(bdev->internal.qos);
3312 
3313 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
3314 }
3315 
3316 static void
3317 spdk_bdev_start(struct spdk_bdev *bdev)
3318 {
3319 	struct spdk_bdev_module *module;
3320 	uint32_t action;
3321 
3322 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
3323 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, internal.link);
3324 
3325 	/* Examine configuration before initializing I/O */
3326 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3327 		if (module->examine_config) {
3328 			action = module->internal.action_in_progress;
3329 			module->internal.action_in_progress++;
3330 			module->examine_config(bdev);
3331 			if (action != module->internal.action_in_progress) {
3332 				SPDK_ERRLOG("examine_config for module %s did not call spdk_bdev_module_examine_done()\n",
3333 					    module->name);
3334 			}
3335 		}
3336 	}
3337 
3338 	if (bdev->internal.claim_module) {
3339 		return;
3340 	}
3341 
3342 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3343 		if (module->examine_disk) {
3344 			module->internal.action_in_progress++;
3345 			module->examine_disk(bdev);
3346 		}
3347 	}
3348 }
3349 
3350 int
3351 spdk_bdev_register(struct spdk_bdev *bdev)
3352 {
3353 	int rc = spdk_bdev_init(bdev);
3354 
3355 	if (rc == 0) {
3356 		spdk_bdev_start(bdev);
3357 	}
3358 
3359 	return rc;
3360 }
3361 
3362 int
3363 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
3364 {
3365 	int rc;
3366 
3367 	rc = spdk_bdev_init(vbdev);
3368 	if (rc) {
3369 		return rc;
3370 	}
3371 
3372 	spdk_bdev_start(vbdev);
3373 	return 0;
3374 }
3375 
3376 void
3377 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
3378 {
3379 	if (bdev->internal.unregister_cb != NULL) {
3380 		bdev->internal.unregister_cb(bdev->internal.unregister_ctx, bdeverrno);
3381 	}
3382 }
3383 
3384 static void
3385 _remove_notify(void *arg)
3386 {
3387 	struct spdk_bdev_desc *desc = arg;
3388 
3389 	desc->remove_scheduled = false;
3390 
3391 	if (desc->closed) {
3392 		free(desc);
3393 	} else {
3394 		desc->remove_cb(desc->remove_ctx);
3395 	}
3396 }
3397 
3398 void
3399 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
3400 {
3401 	struct spdk_bdev_desc	*desc, *tmp;
3402 	bool			do_destruct = true;
3403 	struct spdk_thread	*thread;
3404 
3405 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
3406 
3407 	thread = spdk_get_thread();
3408 	if (!thread) {
3409 		/* The user called this from a non-SPDK thread. */
3410 		if (cb_fn != NULL) {
3411 			cb_fn(cb_arg, -ENOTSUP);
3412 		}
3413 		return;
3414 	}
3415 
3416 	pthread_mutex_lock(&bdev->internal.mutex);
3417 
3418 	bdev->internal.status = SPDK_BDEV_STATUS_REMOVING;
3419 	bdev->internal.unregister_cb = cb_fn;
3420 	bdev->internal.unregister_ctx = cb_arg;
3421 
3422 	TAILQ_FOREACH_SAFE(desc, &bdev->internal.open_descs, link, tmp) {
3423 		if (desc->remove_cb) {
3424 			do_destruct = false;
3425 			/*
3426 			 * Defer invocation of the remove_cb to a separate message that will
3427 			 *  run later on its thread.  This ensures this context unwinds and
3428 			 *  we don't recursively unregister this bdev again if the remove_cb
3429 			 *  immediately closes its descriptor.
3430 			 */
3431 			if (!desc->remove_scheduled) {
3432 				/* Avoid scheduling removal of the same descriptor multiple times. */
3433 				desc->remove_scheduled = true;
3434 				spdk_thread_send_msg(desc->thread, _remove_notify, desc);
3435 			}
3436 		}
3437 	}
3438 
3439 	if (!do_destruct) {
3440 		pthread_mutex_unlock(&bdev->internal.mutex);
3441 		return;
3442 	}
3443 
3444 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, internal.link);
3445 	pthread_mutex_unlock(&bdev->internal.mutex);
3446 
3447 	spdk_bdev_fini(bdev);
3448 }
3449 
3450 int
3451 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
3452 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
3453 {
3454 	struct spdk_bdev_desc *desc;
3455 	struct spdk_thread *thread;
3456 
3457 	thread = spdk_get_thread();
3458 	if (!thread) {
3459 		SPDK_ERRLOG("Cannot open bdev from non-SPDK thread.\n");
3460 		return -ENOTSUP;
3461 	}
3462 
3463 	desc = calloc(1, sizeof(*desc));
3464 	if (desc == NULL) {
3465 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
3466 		return -ENOMEM;
3467 	}
3468 
3469 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3470 		      spdk_get_thread());
3471 
3472 	pthread_mutex_lock(&bdev->internal.mutex);
3473 
3474 	if (write && bdev->internal.claim_module) {
3475 		SPDK_ERRLOG("Could not open %s - %s module already claimed it\n",
3476 			    bdev->name, bdev->internal.claim_module->name);
3477 		free(desc);
3478 		pthread_mutex_unlock(&bdev->internal.mutex);
3479 		return -EPERM;
3480 	}
3481 
3482 	TAILQ_INSERT_TAIL(&bdev->internal.open_descs, desc, link);
3483 
3484 	desc->bdev = bdev;
3485 	desc->thread = thread;
3486 	desc->remove_cb = remove_cb;
3487 	desc->remove_ctx = remove_ctx;
3488 	desc->write = write;
3489 	*_desc = desc;
3490 
3491 	pthread_mutex_unlock(&bdev->internal.mutex);
3492 
3493 	return 0;
3494 }
3495 
3496 void
3497 spdk_bdev_close(struct spdk_bdev_desc *desc)
3498 {
3499 	struct spdk_bdev *bdev = desc->bdev;
3500 	bool do_unregister = false;
3501 
3502 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
3503 		      spdk_get_thread());
3504 
3505 	assert(desc->thread == spdk_get_thread());
3506 
3507 	pthread_mutex_lock(&bdev->internal.mutex);
3508 
3509 	TAILQ_REMOVE(&bdev->internal.open_descs, desc, link);
3510 
3511 	desc->closed = true;
3512 
3513 	if (!desc->remove_scheduled) {
3514 		free(desc);
3515 	}
3516 
3517 	/* If no more descriptors, kill QoS channel */
3518 	if (bdev->internal.qos && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3519 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
3520 			      bdev->name, spdk_get_thread());
3521 
3522 		if (spdk_bdev_qos_destroy(bdev)) {
3523 			/* There isn't anything we can do to recover here. Just let the
3524 			 * old QoS poller keep running. The QoS handling won't change
3525 			 * cores when the user allocates a new channel, but it won't break. */
3526 			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3527 		}
3528 	}
3529 
3530 	spdk_bdev_set_qd_sampling_period(bdev, 0);
3531 
3532 	if (bdev->internal.status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->internal.open_descs)) {
3533 		do_unregister = true;
3534 	}
3535 	pthread_mutex_unlock(&bdev->internal.mutex);
3536 
3537 	if (do_unregister == true) {
3538 		spdk_bdev_unregister(bdev, bdev->internal.unregister_cb, bdev->internal.unregister_ctx);
3539 	}
3540 }
3541 
3542 int
3543 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3544 			    struct spdk_bdev_module *module)
3545 {
3546 	if (bdev->internal.claim_module != NULL) {
3547 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3548 			    bdev->internal.claim_module->name);
3549 		return -EPERM;
3550 	}
3551 
3552 	if (desc && !desc->write) {
3553 		desc->write = true;
3554 	}
3555 
3556 	bdev->internal.claim_module = module;
3557 	return 0;
3558 }
3559 
3560 void
3561 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3562 {
3563 	assert(bdev->internal.claim_module != NULL);
3564 	bdev->internal.claim_module = NULL;
3565 }
3566 
3567 struct spdk_bdev *
3568 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3569 {
3570 	return desc->bdev;
3571 }
3572 
3573 void
3574 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3575 {
3576 	struct iovec *iovs;
3577 	int iovcnt;
3578 
3579 	if (bdev_io == NULL) {
3580 		return;
3581 	}
3582 
3583 	switch (bdev_io->type) {
3584 	case SPDK_BDEV_IO_TYPE_READ:
3585 		iovs = bdev_io->u.bdev.iovs;
3586 		iovcnt = bdev_io->u.bdev.iovcnt;
3587 		break;
3588 	case SPDK_BDEV_IO_TYPE_WRITE:
3589 		iovs = bdev_io->u.bdev.iovs;
3590 		iovcnt = bdev_io->u.bdev.iovcnt;
3591 		break;
3592 	default:
3593 		iovs = NULL;
3594 		iovcnt = 0;
3595 		break;
3596 	}
3597 
3598 	if (iovp) {
3599 		*iovp = iovs;
3600 	}
3601 	if (iovcntp) {
3602 		*iovcntp = iovcnt;
3603 	}
3604 }
3605 
3606 void
3607 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
3608 {
3609 
3610 	if (spdk_bdev_module_list_find(bdev_module->name)) {
3611 		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
3612 		assert(false);
3613 	}
3614 
3615 	if (bdev_module->async_init) {
3616 		bdev_module->internal.action_in_progress = 1;
3617 	}
3618 
3619 	/*
3620 	 * Modules with examine callbacks must be initialized first, so they are
3621 	 *  ready to handle examine callbacks from later modules that will
3622 	 *  register physical bdevs.
3623 	 */
3624 	if (bdev_module->examine_config != NULL || bdev_module->examine_disk != NULL) {
3625 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3626 	} else {
3627 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, internal.tailq);
3628 	}
3629 }
3630 
3631 struct spdk_bdev_module *
3632 spdk_bdev_module_list_find(const char *name)
3633 {
3634 	struct spdk_bdev_module *bdev_module;
3635 
3636 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, internal.tailq) {
3637 		if (strcmp(name, bdev_module->name) == 0) {
3638 			break;
3639 		}
3640 	}
3641 
3642 	return bdev_module;
3643 }
3644 
3645 static void
3646 _spdk_bdev_write_zero_buffer_next(void *_bdev_io)
3647 {
3648 	struct spdk_bdev_io *bdev_io = _bdev_io;
3649 	uint64_t num_bytes, num_blocks;
3650 	int rc;
3651 
3652 	num_bytes = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) *
3653 			     bdev_io->u.bdev.split_remaining_num_blocks,
3654 			     ZERO_BUFFER_SIZE);
3655 	num_blocks = num_bytes / spdk_bdev_get_block_size(bdev_io->bdev);
3656 
3657 	rc = spdk_bdev_write_blocks(bdev_io->internal.desc,
3658 				    spdk_io_channel_from_ctx(bdev_io->internal.ch),
3659 				    g_bdev_mgr.zero_buffer,
3660 				    bdev_io->u.bdev.split_current_offset_blocks, num_blocks,
3661 				    _spdk_bdev_write_zero_buffer_done, bdev_io);
3662 	if (rc == 0) {
3663 		bdev_io->u.bdev.split_remaining_num_blocks -= num_blocks;
3664 		bdev_io->u.bdev.split_current_offset_blocks += num_blocks;
3665 	} else if (rc == -ENOMEM) {
3666 		_spdk_bdev_queue_io_wait_with_cb(bdev_io, _spdk_bdev_write_zero_buffer_next);
3667 	} else {
3668 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3669 		bdev_io->internal.cb(bdev_io, false, bdev_io->internal.caller_ctx);
3670 	}
3671 }
3672 
3673 static void
3674 _spdk_bdev_write_zero_buffer_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
3675 {
3676 	struct spdk_bdev_io *parent_io = cb_arg;
3677 
3678 	spdk_bdev_free_io(bdev_io);
3679 
3680 	if (!success) {
3681 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_FAILED;
3682 		parent_io->internal.cb(parent_io, false, parent_io->internal.caller_ctx);
3683 		return;
3684 	}
3685 
3686 	if (parent_io->u.bdev.split_remaining_num_blocks == 0) {
3687 		parent_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
3688 		parent_io->internal.cb(parent_io, true, parent_io->internal.caller_ctx);
3689 		return;
3690 	}
3691 
3692 	_spdk_bdev_write_zero_buffer_next(parent_io);
3693 }
3694 
3695 struct set_qos_limit_ctx {
3696 	void (*cb_fn)(void *cb_arg, int status);
3697 	void *cb_arg;
3698 	struct spdk_bdev *bdev;
3699 };
3700 
3701 static void
3702 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
3703 {
3704 	pthread_mutex_lock(&ctx->bdev->internal.mutex);
3705 	ctx->bdev->internal.qos_mod_in_progress = false;
3706 	pthread_mutex_unlock(&ctx->bdev->internal.mutex);
3707 
3708 	ctx->cb_fn(ctx->cb_arg, status);
3709 	free(ctx);
3710 }
3711 
3712 static void
3713 _spdk_bdev_disable_qos_done(void *cb_arg)
3714 {
3715 	struct set_qos_limit_ctx *ctx = cb_arg;
3716 	struct spdk_bdev *bdev = ctx->bdev;
3717 	struct spdk_bdev_io *bdev_io;
3718 	struct spdk_bdev_qos *qos;
3719 
3720 	pthread_mutex_lock(&bdev->internal.mutex);
3721 	qos = bdev->internal.qos;
3722 	bdev->internal.qos = NULL;
3723 	pthread_mutex_unlock(&bdev->internal.mutex);
3724 
3725 	while (!TAILQ_EMPTY(&qos->queued)) {
3726 		/* Send queued I/O back to their original thread for resubmission. */
3727 		bdev_io = TAILQ_FIRST(&qos->queued);
3728 		TAILQ_REMOVE(&qos->queued, bdev_io, internal.link);
3729 
3730 		if (bdev_io->internal.io_submit_ch) {
3731 			/*
3732 			 * Channel was changed when sending it to the QoS thread - change it back
3733 			 *  before sending it back to the original thread.
3734 			 */
3735 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
3736 			bdev_io->internal.io_submit_ch = NULL;
3737 		}
3738 
3739 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
3740 				     _spdk_bdev_io_submit, bdev_io);
3741 	}
3742 
3743 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
3744 	spdk_poller_unregister(&qos->poller);
3745 
3746 	free(qos);
3747 
3748 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3749 }
3750 
3751 static void
3752 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
3753 {
3754 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3755 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3756 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3757 	struct spdk_thread *thread;
3758 
3759 	pthread_mutex_lock(&bdev->internal.mutex);
3760 	thread = bdev->internal.qos->thread;
3761 	pthread_mutex_unlock(&bdev->internal.mutex);
3762 
3763 	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
3764 }
3765 
3766 static void
3767 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
3768 {
3769 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3770 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3771 
3772 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
3773 
3774 	spdk_for_each_channel_continue(i, 0);
3775 }
3776 
3777 static void
3778 _spdk_bdev_update_qos_rate_limit_msg(void *cb_arg)
3779 {
3780 	struct set_qos_limit_ctx *ctx = cb_arg;
3781 	struct spdk_bdev *bdev = ctx->bdev;
3782 
3783 	pthread_mutex_lock(&bdev->internal.mutex);
3784 	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->internal.qos);
3785 	pthread_mutex_unlock(&bdev->internal.mutex);
3786 
3787 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3788 }
3789 
3790 static void
3791 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
3792 {
3793 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3794 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3795 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3796 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3797 
3798 	pthread_mutex_lock(&bdev->internal.mutex);
3799 	_spdk_bdev_enable_qos(bdev, bdev_ch);
3800 	pthread_mutex_unlock(&bdev->internal.mutex);
3801 	spdk_for_each_channel_continue(i, 0);
3802 }
3803 
3804 static void
3805 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
3806 {
3807 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3808 
3809 	_spdk_bdev_set_qos_limit_done(ctx, status);
3810 }
3811 
3812 static void
3813 _spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits)
3814 {
3815 	int i;
3816 
3817 	assert(bdev->internal.qos != NULL);
3818 
3819 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3820 		if (limits[i] != SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3821 			bdev->internal.qos->rate_limits[i].limit = limits[i];
3822 
3823 			if (limits[i] == 0) {
3824 				bdev->internal.qos->rate_limits[i].limit =
3825 					SPDK_BDEV_QOS_LIMIT_NOT_DEFINED;
3826 			}
3827 		}
3828 	}
3829 }
3830 
3831 void
3832 spdk_bdev_set_qos_rate_limits(struct spdk_bdev *bdev, uint64_t *limits,
3833 			      void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
3834 {
3835 	struct set_qos_limit_ctx	*ctx;
3836 	uint32_t			limit_set_complement;
3837 	uint64_t			min_limit_per_sec;
3838 	int				i;
3839 	bool				disable_rate_limit = true;
3840 
3841 	for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3842 		if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED) {
3843 			continue;
3844 		}
3845 
3846 		if (limits[i] > 0) {
3847 			disable_rate_limit = false;
3848 		}
3849 
3850 		if (_spdk_bdev_qos_is_iops_rate_limit(i) == true) {
3851 			min_limit_per_sec = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
3852 		} else {
3853 			/* Change from megabyte to byte rate limit */
3854 			limits[i] = limits[i] * 1024 * 1024;
3855 			min_limit_per_sec = SPDK_BDEV_QOS_MIN_BYTES_PER_SEC;
3856 		}
3857 
3858 		limit_set_complement = limits[i] % min_limit_per_sec;
3859 		if (limit_set_complement) {
3860 			SPDK_ERRLOG("Requested rate limit %" PRIu64 " is not a multiple of %" PRIu64 "\n",
3861 				    limits[i], min_limit_per_sec);
3862 			limits[i] += min_limit_per_sec - limit_set_complement;
3863 			SPDK_ERRLOG("Round up the rate limit to %" PRIu64 "\n", limits[i]);
3864 		}
3865 	}
3866 
3867 	ctx = calloc(1, sizeof(*ctx));
3868 	if (ctx == NULL) {
3869 		cb_fn(cb_arg, -ENOMEM);
3870 		return;
3871 	}
3872 
3873 	ctx->cb_fn = cb_fn;
3874 	ctx->cb_arg = cb_arg;
3875 	ctx->bdev = bdev;
3876 
3877 	pthread_mutex_lock(&bdev->internal.mutex);
3878 	if (bdev->internal.qos_mod_in_progress) {
3879 		pthread_mutex_unlock(&bdev->internal.mutex);
3880 		free(ctx);
3881 		cb_fn(cb_arg, -EAGAIN);
3882 		return;
3883 	}
3884 	bdev->internal.qos_mod_in_progress = true;
3885 
3886 	if (disable_rate_limit == true && bdev->internal.qos) {
3887 		for (i = 0; i < SPDK_BDEV_QOS_NUM_RATE_LIMIT_TYPES; i++) {
3888 			if (limits[i] == SPDK_BDEV_QOS_LIMIT_NOT_DEFINED &&
3889 			    (bdev->internal.qos->rate_limits[i].limit > 0 &&
3890 			     bdev->internal.qos->rate_limits[i].limit !=
3891 			     SPDK_BDEV_QOS_LIMIT_NOT_DEFINED)) {
3892 				disable_rate_limit = false;
3893 				break;
3894 			}
3895 		}
3896 	}
3897 
3898 	if (disable_rate_limit == false) {
3899 		if (bdev->internal.qos == NULL) {
3900 			/* Enabling */
3901 			bdev->internal.qos = calloc(1, sizeof(*bdev->internal.qos));
3902 			if (!bdev->internal.qos) {
3903 				pthread_mutex_unlock(&bdev->internal.mutex);
3904 				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3905 				free(ctx);
3906 				cb_fn(cb_arg, -ENOMEM);
3907 				return;
3908 			}
3909 
3910 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
3911 
3912 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3913 					      _spdk_bdev_enable_qos_msg, ctx,
3914 					      _spdk_bdev_enable_qos_done);
3915 		} else {
3916 			/* Updating */
3917 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
3918 
3919 			spdk_thread_send_msg(bdev->internal.qos->thread,
3920 					     _spdk_bdev_update_qos_rate_limit_msg, ctx);
3921 		}
3922 	} else {
3923 		if (bdev->internal.qos != NULL) {
3924 			_spdk_bdev_set_qos_rate_limits(bdev, limits);
3925 
3926 			/* Disabling */
3927 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3928 					      _spdk_bdev_disable_qos_msg, ctx,
3929 					      _spdk_bdev_disable_qos_msg_done);
3930 		} else {
3931 			pthread_mutex_unlock(&bdev->internal.mutex);
3932 			_spdk_bdev_set_qos_limit_done(ctx, 0);
3933 			return;
3934 		}
3935 	}
3936 
3937 	pthread_mutex_unlock(&bdev->internal.mutex);
3938 }
3939 
3940 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3941 
3942 SPDK_TRACE_REGISTER_FN(bdev_trace)
3943 {
3944 	spdk_trace_register_owner(OWNER_BDEV, 'b');
3945 	spdk_trace_register_object(OBJECT_BDEV_IO, 'i');
3946 	spdk_trace_register_description("BDEV_IO_START", "", TRACE_BDEV_IO_START, OWNER_BDEV,
3947 					OBJECT_BDEV_IO, 1, 0, "type:   ");
3948 	spdk_trace_register_description("BDEV_IO_DONE", "", TRACE_BDEV_IO_DONE, OWNER_BDEV,
3949 					OBJECT_BDEV_IO, 0, 0, "");
3950 }
3951